2016-06-21 25 views
4

Dies ist der einfachste DataFrame, an den ich denken konnte. Ich benutze PySpark 1.6.1.Warum sammelt collect() auf einem DataFrame mit 1 Zeile 2000 Excectors?

So passt der Datenrahmen vollständig in den Speicher, hat keine Verweise auf irgendwelche Dateien und sieht für mich ziemlich trivial aus.

Doch wenn ich die Daten sammeln, verwendet es 2000 Testamentsvollstrecker:

df.collect() 

während collect werden 2000 Testamentsvollstrecker eingesetzt:

[Stage 2:===================================================>(1985 + 15)/2000] 

und dann die erwartete Ausgabe:

[Row(a=1, b=2)] 

Warum passiert das? Sollte der DataFrame nicht vollständig im Speicher des Treibers gespeichert sein?

Antwort

2

Also schaute ich in den Code ein bisschen, um herauszufinden, was los war. Es scheint, dass sqlContext.createDataFrame wirklich keinen Versuch unternimmt, vernünftige Parameterwerte basierend auf den Daten zu setzen.

Warum 2000 Aufgaben?

Spark verwendet 2000 Aufgaben, weil mein Datenrahmen 2000 Partitionen hatte. (Auch wenn es wie klar Unsinn scheint mehr Partitionen als Reihen haben.)

Dies kann gesehen werden:

>>> df.rdd.getNumPartitions() 
2000 

Warum die Datenrahmen haben 2000 Partitionen haben?

Das passiert, weil sqlContext.createDataFrame die Standardanzahl der Partitionen (2000 in meinem Fall) abschließt, unabhängig davon, wie die Daten organisiert sind oder wie viele Zeilen es hat.

Der Code Trail ist wie folgt.

In sql/context.py die sqlContext.createDataFrame Funktionsaufrufe (in diesem Beispiel):

rdd, schema = self._createFromLocal(data, schema) 

die wiederum ruft:

return self._sc.parallelize(data), schema 

Und die sqlContext.parallelize Funktion definiert in context.py:

numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism 

Keine Überprüfung erfolgt auf der Anzahl der Zeilen, und es ist nicht möglich, die Anzahl der Slices von sqlContext.createDataFrame anzugeben.

Wie kann ich ändern, wie viele Partitionen der DataFrame hat?

Verwenden DataFrame.coalesce.

>>> smdf = df.coalesce(1) 
>>> smdf.rdd.getNumPartitions() 
1 
>>> smdf.explain() 
== Physical Plan == 
Coalesce 1 
+- Scan ExistingRDD[a#0L,b#1L] 
>>> smdf.collect() 
[Row(a=1, b=2)] 
0

Sie können die Anzahl der Executoren konfigurieren. In vielen Fällen benötigt Spark so viele Executoren wie verfügbar, und die Ausführungszeit ist viel schlechter als bei einer Beschränkung auf eine kleine Anzahl von Executoren.

conf = SparkConf() 
conf.set('spark.dynamicAllocation.enabled','true') 
conf.set('spark.dynamicAllocation.maxExecutors','32') 
+0

Ok .. aber warum wäre dies einer der vielen Fälle, in denen es viele Executoren verwendet? Es gibt keine verteilten Dateien und der DataFrame passt in den Speicher. Ich hätte erwartet, dass es 0 (oder vielleicht 1) Executor verwenden würde. – Corey

+0

Es kommt auf Ihre Sparkcon mehr als auf den Job selbst – marmouset

+0

Huh. Ich bin ein wenig geschockt, dass Spark nicht intelligenter über das Ressourcenmanagement ist. Ich hätte gedacht, dass es versucht, jede verteilte Datenpartition auf dem Knoten zu verarbeiten, der dem Speicherort der Partition am nächsten ist. Der Fall ohne verteilte Daten wäre also ziemlich trivial. Ich glaube, ich verstehe falsch, was Sparks Manager macht ... – Corey