Mein Problem ist, dass mein pyspark Job ist nicht läuft parallel.Dataproc Pyspark Job läuft nur auf einem Knoten
-Code und Datenformat:
Mein PySpark sieht ungefähr so aus (vereinfacht, natürlich):
class TheThing:
def __init__(self, dInputData, lDataInstance):
# ...
def does_the_thing(self):
"""About 0.01 seconds calculation time per row"""
# ...
return lProcessedData
#contains input data pre-processed from other RDDs
#done like this because one RDD cannot work with others inside its transformation
#is about 20-40MB in size
#everything in here loads and processes from BigQuery in about 7 minutes
dInputData = {'dPreloadedData': dPreloadedData}
#rddData contains about 3M rows
#is about 200MB large in csv format
#rddCalculated is about the same size as rddData
rddCalculated = (
rddData
.map(
lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
)
)
llCalculated = rddCalculated.collect()
#save as csv, export to storage
Laufen auf Dataproc Cluster:
Cluster wird über die Dataproc UI erstellt.
Job wird wie folgt ausgeführt:
gcloud --project <project> dataproc jobs submit pyspark --cluster <cluster_name> <script.py>
Ich beobachtete den Jobstatus über die Benutzeroberfläche, started like this. Beim Durchstöbern habe ich festgestellt, dass nur ein (scheinbar zufälliger) meiner Worker-Knoten irgendetwas getan hat. Alle anderen waren völlig untätig.
Der ganze Punkt von PySpark ist, dieses Ding parallel zu führen, und ist offensichtlich nicht der Fall. Ich habe diese Daten in allen Arten von Cluster-Konfigurationen ausgeführt, von denen die letzte massiv ist. Und deshalb dauert es zu lange, bis meine Aufgaben abgeschlossen sind, und die Zeit scheint unabhängig von der Clustergröße zu sein.
Alle Tests mit kleineren Datensätzen gehen ohne Probleme auf meinem lokalen Rechner und im Cluster durch. Ich muss wirklich nur upscale.
EDIT
I
geändert llCalculated = rddCalculated.collect()
#... save to csv and export
zu
rddCalculated.saveAsTextFile("gs://storage-bucket/results")
und nur ein Knoten immer noch die Arbeit machen.
so die Repartition Nummer 'repartition (x)', sollte das mit der Anzahl der Knoten oder CP gleich sein Uns? – Roman
Scheint 'x' ist die maximale Anzahl an Kernen, für die jeder Knoten Speicher hat. Danke @Dennis. Held der Woche! – Roman
Sie können also die Neuverteilung aggressiver fortsetzen, solange jede Partition mehr als ein paar Sekunden benötigt, um verarbeitet zu werden. Wenn Sie mehr Partitionen als 'number_of_nodes * cores_per_node' haben, ist das eigentlich ein okay-Muster, es bedeutet nur, dass Ihre Worker durch diese Partitionen in mehr als 1" Welle "tuckern. So ist die Standard-Praxis entweder Partitionen gleich der Gesamtzahl der verfügbaren Kerne (um alles in 1 Welle zu haben) oder um ein Vielfaches der Gesamtzahl der Kerne zu machen. Mit weiteren Partitionen können Sie die Mitarbeiter bei Bedarf auch in der Mitte vergrößern. –