2015-06-26 9 views
11

Die Apache Spark pyspark.RDD API-Dokumentation erwähnt, dass groupByKey() ineffizient ist. Stattdessen wird empfohlen, stattdessen reduceByKey(), aggregateByKey(), combineByKey() oder foldByKey() zu verwenden. Dies führt dazu, dass vor dem Mischen etwas von der Aggregation in den Arbeitern ausgeführt wird, wodurch das Mischen von Daten zwischen Arbeitern reduziert wird.Apache Spark: Was ist die entsprechende Implementierung von RDD.groupByKey() mit RDD.aggregateByKey()?

Angesichts der folgenden Datensatz und groupByKey() Ausdruck, was ist eine äquivalente und effiziente Implementierung (reduziert Cross-Worker-Daten mischen), die nicht verwendet groupByKey(), aber liefert das gleiche Ergebnis?

dataset = [("a", 7), ("b", 3), ("a", 8)] 
rdd = (sc.parallelize(dataset) 
     .groupByKey()) 
print sorted(rdd.mapValues(list).collect()) 

Ausgang:

[('a', [7, 8]), ('b', [3])] 
+0

Sind Ihre Daten nach dem Zufallsprinzip oder nach Schlüssel partitioniert? Wenn Sie sicherstellen können, dass sich alle Datensätze mit a._1 = "a" auf der gleichen Partition befinden, können Sie die Geschwindigkeit drastisch erhöhen. Sie können möglicherweise ohne die Verwendung von Shuffle, die für die erste Partitionierung erforderlich sind, wegkommen . Vielleicht versuchen Sie es mit einem Hash-Partitionierer? –

Antwort

18

Soweit Ich kann sagen, es gibt nichts zu gewinnen * in diesem speziellen Fall mit aggregateByKey oder einer ähnlichen Funktion. Da Sie eine Liste erstellen, gibt es keine "echte" Reduktion und die Menge an Daten, die gemischt werden muss, ist mehr oder weniger die gleiche.

Um wirklich etwas Leistungszunahme zu beobachten, brauchen Sie Transformationen, die tatsächlich die Menge der übertragenen Daten reduzieren, zum Beispiel das Zählen, das Berechnen von zusammenfassenden Statistiken, das Finden einzigartiger Elemente.

In Bezug auf Unterschiede Vorteile der Verwendung von reduceByKey(), combineByKey() oder foldByKey() gibt es einen wichtigen begrifflichen Unterschied, die leichter zu sehen, wenn Sie Scala API singatures betrachten.

Beide reduceByKey und foldByKey Karte von RDD[(K, V)] bis RDD[(K, V)], während die zweite zusätzliche Nullelement bietet.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)] 

combineByKey (es gibt keine aggregateByKey, aber es ist die gleiche Art von Transformation) transformiert RDD[(K, V)]-RDD[(K, C)]:

combineByKey[C](
    createCombiner: (V) ⇒ C, 
    mergeValue: (C, V) ⇒ C, 
    mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

zurück zu Ihrem Beispiel geht nur combineByKey (und in PySpark aggregateByKey) ist wirklich anwendbar, da Sie von RDD[(String, Int)] zu RDD[(String, List[Int])] umwandeln.

Während in einer dynamischen Sprache wie Python es tatsächlich möglich ist, solch eine Operation auszuführen foldByKey oder reduceByKey macht es Semantik des Codes unklar und zitiert @ tim-peters „sollte es one-- sein und vorzugsweise nur ein --offensichtlicher Weg, es zu tun "[1].

Unterschied zwischen aggregateByKey und combineByKey ist so ziemlich die gleiche wie zwischen reduceByKey und foldByKey so für eine Liste es meist Geschmackssache ist: obwohl

def merge_value(acc, x): 
    acc.append(x) 
    return acc 

def merge_combiners(acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
    .combineByKey(
     lambda x: [x], 
     lambda u, v: u + [v], 
     lambda u1,u2: u1+u2)) 

In der Praxis sollten Sie groupByKey bevorzugen. Die PySpark-Implementierung ist im Vergleich zur naiven Implementierung, wie oben beschrieben, deutlich optimiert.

1.Peters, T. PEP 20 - Das Zen von Python. (2004). bei https://www.python.org/dev/peps/pep-0020/


* In der Praxis gibt es tatsächlich eine ganze Menge hier zu verlieren, vor allem bei der Verwendung von PySpark. Python-Implementierung von groupByKey ist wesentlich mehr optimiert als naive Kombination nach Schlüssel. Sie können überprüfen, Be Smart About groupByKey, erstellt von mir und für eine zusätzliche Diskussion.

+0

Wenn Sie einen Partitionierer verwenden (sagen wir, Partition durch einen Hash des Schlüssels), können Sie ohne weitere Shuffles entkommen? –

+0

@GlennStrycker Soweit ich weiß, ist die Antwort positiv. Wenn RDD durch Schlüssel partitioniert wird, sollten alle Werte für einen bestimmten Schlüssel lokal auf einem einzelnen Knoten verarbeitet werden. Mögliches Problem ist jedoch eine schiefe Verteilung der Schlüssel. – zero323

3

Hier ist eine Option, die aggregateByKey() verwendet. Ich würde neugierig sein zu hören, wie dies mit reduceByKey(), combineByKey() oder foldByKey() getan werden kann, und welche Kosten/Nutzen gibt es für jede Alternative.

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: u+[v], 
         lambda u1,u2: u1+u2)) 
print sorted(rdd.mapValues(list).collect()) 

Output:

[('a', [7, 8]), ('b', [3])] 

Das folgende ist eine etwas Speicher effiziente Implementierung, wenn auch weniger lesbar der Python Anfänger, die die gleiche Ausgabe erzeugt:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: itertools.chain(u,[v]), 
         lambda u1,u2: itertools.chain(u1,u2))) 
print sorted(rdd.mapValues(list).collect())