Gegeben ein Dataset von [K, V] und eine Notwendigkeit, basierend auf Schlüssel beizutreten, welche Methode die beste Methode zur Sicherstellung der Lokalität/Geschwindigkeit einer benutzerdefinierten Partition/Hash-Partitionierung bietet oder die SortBy() verwendet Methode gefolgt von der Join-Methode? Und warum ist eine Methode leistungsfähiger als die andere?Spark Custom Partioner vs SortByKey-Funktionalität
Zusätzlich, wenn Sie die Mehrheit der Schlüssel haben den gleichen Schlüssel beeinflusst dies die Methode Wahl?
Fall 1
val rdd = sc.parallelize(Seq(("gh5", "id1"), ("gh4", "id1"), ("gh5", "id2"),("gh5", "id3"))
val sorted = rdd.sortBy(_._1)
val joined = sorted.join(sorted)
Fall 2
val rdd = sc.parallelize(Seq(("gh5", "id1"), ("gh4", "id1"), ("gh5", "id2"),("gh5", "id3"), ...)
val custom_partitioned = rdd.partitionBy(100)).persist()
val joined = custom_partitioned.join(custom_partitioned)
Wenn es zu einer stark verzerrten Verteilung kommt, ist SortBy der beste Weg, um die Datenlokalität/-leistung auf dem Join zu gewährleisten, oder gibt es tatsächlich eine bessere Möglichkeit, dieselbe Funktion auszuführen (anders als die vorgeschlagenen). – SChorlton
Nach dem Mischen mit dem gleichen Partitionierer wird jede Operation per Definition lokal sein. Und die Wahl des Partitioners ('RangePartioner' wird zum Sortieren verwendet) hängt von Ihren Daten und der tatsächlichen Verteilung ab. Es gibt keine universelle Lösung. Wenn Sie wissen, dass Daten in einer bestimmten Weise organisiert sind, erstellen Sie einen benutzerdefinierten Partitionierer, der dies widerspiegelt. – zero323