2016-08-04 37 views
2

Ich habe große CSV-Dateien mit einer Größe von mehr als 10 mb und über 50+ solche Dateien. Diese Eingänge haben mehr als 25 Spalten und mehr als 50.000 Zeilen.pandas.io.common.CParserError: Fehler beim Token von Daten. C-Fehler: Pufferüberlauf abgefangen - mögliche fehlerhafte Eingabedatei

Alle diese haben gleiche Header und ich versuche, sie zu einem csv mit Headern zusammenzufassen, um nur einmal erwähnt zu werden.

Option: One Code: Arbeiten für kleine csv - 25+ Spalten aber Größe der Datei in kbs.

import pandas as pd 
import glob 

interesting_files = glob.glob("*.csv") 
df_list = [] 
for filename in sorted(interesting_files): 
    df_list.append(pd.read_csv(filename)) 

full_df = pd.concat(df_list) 

full_df.to_csv('output.csv') 

Aber der obige Code funktioniert nicht für die größeren Dateien und gibt den Fehler.

Fehler:

Traceback (most recent call last): 
    File "merge_large.py", line 6, in <module> 
    all_files = glob.glob("*.csv", encoding='utf8', engine='python')  
TypeError: glob() got an unexpected keyword argument 'encoding' 
[email protected]:~/Desktop/Twitter_Lat_lon/nasik_rain/rain_2$ python merge_large.py 
Traceback (most recent call last): 
    File "merge_large.py", line 10, in <module> 
    df = pd.read_csv(file_,index_col=None, header=0) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 562, in parser_f 
    return _read(filepath_or_buffer, kwds) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 325, in _read 
    return parser.read() 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 815, in read 
    ret = self._engine.read(nrows) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 1314, in read 
    data = self._reader.read(nrows) 
    File "pandas/parser.pyx", line 805, in pandas.parser.TextReader.read (pandas/parser.c:8748) 
    File "pandas/parser.pyx", line 827, in pandas.parser.TextReader._read_low_memory (pandas/parser.c:9003) 
    File "pandas/parser.pyx", line 881, in pandas.parser.TextReader._read_rows (pandas/parser.c:9731) 
    File "pandas/parser.pyx", line 868, in pandas.parser.TextReader._tokenize_rows (pandas/parser.c:9602) 
    File "pandas/parser.pyx", line 1865, in pandas.parser.raise_parser_error (pandas/parser.c:23325) 
pandas.io.common.CParserError: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file. 

Code: Columns 25+, aber die Größe der Datei mehr als 10 MB

Option: Two Option: Three

Option: Vier

import pandas as pd 
import glob 

    interesting_files = glob.glob("*.csv") 
    df_list = [] 
    for filename in sorted(interesting_files): 
     df_list.append(pd.read_csv(filename)) 

    full_df = pd.concat(df_list) 

    full_df.to_csv('output.csv') 

Fehler:

Ich habe ausgiebig gesucht, aber ich bin nicht in der Lage, eine Lösung zu finden, große csv-Dateien mit gleichen Headern in eine Datei zu verketten.

Edit:

Code:

import dask.dataframe as dd 

ddf = dd.read_csv('*.csv') 

ddf.to_csv('master.csv',index=False) 

Fehler:

Traceback (most recent call last): 
    File "merge_csv_dask.py", line 5, in <module> 
    ddf.to_csv('master.csv',index=False) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/core.py", line 792, in to_csv 
    return to_csv(self, filename, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/io.py", line 762, in to_csv 
    compute(*values) 
    File "/usr/local/lib/python2.7/dist-packages/dask/base.py", line 179, in compute 
    results = get(dsk, keys, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/dask/threaded.py", line 58, in get 
    **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 481, in get_async 
    raise(remote_exception(res, tb)) 
dask.async.ValueError: could not convert string to float: {u'type': u'Point', u'coordinates': [4.34279, 50.8443]} 

Traceback 
--------- 
    File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 263, in execute_task 
    result = _execute_task(task, data) 
    File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 245, in _execute_task 
    return func(*args2) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 49, in bytes_read_csv 
    coerce_dtypes(df, dtypes) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 73, in coerce_dtypes 
    df[c] = df[c].astype(dtypes[c]) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/generic.py", line 2950, in astype 
    raise_on_error=raise_on_error, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2938, in astype 
    return self.apply('astype', dtype=dtype, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2890, in apply 
    applied = getattr(b, f)(**kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 434, in astype 
    values=values, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 477, in _astype 
    values = com._astype_nansafe(values.ravel(), dtype, copy=True) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/common.py", line 1920, in _astype_nansafe 
    return arr.astype(dtype 

)

Antwort

3

Wenn ich Ihr Problem zu verstehen, Sie haben große CSV-Dateien mit der gleichen Struktur dass Sie in eine große CSV-Datei zusammenführen möchten.

Mein Vorschlag ist, dask von Continuum Analytics zu verwenden, um diesen Job zu verarbeiten. Sie können Ihre Dateien zusammenführen, aber auch Out-of-Core-Berechnungen und Analysen der Daten durchführen, genau wie Pandas.

### make sure you include the [complete] tag 
pip install dask[complete] 

Lösung Ihrer Beispieldaten aus DropBox Mit

Zunächst überprüft Versionen von dask. Für mich dask = 0.11.0 und Pandas = 0.18.1

import dask 
import pandas as pd 
print (dask.__version__) 
print (pd.__version__) 

Hier ist der Code in alle Ihre CSVs zu lesen. Ich hatte keine Fehler bei der Verwendung Ihrer DropBox-Beispieldaten.

import dask.dataframe as dd 
from dask.delayed import delayed 
import dask.bag as db 
import glob 

filenames = glob.glob('/Users/linwood/Downloads/stack_bundle/rio*.csv') 

''' 
The key to getting around the CParse error was using sep=None 
Came from this post 
http://stackoverflow.com/questions/37505577/cparsererror-error-tokenizing-data 
''' 

# custom saver function for dataframes using newfilenames 
def reader(filename): 
    return pd.read_csv(filename,sep=None) 

# build list of delayed pandas csv reads; then read in as dask dataframe 

dfs = [delayed(reader)(fn) for fn in filenames] 
df = dd.from_delayed(dfs) 


''' 
This is the final step. The .compute() code below turns the 
dask dataframe into a single pandas dataframe with all your 
files merged. If you don't need to write the merged file to 
disk, I'd skip this step and do all the analysis in 
dask. Get a subset of the data you want and save that. 
''' 
df = df.reset_index().compute() 
df.to_csv('./test.csv') 

Der Rest ist das Extramaterial

# print the count of values in each column; perfect data would have the same count 
# you have dirty data as the counts will show 

print (df.count().compute()) 

Der nächste Schritt einige Pandas artige Analyse tut. Hier ist ein Code von mir, der zuerst Ihre Daten für die 'tweetFavoriteCt'-Spalte "bereinigt". Alle Daten sind keine ganze Zahl, also ersetze ich Zeichenfolgen durch "0" und wandle alles andere in eine Ganzzahl um. Sobald ich die ganze Zahl Umwandlung erhalten, zeigen, dass ich eine einfache analytische wo ich die gesamte Datenrahmen filtern, um nur die Zeilen zu umfassen, in denen die favoriteCt größer als 3

# function to convert numbers to integer and replace string with 0; sample analytics in dask dataframe 
# you can come up with your own..this is just for an example 
def conversion(value): 
    try: 
     return int(value) 
    except: 
     return int(0) 

# apply the function to the column, create a new column of cleaned data 
clean = df['tweetFavoriteCt'].apply(lambda x: (conversion(x)),meta=('stuff',str)) 

# set new column equal to our cleaning code above; your data is dirty :-(
df['cleanedFavoriteCt'] = clean 

Letzte Stück Code zeigt Analyse dask und wie diese zu laden fusionierte Datei in Pandas und schreiben Sie auch die zusammengeführte Datei auf die Festplatte. Seien Sie gewarnt, wenn Sie Tonnen von CSVs haben, wenn Sie den .compute() Code unten verwenden, wird es diesen zusammengeführten CSV in den Speicher laden.

# retreive the 50 tweets with the highest favorite count 
print(df.nlargest(50,['cleanedFavoriteCt']).compute()) 

# only show me the tweets that have been favorited at least 3 times 
# TweetID 763525237166268416, is VERRRRY popular....7000+ favorites 
print((df[df.cleanedFavoriteCt.apply(lambda x: x>3,meta=('stuff',str))]).compute()) 

''' 
This is the final step. The .compute() code below turns the 
dask dataframe into a single pandas dataframe with all your 
files merged. If you don't need to write the merged file to 
disk, I'd skip this step and do all the analysis in 
dask. Get a subset of the data you want and save that. 
''' 
df = df.reset_index().compute() 
df.to_csv('./test.csv') 

Nun, wenn Sie Pandas für die fusionierte CSV-Datei wechseln möchten:

import pandas as pd 
dff = pd.read_csv('./test.csv') 

Lassen Sie mich wissen, ob das funktioniert.

Stop here

ARCHIV: Bisherige Lösung; Gut zum Beispiel der Verwendung von DASK zum Zusammenführen von CSVs

Der erste Schritt ist sicherzustellen, dass Sie dask installiert haben. Es gibt install instructions for dask in the documentation page, aber das sollte funktionieren:

Mit dask installiert ist es einfach zu lesen in den Dateien.

Einige Haushalt zuerst. Angenommen, wir haben ein Verzeichnis mit csvs, in dem die Dateinamen my18.csv, my19.csv, my20.csv usw. lauten. Die Standardisierung von Namen und die Lokalisierung einzelner Verzeichnisse sind der Schlüssel. Dies funktioniert, wenn Sie Ihre CSV-Dateien in einem Verzeichnis ablegen und die Namen irgendwie serialisieren.

In den Schritten:

  1. Import dask, lesen Sie alle CSV-Dateien in Platzhalter verwenden. Dies führt alle csvs zu einem einzigen Objekt zusammen. Sie können Pandas-ähnlichen Betrieb sofort nach diesem Schritt tun, wenn Sie möchten.
import dask.dataframe as dd 
ddf = dd.read_csv('./daskTest/my*.csv') 
ddf.describe().compute() 
  1. schreiben Datenrahmen-Datei auf die Festplatte im selben Verzeichnis wie die Originaldateien zusammengefügt und nennen Sie es master.csv
ddf.to_csv('./daskTest/master.csv',index=False) 
  1. Optional, lesen Sie master.csv, eine viel größere Größe, in dasask.dataframe Objekt für Berechnungen. Dies kann auch nach dem ersten Schritt erfolgen; dask kann Pandas wie Operationen auf den gestaffelten Dateien ausführen ...dies ist ein Weg, "Big Data" in Python
# reads in the merged file as one BIG out-of-core dataframe; can perform functions like pangas  
newddf = dd.read_csv('./daskTest/master.csv') 

#check the length; this is now length of all merged files. in this example, 50,000 rows times 11 = 550000 rows. 
len(newddf) 

# perform pandas-like summary stats on entire dataframe 
newddf.describe().compute() 

Hoffentlich hilft die Antwort auf Ihre Frage zu tun. In drei Schritten lesen Sie alle Dateien ein, verschmelzen zu einem einzelnen Datenrahmen und schreiben diesen massiven Datenrahmen mit nur einem Header und all Ihren Zeilen auf die Festplatte.

+0

Vielen Dank für die detaillierte Erklärung. Das ist sehr informativ. Lassen Sie mich den Code ausprobieren und werde Sie wissen lassen, wenn ich irgendwelche Zweifel habe. Nochmals vielen Dank :) –

+0

@SitzBlogz, nur überprüft, ob es funktioniert hat. – Linwoodc3

+0

Entschuldigung für die späte Antwort. Ich bekomme einen Fehler, und ich habe den Code und den Fehler in den Bearbeitungsbereich eingefügt. Bitte könnten Sie nachsehen. –