2016-02-04 7 views
11

In der Dokumentation von Spark heißt es, dass die RDDs-Methode reduce eine assoziative UND-kommutative Binärfunktion benötigt.Spark: Unterschied der Semantik zwischen reduce und reduceByKey

Die Methode reduceByKey erfordert jedoch nur eine assoziative Binärfunktion.

sc.textFile("file4kB", 4) 

Ich habe einige Tests gemacht, und anscheinend ist es das Verhalten, das ich bekomme. Warum dieser Unterschied? Warum stellt reduceByKey sicher, dass die Binärfunktion immer in einer bestimmten Reihenfolge angewendet wird (um die fehlende Kommutativität auszugleichen), wenn reduce nicht funktioniert?

Beispiel, wenn eine Last einige (kleine) Text mit 4 Partitionen (Minimum):

val r = sc.textFile("file4k", 4) 

dann:

r.reduce(_ + _) 

einen String zurückgibt, wo Teile, die nicht immer in der gleichen Reihenfolge sind, während:

r.map(x => (1,x)).reduceByKey(_ + _).first 

gibt immer die gleiche Zeichenfolge (wo alles in der gleichen Reihenfolge als in der ursprünglichen ist l Datei).

(Ich überprüft mit r.glom und der Dateiinhalt ist in der Tat verteilt über 4 Partitionen, es gibt keine leere Partition).

+2

ich die Idee mit 'reduceByKey' Vermutung ist, dass Sie wahrscheinlich viele verschiedene Schlüssel, so dass es in Ordnung ist, alles für einen einzelnen Schlüssel in einem einzelnen Thread zu reduzieren, was bedeutet, dass Sie die Berechnung immer von links nach rechts ausführen können. Im Gegensatz dazu wird "reduce" oft für einen großen Datensatz verwendet, so dass die Reihenfolge der Operationen nicht wichtig ist. –

+0

Wie viele Executoren verwenden Sie in Ihren Experimenten? – gprivitera

Antwort

7

Soweit es mich betrifft, ist dies ein Fehler in der Dokumentation und Ergebnisse, die Sie sehen, sind nur nebensächlich. Praxis, other resources und ein einfaches analysis of the code zeigen, dass die Funktion, die an reduceByKey übergeben wird, nicht nur assoziativ sondern auch kommutativ sein sollte.

  • Praxis - während es wie die Reihenfolge sieht in einem lokalen Modus beibehalten wird, ist es nicht mehr der Fall, wenn Sie Funken auf einem Cluster ausgeführt werden, einschließlich Standalone-Modus.

  • andere Ressourcen - zu zitieren Data Exploration Using Spark von AmpCamp 3:

    Es ist eine bequeme Methode reduceByKey Spark für genau dieses Muster genannt. Beachten Sie, dass das zweite Argument für reduceByKey die Anzahl der zu verwendenden Reduzierungen bestimmt. Standardmäßig geht Spark davon aus, dass die Reduzierungsfunktion kommutativ und assoziativ ist und Combiner auf der Mapper-Seite anwendet.

  • Code - reduceByKey wird mit combineByKeyWithClassTag implementiert und schafft ShuffledRDD. Da Spark nicht garantiert, dass die Reihenfolge nach dem Mischen die einzige Möglichkeit zur Wiederherstellung bietet, würde es sein, einige Metadaten an die teilweise reduzierten Datensätze anzuhängen. Soweit ich das beurteilen kann, passiert nichts dergleichen.

Auf einer seitlichen Anmerkung reduce wie es in PySpark umgesetzt wird, wird mit einer Funktion gut funktionieren, die nur kommutativ ist. Es ist natürlich nur ein Detail einer Implementierung und kein Teil des Vertrages.

+3

Ich würde hinzufügen, dass Reduzieren eine Aktion ist, die Daten an den Treiber zurückgibt, während reduceByKey eine Umwandlung ist, die einen anderen RDD zurückgibt – rhernando

+0

Danke! Aber gibt es einen Weg in Spark, um die Korrektheit einer NICHT-kommutativen Behandlung sicherzustellen? Oder liegt es außerhalb der Reichweite von Spark? –

+0

Ich bin mir nicht sicher, ob ich die Frage verstehe. Fragen Sie, ob es möglich ist, die Kommutativität automatisch zu testen/zu beweisen oder einfach die nichtkommutative Funktion mit 'reduce' zu ​​verwenden? Wenn dies der zweite Fall ist, der das Verhalten von PySpark nachahmt ('mapPartitions (reduceFunc)' => 'collect' => reduce (reduceFunc)') sollte mit einigen Leistungseinbußen arbeiten. – zero323

1

Gemäß der Codedokumentation, vor kurzem aktualisiert/korrigiert.(thanks @ zero323):

reduceByKey führt die Werte für jede Taste mit einer assoziativen und kommutativen Reduzierungsfunktion zusammen. Dies führt auch das Zusammenführen lokal auf jedem Mapper durch, bevor Ergebnisse an einen Reduzierer gesendet werden, ähnlich einem "Kombinierer" in MapReduce.

So war es in der Tat tatsächlich ein Dokumentationsfehler wie @ Zero323 in seiner Antwort darauf hingewiesen.

können Sie überprüfen die folgenden Links, um den Code, um sicherzustellen: