2016-06-30 5 views
3

Ich habe einen Dataframe Auftrag:Aggregate Array-Typ in Spark-Dataframe

+-----------------+-----------+--------------+ 
|    Id| Order |  Gender| 
+-----------------+-----------+--------------+ 
|    1622|[101330001]|   Male| 
|    1622| [147678]|   Male| 
|    3837| [1710544]|   Male| 
+-----------------+-----------+--------------+ 

, die ich auf Id und Geschlecht GROUPBY will und dann Aggregat Aufträge. I org.apache.spark.sql.functions Paket und Code verwenden wie folgt aussieht:

DataFrame group = orders.withColumn("orders", col("order")) 
       .groupBy(col("Id"), col("Gender")) 
       .agg(collect_list("products")); 

Da jedoch Spalt Auftrag ist vom Typ Array bekomme ich diese Ausnahme, weil es eine primitive Art erwartet:

User class threw exception: org.apache.spark.sql.AnalysisException: No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Only primitive type arguments are accepted but array<string> was passed as parameter 1 

Ich habe in das Paket geschaut und es gibt Sortierfunktionen für Arrays, aber keine Aggregatfunktionen. Irgendeine Idee, wie es geht? Vielen Dank.

Antwort

0

In diesem Fall können Sie Ihre eigene Funktion definieren und registrieren Sie es als UDF

val userDefinedFunction = ??? 
val udfFunctionName = udf[U,T](userDefinedFunction) 

Dann statt dann innerhalb dieser Funktion die Spalte übergeben, so dass sie in primitiven Typ umgewandelt wird und dann in den Pass mit Spaltenmethode

Etwas wie folgt aus:

val dataF:(Array[Int])=>Int=_.head 

val dataUDF=udf[Int,Array[Int]](dataF) 


DataFrame group = orders.withColumn("orders", dataUDF(col("order"))) 
       .groupBy(col("Id"), col("Gender")) 
       .agg(collect_list("products")); 

Ich hoffe, es funktioniert!

+0

Danke Shivansh! –