2016-07-30 11 views
0

Um zu vermeiden, die Berechnung aller möglichen Kombinationen verwenden, ich versuche zu einem bestimmten Schlüssel zur Gruppenwerte nach, und dann das kartesische Produkt der Werte für jeden Schlüssel zu berechnen, das heißt:kartesisches Produkt Optimierung Schlüssel in Funken

Input [(k1, v1), (k1, v2), (k2, v3)] 

Wunsch Ausgabe: [(v1, v1), (v1, v2), (v2, v2), (v2, v1), (v3, v3)] Hier ist der Code habe ich versucht, die Ausführung:

val input = sc.textFile('data.csv') 
val rdd = input.map(s=>s.split(",")) 
       .map(s => (s(1).toString, s(2).toString)) 
val group_result:RDD[String, Iterable[String]] = rdd.groupByKey() 
group_result.flatMap { t => 
{ 
    val stream1= t._2.toStream 
    val stream2= t._2.toStream 

    stream1.flatMap { src => 
    stream2.par.map { trg => 
      src + "," + trg 
    } 
    } 
} 
} 

Dies funktioniert für sehr kleine Dateien in Ordnung, aber wenn die Liste (Iterable) der Länge von ~ 1000 die Berechnung vollständig gefriert.

+0

Sie sollten RDD-Karten (Transformationen im Allgemeinen) nicht mit Scala-nativen mischen. Innere sind nicht von Spark optimiert; Vielleicht ist das der Grund, warum es einfriert. –

+1

@ TomaszBłachut Whaaaat? Warum solltest du nicht? :) 'groupByKey' ist wahrscheinlich nicht die beste Idee hier,' toStream' bietet keine Vorteile, und einfaches selbst 'Join' wäre genug, aber es ist nichts besonderes falsch mit" Scala map ". – zero323

+0

@ zero323 Danke für die Klarstellung, ich denke, ich sollte anfangen, meine Kommentare mit AFAIR voranzutreiben:> Das existiert in meinem Kopf als etwas, was ich nicht tun soll, vielleicht habe ich verschiedene Beispiele gesehen, die schlecht aussahen –

Antwort

0

Wie @ zero323 sagte, der beste Weg, dies zu lösen, ist durch PairRDDFunctions' Methode join jedoch, um mit diesem Sie eine PairedRDD, muss das zu erreichen, die durch die Verwendung RDD Methode keyBy erhalten werden kann.

Man könnte so etwas wie tun:

val rdd = sc.parallelize(Array(("k1", "v1"), ("k1", "v2"), ("k2", "v3"))).keyBy(_._1) 

val result = rdd.join(rdd).map{ 
    case (key: String, (x: Tuple2[String, String], y: Tuple2[String, String])) => (x._2, y._2) 
} 

result.take(20) 
// res9: Array[(String, String)] = Array((v1,v1), (v1,v2), (v2,v1), (v2,v2), (v3, v3)) 

Here ich das Notebook mit dem Code teilen.