2016-06-09 30 views
8

Ich frage mich, ob es eine Möglichkeit gibt, eine benutzerdefinierte Aggregationsfunktion für Spark-Datenrahmen über mehrere Spalten anzugeben.Aggregieren mehrerer Spalten mit benutzerdefinierten Funktion in Spark

Ich habe eine Tabelle wie folgt von der Art (Name, Artikel, Preis):

john | tomato | 1.99 
john | carrot | 0.45 
bill | apple | 0.99 
john | banana | 1.29 
bill | taco | 2.59 

zu:

Ich möchte den Artikel aggregieren und es ist Kosten für jede Person in eine Liste wie folgt:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29) 
bill | (apple, 0.99), (taco, 2.59) 

Ist dies in Datenrahmen möglich? Ich habe kürzlich über collect_list gelernt, aber es scheint nur für eine Spalte zu arbeiten.

Antwort

15

Der einfachste Weg, dies als DataFrame zu tun ist, zunächst zwei Listen zu sammeln und dann gemeinsam ein UDF-zip die beiden Listen verwenden. Etwas wie:

import org.apache.spark.sql.functions.{collect_list, udf} 
import sqlContext.implicits._ 

val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_)) 

val df = Seq(
    ("john", "tomato", 1.99), 
    ("john", "carrot", 0.45), 
    ("bill", "apple", 0.99), 
    ("john", "banana", 1.29), 
    ("bill", "taco", 2.59) 
).toDF("name", "food", "price") 

val df2 = df.groupBy("name").agg(
    collect_list(col("food")) as "food", 
    collect_list(col("price")) as "price" 
).withColumn("food", zipper(col("food"), col("price"))).drop("price") 

df2.show(false) 
# +----+---------------------------------------------+ 
# |name|food           | 
# +----+---------------------------------------------+ 
# |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| 
# |bill|[[apple,0.99], [taco,2.59]]     | 
# +----+---------------------------------------------+ 
+0

Gute Antwort! :) – eliasah

+1

Ich habe 'col (...)' anstelle von '$" ... "' aus einem Grund verwendet - ich finde 'col (...)' arbeitet mit weniger Arbeit innerhalb von Dingen wie 'class' Definitionen . –

+0

Gibt es irgendeine Funktion, um Spalten neu auszurichten, wie zum Beispiel in der Zip-Funktion, sagen Sie, dass Sie zuerst ein Element aus dem Ende der Spalte hinzufügen und eines aus dem Kopf entfernen und dann zippen? In diesem Fall können Sie zum Beispiel den nächsten Preis für die Artikel haben, wenn Sie die Preise täglich lesen und es eine Zeitspalte gibt. –

2

Hier ist eine Option, indem Sie den Datenrahmen in eine RDD von Map konvertieren und dann eine groupByKey darauf aufrufen. Das Ergebnis wäre eine Liste von Schlüssel-Wert-Paaren, wobei Wert eine Liste von Tupeln ist.

df.show 
+----+------+----+ 
| _1| _2| _3| 
+----+------+----+ 
|john|tomato|1.99| 
|john|carrot|0.45| 
|bill| apple|0.99| 
|john|banana|1.29| 
|bill| taco|2.59| 
+----+------+----+ 


val tuples = df.map(row => row(0) -> (row(1), row(2))) 
tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43 

tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect 
res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29)))) 
15

Betrachten Sie die struct Funktion Gruppe mit den Spalten zusammen, bevor sie als Liste zu sammeln:

import org.apache.spark.sql.functions.{collect_list, struct} 
import sqlContext.implicits._ 

val df = Seq(
    ("john", "tomato", 1.99), 
    ("john", "carrot", 0.45), 
    ("bill", "apple", 0.99), 
    ("john", "banana", 1.29), 
    ("bill", "taco", 2.59) 
).toDF("name", "food", "price") 

df.groupBy($"name") 
    .agg(collect_list(struct($"food", $"price")).as("foods")) 
    .show(false) 

Ausgänge:

+----+---------------------------------------------+ 
|name|foods          | 
+----+---------------------------------------------+ 
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| 
|bill|[[apple,0.99], [taco,2.59]]     | 
+----+---------------------------------------------+ 
+0

Ich möchte erwähnen, dass dieser Ansatz sauberer als die akzeptierte aussieht Antwort, funktioniert aber leider nicht mit Spark 1.6, da 'collect_list()' keine Struktur akzeptiert. – trudolf