2016-06-02 3 views
0

Mein Problem ist, dass mein pyspark Job ist nicht läuft parallel.Dataproc Pyspark Job läuft nur auf einem Knoten

-Code und Datenformat:
Mein PySpark sieht ungefähr so ​​aus (vereinfacht, natürlich):

class TheThing: 
    def __init__(self, dInputData, lDataInstance): 
     # ... 
    def does_the_thing(self): 
     """About 0.01 seconds calculation time per row""" 
     # ... 
     return lProcessedData 

#contains input data pre-processed from other RDDs 
#done like this because one RDD cannot work with others inside its transformation 
#is about 20-40MB in size 
#everything in here loads and processes from BigQuery in about 7 minutes 
dInputData = {'dPreloadedData': dPreloadedData} 

#rddData contains about 3M rows 
#is about 200MB large in csv format 
#rddCalculated is about the same size as rddData 
rddCalculated = (
    rddData 
     .map(
      lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing() 
     ) 
) 

llCalculated = rddCalculated.collect() 
#save as csv, export to storage 

Laufen auf Dataproc Cluster:
Cluster wird über die Dataproc UI erstellt.
Job wird wie folgt ausgeführt:
gcloud --project <project> dataproc jobs submit pyspark --cluster <cluster_name> <script.py>

Ich beobachtete den Jobstatus über die Benutzeroberfläche, started like this. Beim Durchstöbern habe ich festgestellt, dass nur ein (scheinbar zufälliger) meiner Worker-Knoten irgendetwas getan hat. Alle anderen waren völlig untätig.

Der ganze Punkt von PySpark ist, dieses Ding parallel zu führen, und ist offensichtlich nicht der Fall. Ich habe diese Daten in allen Arten von Cluster-Konfigurationen ausgeführt, von denen die letzte massiv ist. Und deshalb dauert es zu lange, bis meine Aufgaben abgeschlossen sind, und die Zeit scheint unabhängig von der Clustergröße zu sein.

Alle Tests mit kleineren Datensätzen gehen ohne Probleme auf meinem lokalen Rechner und im Cluster durch. Ich muss wirklich nur upscale.

EDIT
I
geändert llCalculated = rddCalculated.collect()
#... save to csv and export
zu
rddCalculated.saveAsTextFile("gs://storage-bucket/results")

und nur ein Knoten immer noch die Arbeit machen.

Antwort

2

Je nachdem, ob Sie rddData von GCS oder HDFS geladen haben, ist die standardmäßige Splitgröße wahrscheinlich entweder 64 MB oder 128 MB, was bedeutet, dass Ihr 200 MB-Datensatz nur 2-4 Partitionen hat. Spark tut dies, weil typische grundlegende Daten - parallele Tasks - Daten schnell genug durchlaufen, so dass 64 MB-128 MB eine Verarbeitungszeit von einigen zehn Sekunden bedeuten. Es macht also keinen Vorteil, sich in kleinere Partien aufzuteilen, da dann der Startup-Overhead dominieren würde.

In Ihrem Fall klingt es so, als wäre die Verarbeitungszeit pro MB viel höher, da Sie sich mit dem anderen Datensatz verbinden und vielleicht ziemlich schwere Berechnungen für jeden Datensatz durchführen. Sie werden also eine größere Anzahl von Partitionen haben wollen, sonst wird Spark, egal wie viele Knoten Sie haben, nicht in mehr als 2 bis 4 Arbeitseinheiten aufteilen (die wahrscheinlich auch auf einer einzelnen Maschine gepackt werden würden, wenn jede Maschine) hat mehrere Kerne).

Sie müssen also einfach repartition nennen:

rddCalculated = (
    rddData 
     .repartition(200) 
     .map(
      lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing() 
     ) 
) 

Oder die Neuaufteilung zu einer früheren Zeile hinzu:

rddData = rddData.repartition(200) 

Oder können Sie eine bessere Effizienz, wenn Sie bei Lesezeit neu partitionieren:

+0

so die Repartition Nummer 'repartition (x)', sollte das mit der Anzahl der Knoten oder CP gleich sein Uns? – Roman

+0

Scheint 'x' ist die maximale Anzahl an Kernen, für die jeder Knoten Speicher hat. Danke @Dennis. Held der Woche! – Roman

+0

Sie können also die Neuverteilung aggressiver fortsetzen, solange jede Partition mehr als ein paar Sekunden benötigt, um verarbeitet zu werden. Wenn Sie mehr Partitionen als 'number_of_nodes * cores_per_node' haben, ist das eigentlich ein okay-Muster, es bedeutet nur, dass Ihre Worker durch diese Partitionen in mehr als 1" Welle "tuckern. So ist die Standard-Praxis entweder Partitionen gleich der Gesamtzahl der verfügbaren Kerne (um alles in 1 Welle zu haben) oder um ein Vielfaches der Gesamtzahl der Kerne zu machen. Mit weiteren Partitionen können Sie die Mitarbeiter bei Bedarf auch in der Mitte vergrößern. –