Soweit Ich kann sagen, es gibt nichts zu gewinnen * in diesem speziellen Fall mit aggregateByKey
oder einer ähnlichen Funktion. Da Sie eine Liste erstellen, gibt es keine "echte" Reduktion und die Menge an Daten, die gemischt werden muss, ist mehr oder weniger die gleiche.
Um wirklich etwas Leistungszunahme zu beobachten, brauchen Sie Transformationen, die tatsächlich die Menge der übertragenen Daten reduzieren, zum Beispiel das Zählen, das Berechnen von zusammenfassenden Statistiken, das Finden einzigartiger Elemente.
In Bezug auf Unterschiede Vorteile der Verwendung von reduceByKey()
, combineByKey()
oder foldByKey()
gibt es einen wichtigen begrifflichen Unterschied, die leichter zu sehen, wenn Sie Scala API singatures betrachten.
Beide reduceByKey
und foldByKey
Karte von RDD[(K, V)]
bis RDD[(K, V)]
, während die zweite zusätzliche Nullelement bietet.
reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]
combineByKey
(es gibt keine aggregateByKey
, aber es ist die gleiche Art von Transformation) transformiert RDD[(K, V)]
-RDD[(K, C)]
:
combineByKey[C](
createCombiner: (V) ⇒ C,
mergeValue: (C, V) ⇒ C,
mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
zurück zu Ihrem Beispiel geht nur combineByKey
(und in PySpark aggregateByKey
) ist wirklich anwendbar, da Sie von RDD[(String, Int)]
zu RDD[(String, List[Int])]
umwandeln.
Während in einer dynamischen Sprache wie Python es tatsächlich möglich ist, solch eine Operation auszuführen foldByKey
oder reduceByKey
macht es Semantik des Codes unklar und zitiert @ tim-peters „sollte es one-- sein und vorzugsweise nur ein --offensichtlicher Weg, es zu tun "[1].
Unterschied zwischen aggregateByKey
und combineByKey
ist so ziemlich die gleiche wie zwischen reduceByKey
und foldByKey
so für eine Liste es meist Geschmackssache ist: obwohl
def merge_value(acc, x):
acc.append(x)
return acc
def merge_combiners(acc1, acc2):
acc1.extend(acc2)
return acc1
rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)])
.combineByKey(
lambda x: [x],
lambda u, v: u + [v],
lambda u1,u2: u1+u2))
In der Praxis sollten Sie groupByKey
bevorzugen. Die PySpark-Implementierung ist im Vergleich zur naiven Implementierung, wie oben beschrieben, deutlich optimiert.
1.Peters, T. PEP 20 - Das Zen von Python. (2004). bei https://www.python.org/dev/peps/pep-0020/
* In der Praxis gibt es tatsächlich eine ganze Menge hier zu verlieren, vor allem bei der Verwendung von PySpark. Python-Implementierung von groupByKey
ist wesentlich mehr optimiert als naive Kombination nach Schlüssel. Sie können überprüfen, Be Smart About groupByKey, erstellt von mir und für eine zusätzliche Diskussion.
Sind Ihre Daten nach dem Zufallsprinzip oder nach Schlüssel partitioniert? Wenn Sie sicherstellen können, dass sich alle Datensätze mit a._1 = "a" auf der gleichen Partition befinden, können Sie die Geschwindigkeit drastisch erhöhen. Sie können möglicherweise ohne die Verwendung von Shuffle, die für die erste Partitionierung erforderlich sind, wegkommen . Vielleicht versuchen Sie es mit einem Hash-Partitionierer? –