2014-12-12 17 views
18

Ich muss zwei gewöhnlichen RDDs auf einer/mehreren Spalten verbinden. Logischerweise entspricht diese Operation der Datenbankverbindungsoperation von zwei Tabellen. Ich frage mich, ob dies nur durch Spark SQL möglich ist oder es andere Möglichkeiten gibt, es zu tun.Verbinden Sie zwei gewöhnliche RDDs mit/ohne Spark SQL

Als konkretes Beispiel betrachten RDD r1 mit Primärschlüssel ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID) 

und RDD r2 mit Primärschlüssel COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY) 

Ich möchte r1 und r2 beizutreten.

Wie kann das gemacht werden?

Antwort

8

So etwas sollte funktionieren.

scala> case class Item(id:String, name:String, unit:Int, companyId:String) 

scala> case class Company(companyId:String, name:String, city:String) 

scala> val i1 = Item("1", "first", 2, "c1") 

scala> val i2 = i1.copy(id="2", name="second") 

scala> val i3 = i1.copy(id="3", name="third", companyId="c2") 

scala> val items = sc.parallelize(List(i1,i2,i3)) 
items: org.apache.spark.rdd.RDD[Item] = ParallelCollectionRDD[14] at parallelize at <console>:20 

scala> val c1 = Company("c1", "company-1", "city-1") 

scala> val c2 = Company("c2", "company-2", "city-2") 

scala> val companies = sc.parallelize(List(c1,c2)) 

scala> val groupedItems = items.groupBy(x => x.companyId) 
groupedItems: org.apache.spark.rdd.RDD[(String, Iterable[Item])] = ShuffledRDD[16] at groupBy at <console>:22 

scala> val groupedComp = companies.groupBy(x => x.companyId) 
groupedComp: org.apache.spark.rdd.RDD[(String, Iterable[Company])] = ShuffledRDD[18] at groupBy at <console>:20 

scala> groupedItems.join(groupedComp).take(10).foreach(println) 

14/12/12 00:52:32 INFO DAGScheduler: Job 5 finished: take at <console>:35, took 0.021870 s 
(c1,(CompactBuffer(Item(1,first,2,c1), Item(2,second,2,c1)),CompactBuffer(Company(c1,company-1,city-1)))) 
(c2,(CompactBuffer(Item(3,third,2,c2)),CompactBuffer(Company(c2,company-2,city-2)))) 
23

Soumya Simanta gab eine gute Antwort. Die Werte in verknüpfter RDD sind jedoch Iterable, daher sind die Ergebnisse möglicherweise nicht sehr ähnlich wie beim Verbinden gewöhnlicher Tabellen.

Alternativ können Sie:

val mappedItems = items.map(item => (item.companyId, item)) 
val mappedComp = companies.map(comp => (comp.companyId, comp)) 
mappedItems.join(mappedComp).take(10).foreach(println) 

Der Ausgang wäre:

(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1))) 
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1))) 
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2))) 
+1

Ihre Karten sind identisch mit 'items.keyBy {_. CompanyId}', 'com panies.keyBy {_. companyId} '. Da sie Teil von Spark sind, gibt es eine Chance, die effizienter wäre? –

+0

@Paul Dies ist der Zündquellencode für keyBy: 'def keyBy [K] (f: T => K): RDD [(K, T)] = {' 'map (x => (f (x), x)) ' '} ' so sind Ihre Lösung und @virya Lösung ganz gleich – jlopezmat

+1

OK :) Noch, vielleicht Absicht etwas klarer mit keyBy. Kein wichtiger Punkt, obwohl –

2

Spark-SQL auf SPARK RDDs beitreten ausführen können.

Below-Code führt auf Gesellschaft und Artikel RDDs SQL beitreten

object SparkSQLJoin { 

case class Item(id:String, name:String, unit:Int, companyId:String) 
case class Company(companyId:String, name:String, city:String) 

def main(args: Array[String]) { 

    val sparkConf = new SparkConf() 
    val sc= new SparkContext(sparkConf) 
    val sqlContext = new SQLContext(sc) 

    import sqlContext.createSchemaRDD 

    val i1 = Item("1", "first", 1, "c1") 
    val i2 = Item("2", "second", 2, "c2") 
    val i3 = Item("3", "third", 3, "c3") 
    val c1 = Company("c1", "company-1", "city-1") 
    val c2 = Company("c2", "company-2", "city-2") 

    val companies = sc.parallelize(List(c1,c2)) 
    companies.registerAsTable("companies") 

    val items = sc.parallelize(List(i1,i2,i3)) 
    items.registerAsTable("items") 

    val result = sqlContext.sql("SELECT * FROM companies C JOIN items I ON C.companyId= I.companyId").collect 

    result.foreach(println) 

    } 
} 

Ausgabe als

angezeigt wird
 [c1,company-1,city-1,1,first,1,c1] 
    [c2,company-2,city-2,2,second,2,c2] 
+0

Ich habe mehrere Spalten, daher muss ich das Schema programmgesteuert angeben. Auch die RDDs werden aus großen Textdateien auf HDFS erstellt. Ich glaube, der obige Ansatz funktioniert immer noch, oder? Bitte lassen Sie mich wissen, wenn Änderungen notwendig sind. –

+0

Ja, dieser Ansatz funktioniert auch für große Datenmengen. Um das Schema programmgesteuert zu definieren, besuchen Sie http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema –

8

(Verwenden von Scala) Lassen Sie sagen, Sie haben zwei RDDs:

  • emp: (empid, ename, dept)

  • dept: (dname, abt)

Es folgt eine andere Art und Weise:

//val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30))) 
val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30))) 

val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40))) 

//val shifted_fields_emp = emp.map(t => (t._3, t._1, t._2)) 
val shifted_fields_emp = emp.map(t => (t._2, t._1)) 

val shifted_fields_dept = dept.map(t => (t._2,t._1)) 

shifted_fields_emp.join(shifted_fields_dept) 
// Create emp RDD 
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30))) 

// Create dept RDD 
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40))) 

// Establishing that the third field is to be considered as the Key for the emp RDD 
val manipulated_emp = emp.keyBy(t => t._3) 

// Establishing that the second field need to be considered as the Key for dept RDD 
val manipulated_dept = dept.keyBy(t => t._2) 

// Inner Join 
val join_data = manipulated_emp.join(manipulated_dept) 
// Left Outer Join 
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept) 
// Right Outer Join 
val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept) 
// Full Outer Join 
val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept) 

// Formatting the Joined Data for better understandable (using map) 
val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1)) 

Das wird die Ausgabe geben wie:

// die Ausgangs cleaned_joined_data auf der

Konsole Drucken
scala> cleaned_joined_data.collect() 
res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))