Ich habe zwei RDD, dass ich und sie so aussehen anschließen möchten:Spark: Was ist die beste Strategie für den Beitritt einer 2-Tuple-Key-RDD mit Single-Key-RDD?
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Es ist der Fall zu sein, kommt vor, dass die Schlüsselwerte von rdd1
einzigartig sind und auch, dass die Tupel-Schlüsselwerte von rdd2
einzigartig sind . Ich möchte die beiden Datensätze verbinden, so dass ich die folgende rdd erhalten:
val rdd_joined:RDD[((T,W), (U,V))]
Was ist der effizienteste Weg, dies zu erreichen? Hier sind ein paar Ideen, an die ich gedacht habe.
Option 1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Option 2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
Option 1 werden alle Daten sammeln Master, nicht wahr? Das scheint also keine gute Option zu sein, wenn rdd1 groß ist (in meinem Fall ist es relativ groß, obwohl es eine Größenordnung kleiner als rdd2 ist). Option 2 macht ein hässliches distinktes und kartesisches Produkt, was ebenfalls sehr ineffizient erscheint. Eine andere Möglichkeit, die mir in den Sinn kam (aber noch nicht ausprobiert wurde), ist Option 1 zu tun und die Karte zu übertragen, obwohl es besser wäre, auf eine "intelligente" Art und Weise zu senden, so dass die Schlüssel der Karte zusammen mit der Schlüssel von rdd2
.
Hat jemand schon einmal eine solche Situation erlebt? Ich wäre glücklich, deine Gedanken zu haben.
Danke!
Ich denke, du bist zweite Option ist wahrscheinlich der einfachste Weg zu gehen, obwohl die Umstrukturierung rdd2 bequem wäre. – Noah
Ich muss mehr darüber lernen, wie die mapPartitions-Funktion funktioniert, aber das scheint nach dem zu sein, wonach ich gesucht habe. Ich stimme auch zu, dass ich "rdd2" neu strukturieren und durch eine Reihe von Karten zurück zu der ursprünglichen Sache kommen konnte, die ich wollte. Ich werde beide Optionen untersuchen und sehen, wie gut sie für meinen Anwendungsfall funktionieren. Danke für die Vorschläge! – RyanH
Für die erste Option, wenn ich versuche, val rdd1Broadcast = sc.broadcast (rdd1.collectAsMap()) gibt es Daten unvollständig zurück. Gibt es eine Möglichkeit, die erste Option mit collect() anstelle von collecAsMap() anzupassen? –