2013-07-12 15 views
45

Ich habe zwei RDD, dass ich und sie so aussehen anschließen möchten:Spark: Was ist die beste Strategie für den Beitritt einer 2-Tuple-Key-RDD mit Single-Key-RDD?

val rdd1:RDD[(T,U)] 
val rdd2:RDD[((T,W), V)] 

Es ist der Fall zu sein, kommt vor, dass die Schlüsselwerte von rdd1 einzigartig sind und auch, dass die Tupel-Schlüsselwerte von rdd2 einzigartig sind . Ich möchte die beiden Datensätze verbinden, so dass ich die folgende rdd erhalten:

val rdd_joined:RDD[((T,W), (U,V))] 

Was ist der effizienteste Weg, dies zu erreichen? Hier sind ein paar Ideen, an die ich gedacht habe.

Option 1:

val m = rdd1.collectAsMap 
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))}) 

Option 2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct 
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2) 

Option 1 werden alle Daten sammeln Master, nicht wahr? Das scheint also keine gute Option zu sein, wenn rdd1 groß ist (in meinem Fall ist es relativ groß, obwohl es eine Größenordnung kleiner als rdd2 ist). Option 2 macht ein hässliches distinktes und kartesisches Produkt, was ebenfalls sehr ineffizient erscheint. Eine andere Möglichkeit, die mir in den Sinn kam (aber noch nicht ausprobiert wurde), ist Option 1 zu tun und die Karte zu übertragen, obwohl es besser wäre, auf eine "intelligente" Art und Weise zu senden, so dass die Schlüssel der Karte zusammen mit der Schlüssel von rdd2.

Hat jemand schon einmal eine solche Situation erlebt? Ich wäre glücklich, deine Gedanken zu haben.

Danke!

Antwort

56

Eine Option besteht darin, einen Broadcast-Join durchzuführen, indem rdd1 zum Treiber gesammelt und an alle Mapper übertragen wird; es richtig gemacht, lässt uns das eine teure Shuffle der großen rdd2 RDD vermeiden:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))) 
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))) 

val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap()) 
val joined = rdd2.mapPartitions({ iter => 
    val m = rdd1Broadcast.value 
    for { 
    ((t, w), u) <- iter 
    if m.contains(t) 
    } yield ((t, w), (u, m.get(t).get)) 
}, preservesPartitioning = true) 

Die preservesPartitioning = true sagt Funke, der diese Karte Funktion nicht die Schlüssel von rdd2 nicht verändert; Dies ermöglicht Spark, die erneute Partitionierung von rdd2 für alle nachfolgenden Operationen zu vermeiden, die basierend auf dem Schlüssel (t, w) beitreten.

Diese Übertragung kann ineffizient sein, da sie einen Kommunikationsengpass beim Treiber beinhaltet. Im Prinzip ist es möglich, eine RDD zu einer anderen zu senden, ohne den Treiber zu involvieren; Ich habe einen Prototyp, den ich generalisieren und zu Spark hinzufügen möchte.

Eine andere Möglichkeit besteht darin, die Schlüssel von rdd2 neu abzubilden und die Methode Spark join zu verwenden; dies wird eine vollständige Shuffle von rdd2 (und möglicherweise rdd1) beinhaltet:

rdd1.join(rdd2.map { 
    case ((t, w), u) => (t, (w, u)) 
}).map { 
    case (t, (v, (w, u))) => ((t, w), (u, v)) 
}.collect() 

Auf meinem Abtastwerteingang, beiden Methoden zum gleichen Ergebnis:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C))) 

Eine dritte Möglichkeit zur Umstrukturierung wäre rdd2 so dass t ist sein Schlüssel, dann führen Sie die oben genannten Join.

+1

Ich denke, du bist zweite Option ist wahrscheinlich der einfachste Weg zu gehen, obwohl die Umstrukturierung rdd2 bequem wäre. – Noah

+0

Ich muss mehr darüber lernen, wie die mapPartitions-Funktion funktioniert, aber das scheint nach dem zu sein, wonach ich gesucht habe. Ich stimme auch zu, dass ich "rdd2" neu strukturieren und durch eine Reihe von Karten zurück zu der ursprünglichen Sache kommen konnte, die ich wollte. Ich werde beide Optionen untersuchen und sehen, wie gut sie für meinen Anwendungsfall funktionieren. Danke für die Vorschläge! – RyanH

+0

Für die erste Option, wenn ich versuche, val rdd1Broadcast = sc.broadcast (rdd1.collectAsMap()) gibt es Daten unvollständig zurück. Gibt es eine Möglichkeit, die erste Option mit collect() anstelle von collecAsMap() anzupassen? –

12

Eine andere Möglichkeit besteht darin, einen benutzerdefinierten Partitionierer zu erstellen und dann zipPartitions zu verwenden, um Ihre RDDs zu verbinden.

import org.apache.spark.HashPartitioner 

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) { 

    override def getPartition(key: Any): Int = key match { 
    case k: Tuple2[Int, String] => super.getPartition(k._1) 
    case _ => super.getPartition(key) 
    } 

} 

val numSplits = 8 
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits)) 
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits)) 

val result = rdd2.zipPartitions(rdd1)(
    (iter2, iter1) => { 
    val m = iter1.toMap 
    for { 
     ((t: Int, w), u) <- iter2 
     if m.contains(t) 
     } yield ((t, w), (u, m.get(t).get)) 
    } 
).partitionBy(new HashPartitioner(numSplits)) 

result.glom.collect