2016-05-26 6 views
3

Ich habe einen großen Datenrahmen, den ich mit 800 Partitionen erstellt habe.Die Verwendung von dropDuplicates im Datenframe verursacht Änderungen in der Partitionsnummer

df.rdd.getNumPartitions() 
800 

Als ich dropDuplicates auf dem Datenrahmen verwenden, ändert es die Partitionen für mich 200

df = df.dropDuplicates() 
df.rdd.getNumPartitions() 
200 

Dieses Verhalten führt Problem auf dem Standard, wie es aus der Erinnerung führen wird.

Haben Sie Vorschläge zur Behebung dieses Problems? Ich habe versucht, spark.sql.shuffle.partition auf 800 zu setzen, aber es funktioniert nicht. Dank

+0

Mögliches Duplikat von [Entfernen von Duplikaten aus Zeilen basierend auf bestimmten Spalten in einem RDD/Spark DataFrame] (http://stackoverflow.com/questions/30248221/removing-duplicates-from-rows-based-on-specific-columns -in-rdd-spark-datafram) – eliasah

Antwort

4

Dies passiert, weil dropDuplicates einen Shuffle erfordert. Wenn Sie eine bestimmte Anzahl von Partitionen erhalten möchten, sollten Sie spark.sql.shuffle.partitions gesetzt (der Standardwert ist 200)

df = sc.parallelize([("a", 1)]).toDF() 
df.rdd.getNumPartitions() 
## 8 

df.dropDuplicates().rdd.getNumPartitions() 
## 200 

sqlContext.setConf("spark.sql.shuffle.partitions", "800") 

df.dropDuplicates().rdd.getNumPartitions() 
## 800 

Ein alternativer Ansatz (Spark-1.6+) zunächst zu partitionieren ist:

df.repartition(801, *df.columns).dropDuplicates().rdd.getNumPartitions() 
## 801 

Es ist etwas flexibler, aber weniger effizient, da keine lokale Aggregation durchgeführt wird.

+1

Danke. Ich habe bemerkt, dass mein Fehler darin besteht, das letzte Zeichen in der spark.sql.shuffle.partition zu verwerfen. – Michael