2014-11-20 1 views
15

Wenn die Anzahl der Partitionen verringert werden soll, kann coalesce verwendet werden, was großartig ist, da es kein Shuffle verursacht und augenblicklich sofort funktioniert (erfordert keine zusätzliche Job-Stufe).Spark: Anzahl der Partitionen erhöhen, ohne einen Shuffle zu verursachen?

Ich möchte manchmal das Gegenteil tun, aber repartition induziert einen Shuffle. Ich denke, dass ich vor ein paar Monaten funktionierte, indem ich CoalescedRDD mit balanceSlack = 1.0 verwendete - was passieren würde, wäre es eine Partition teilen würde, so dass die resultierenden Partitionen wo alle auf dem gleichen Knoten (so klein net IO).

Diese Art der Funktionalität ist in Hadoop automatisch, man optimiert nur die Split-Größe. Es scheint in Spark nicht so zu funktionieren, es sei denn, man verringert die Anzahl der Partitionen. Ich denke, die Lösung könnte sein, einen benutzerdefinierten Partitionierer zusammen mit einer benutzerdefinierten RDD zu schreiben, wo wir getPreferredLocations definieren ... aber ich dachte, das ist so eine einfache und übliche Sache, um sicher zu tun, muss es eine geradlinige Art, es zu tun?

Dinge versucht:

.set("spark.default.parallelism", partitions) auf meinem SparkConf, und wenn sie im Kontext des Lesens Parkett habe ich sqlContext.sql("set spark.sql.shuffle.partitions= ... versucht, die auf 1.0.0 einen Fehler verursacht und nicht wirklich wollen, ich will, ich will Partition Anzahl, die für alle Arten von Jobs geändert werden soll, nicht nur für das Mischen.

+0

Haben Sie Glück, eine Lösung dafür zu finden? – nbubis

Antwort

0

Ich verstehe nicht genau, was Ihr Punkt ist. Meinst du, du hast jetzt 5 Partitionen, aber nach der nächsten Operation wollen Sie Daten auf 10 verteilt? Denn 10 zu haben, aber immer noch 5 zu verwenden macht wenig Sinn ... Der Prozess des Sendens von Daten an neue Partitionen muss irgendwann passieren.

Wenn coalesce tun, können Sie von unsued Partitionen entfernen, zum Beispiel: Wenn Sie zunächst 100 haben, dann aber nach reduceByKey Sie bekam 10 (wie dort, wo nur 10 Tasten), können Sie coalesce einstellen.

Wenn Sie den Prozess wollen den anderen Weg zu gehen, können Sie nur eine Art von Partitionierung erzwingen:

[RDD].partitionBy(new HashPartitioner(100)) 

Ich bin nicht sicher, dass das, was Sie suchen, aber die Hoffnung so.

+3

Jede Partition hat eine Position, d. H. Einen Knoten, angenommen, ich habe 5 Partitionen und 5 Knoten. Wenn ich "repartition" oder Ihren Code auf 10 Partitionen anwähle, werden die Daten gemischt - dh die Daten für jeden der 5 Knoten können über das Netzwerk auf andere Knoten übertragen werden. Was ich möchte, ist, dass Spark jede Partition in 2 teilt, ohne irgendwelche Daten zu verschieben - das passiert in Hadoop, wenn man Split-Einstellungen optimiert. – samthebest

+0

Ich bin mir nicht sicher, ob Sie es schaffen können. Ich nehme an, dass Sie eine Art ".forEachNode" -Funktion benötigen würden. Aber so etwas habe ich noch nie gesehen. Und ich bin mir nicht sicher, ob es leicht umgesetzt werden kann. Der Partitionierer muss jedes Mal dieselbe Partition für das gleiche Objekt zurückgeben. Standardmäßig verwendet Spark 'HashPartitioner', die ** hashCode modulo number_of_partitions **. Wenn Sie Daten einfach in zwei neue Partitionen aufteilen, würden sie definitiv nicht an ihren Orten landen. Deshalb ist Shuffle notwendig. Vielleicht, wenn Sie Ihren eigenen Partitionierer haben, könnte es die Anzahl der Partitionen erhöhen, ohne über das Netz zu schlurfen. – szefuf