Bevor Sie fortfahren: Diese Operationen ist noch eine weitere andere groupByKey
. Während es mehrere legitime Anwendungen hat, ist es relativ teuer, also verwenden Sie es nur bei Bedarf.
Nicht gerade kurz oder effiziente Lösung, aber Sie können UserDefinedAggregateFunction
in Spark-1.5.0 eingeführt verwenden:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
Beispiel Nutzung:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
Sie können auch einen Python-Wrapper erstellen, wie gezeigt in Spark: How to map Python with Scala or Java User Defined Functions?
In der Praxis kann es sein Fas um RDD, groupByKey
, mkString
zu extrahieren und DataFrame neu zu erstellen.
Sie können einen ähnlichen Effekt durch die Kombination von collect_list
Funktion (Spark> = 1.6.0) mit concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
What If Ich möchte es in SQL verwenden Wie kann ich diese UDF in Spark SQL registrieren? –
@MurtazaKanchwala [Es gibt eine 'register' Methode, die UDAFS akzeptiert] (https://github.com/apache/spark/blob/37c617e4f580482b59e1abbe3c0c27c7125cf605/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration .scala # L63-L69), so sollte es als Standard-UDF funktionieren. – zero323
@ Zero323 jeder Ansatz, um das selbe in Spark sql 1.4.1 –