Meine Lösung ist sehr ähnlich zu Loki's answer with collect_set_limit
.
ich eine UDF verwenden würde, das tun würde, was Sie wollen nach collect_set
(oder collect_list
) oder ein sehr viel schwieriger UDAF.
Angesichts mehr Erfahrung mit UDFs, würde ich damit zuerst gehen. Obwohl UDFs nicht optimiert sind, ist es für diesen Anwendungsfall in Ordnung.
val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) }
val sample = spark.range(50).withColumn("key", $"id" % 5)
scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false)
+---+--------------------------------------+
|key|all |
+---+--------------------------------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|
+---+--------------------------------------+
scala> sample.
groupBy("key").
agg(collect_set("id") as "all").
withColumn("limit(3)", limitUDF($"all", lit(3))).
show(false)
+---+--------------------------------------+------------+
|key|all |limit(3) |
+---+--------------------------------------+------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
Siehe functions Objekt (udf
Funktion docs).
Danke für die Antwort. Jedoch 1) Ich möchte nur eine Liste von _distinct_ Werten. Ich sehe, es gibt eine rdd.distinct(), aber das scheint nicht zu einem Grenzwert-Parameter 2) Nicht sicher, wie eine Filterfunktion in Collect verwenden. Wie würde ich einen Filter verwenden, um nur eine bestimmte Anzahl von Werten zu erhalten? – user1500142
Auch im Idealfall würde ich vermeiden, rdds zu verwenden. Ich bin momentan etwas wie df.groupBy(). Agg (
user1500142