2015-12-03 9 views
6

Ich habe ein Python-Skript, das Folgendes tut: i. die eine Eingabedatei mit Daten verwendet (normalerweise geschachteltes JSON-Format) ii. übergibt die Daten Zeile für Zeile an eine andere Funktion, die die Daten in das gewünschte Format manipuliert. iii. und schließlich schreibt es die Ausgabe in eine Datei.die Tasche nicht alle Kerne verwenden? Alternativen?

Hier ist meine aktuelle einfache Python Linie, das dies tut ...

def manipulate(line): 
    # a pure python function which transforms the data 
    # ... 
    return manipulated_json 

for line in f: 
    components.append(manipulate(ujson.loads(line))) 
    write_to_csv(components)` 

Dies funktioniert, aber mit dem Python GIL es zu einem Kern auf dem Server zu begrenzen, ist es sehr langsam, vor allem mit großen Mengen an Daten.

Die Menge der Daten, mit denen ich normalerweise zu tun habe, ist etwa 4 Gigs gzip komprimiert, aber gelegentlich muss ich Daten verarbeiten, die Hunderte von Gigs gzip komprimiert ist. Es ist nicht unbedingt Big Data, kann aber immer noch nicht alle im Speicher verarbeitet werden und mit Pythons GIL ist sehr langsam zu verarbeiten.

Auf der Suche nach einer Lösung zur Optimierung unserer Datenverarbeitung stieß ich auf dask. Während PySpark zu dieser Zeit die offensichtliche Lösung für mich zu sein schien, überzeugten mich die Versprechen von dask und seine Einfachheit und ich beschloss, es zu versuchen.

Nach viel Forschung in dask und wie man es benutzt, habe ich ein sehr kleines Skript zusammengestellt, um meinen aktuellen Prozess zu replizieren. Das Skript sieht wie folgt aus:

import dask.bag as bag 
import json 
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')` 

Dies funktioniert und erzeugt die gleichen Ergebnisse wie meine ursprüngliche nicht-dask Skript, aber es noch verwendet nur eine CPU auf dem Server. Also hat es überhaupt nicht geholfen. In der Tat ist es langsamer.

Was mache ich falsch? Fehle ich etwas? Ich bin noch ziemlich neu zu dask, lass es mich wissen, wenn ich etwas übersehen habe oder wenn ich etwas ganz anderes machen sollte.

Gibt es auch Alternativen zu DASK für die Nutzung der vollen Kapazität des Servers (d. H. Alle CPUs) für was ich tun muss?

Danke,

T

+0

Hmm noch nie von 'dask gehört, wirklich interessant, danke. Haben Sie sich den Box-Standard 'Multiprocessing' angesehen? Es ist einfach (istisch), aber es funktioniert. –

+0

Fragen Sie vielleicht auf der [Blaze-Mailingliste] (https://groups.google.com/a/continuum.io/forum/#!forum/blaze-dev) nach. Dask ist relativ neu und in Bewegung und, wie ich gesehen habe, gab es bisher nur 20 StackOverflow-Fragen dazu, also können nicht sehr viele Leute Ihre Frage hier sehen und wissen genug, um zu helfen. – BrenBarn

+0

FWIW, ich abonniere dieses Tag, so dass immer jemand es beobachtet. Stackoverflow ist ein großartiger Ort für solche Fragen. – MRocklin

Antwort

2

Das Problem hier ist mit dask.dataframe.to_csv, die Sie auf Single-Core-Modus zwingt.

Ich empfehle, dask.bag zu verwenden, um Ihre Lese- und Bearbeitungsvorgänge durchzuführen und dann auf eine Reihe von CSV-Dateien parallel abzulegen. Das Dumping zu vielen CSV-Dateien ist viel einfacher zu koordinieren als das Dumping zu einer einzelnen CSV-Datei.

import dask.bag as bag 
import json 
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat() 
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute() 

Es könnte auch ein Problem mit dem Versuch, eine einzelne GZIP Datei parallel zu lesen, aber die oben genannten sollten Sie loslegen.

+0

Vielen Dank @MRocklin! Es hat nicht funktioniert. Lol ... Aber dann habe ich die Eingabedatei in mehrere Teile geteilt und es hat funktioniert. Es scheint, dass es nur so viele CPUs verwendet, wie es eine Anzahl von Eingabedateien gibt. Irgendwelche Pläne, diese Funktionalität dynamisch zu machen, so dass Sie eine Eingabedatei und einen Beutel weitergeben können, teilen diese auf und verarbeiten sie parallel unter der Haube? – tamjd1

+0

dask.bag macht das jetzt, nur unvollkommen. Ein mögliches Problem hier ist, dass GZIP schlechte Unterstützung für den wahlfreien Zugriff hat. – MRocklin

0

Es scheint, dass Taschen nur so parallel sind wie die Anzahl der Partitionen, die sie haben.

Für mich ergibt

mybag=bag.from_filenames(filename, chunkbytes=1e7) 
mybag.npartitions 

laufen

, die das Problem gelöst und machte die Verarbeitung vollständig parallelizable.