In meinem Projekt möchte ich ADD (+
) Funktion erreichen, aber mein Parameter vielleicht LongType
, DoubleType
, IntType
. Ich benutze sqlContext.udf.register("add",XXX)
, aber ich weiß nicht, wie man XXX
schreibt, die allgemeine Funktionen machen soll.Wie können Sie in Spark SQL eine generische UDF registrieren und verwenden?
Antwort
Ich glaube nicht, dass Sie eine generische UDF registrieren können.
Wenn wir einen Blick auf den signature der register
Methode nehmen (tatsächlich, es ist nur eine der 22 register
Überlastungen, mit einem Argumente für benutzerdefinierte Funktionen verwendet, die andere sind äquivalent):
def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction
Wir können sehen, dass es mit einem A1: TypeTag
Typ parametriert ist - der TypeTag bedeutet, dass zum Zeitpunkt der Registrierung müssen wir Beweise des tatsächlichen Typs der UDF-Argument haben. Also - eine generische Funktion übergeben func
ohne sie explizit eingeben kann nicht kompilieren.
Für Ihren Fall könnten Sie in der Lage sein, die Vorteile von Spark Fähigkeit zu nehmen numerische Typen automatisch zu werfen - nur schreiben ein UDF für Double
s, und Sie können es auch auf Int
s gelten (die Ausgabe Double
sein würde, obwohl) :
sqlContext.udf.register("add", (i: Double) => i + 1)
// creating a table with Double and Int types:
sqlContext.createDataFrame(Seq((1.5, 4), (2.2, 5))).registerTempTable("table1")
// applying UDF to both types:
sqlContext.sql("SELECT add(_1), add(_2) FROM table1").show()
// output:
// +---+---+
// |_c0|_c1|
// +---+---+
// |2.5|5.0|
// |3.2|6.0|
// +---+---+
danke, Aber wenn Ihr Typ LongType ist, ist es ein Fehler, sagt, dass lange nicht auf Double zu werfen ist.Jetzt verwende ich java.lang.Number, die Super-Klasse von Double/Long/Int, implementiert es. – yjxyjx
Sie können eine generische UDF
durch eine StructType
mit struct($"col1", $"col2")
schaffen, die Ihre Werte hält und Ihre UDF
Arbeit weg von dieser erstellen. Es wird in Ihrem UDF
als Objekt übergeben, so dass Sie etwas tun können:
val multiAdd = udf[Double,Row](r => {
var n = 0.0
r.toSeq.foreach(n1 => n = n + (n1 match {
case l: Long => l.toDouble
case i: Int => i.toDouble
case d: Double => d
case f: Float => f.toDouble
}))
n
})
val df = Seq((1.0,2),(3.0,4)).toDF("c1","c2")
df.withColumn("add", multiAdd(struct($"c1", $"c2"))).show
+---+---+---+
| c1| c2|add|
+---+---+---+
|1.0| 2|3.0|
|3.0| 4|7.0|
+---+---+---+
Sie können auch interessante Dinge wie eine variable Anzahl von Spalten als Eingabe. In der Tat, unsere UDF
oben bereits definiert ist, dass:
val df = Seq((1, 2L, 3.0f,4.0),(5, 6L, 7.0f,8.0)).toDF("int","long","float","double")
df.printSchema
root
|-- int: integer (nullable = false)
|-- long: long (nullable = false)
|-- float: float (nullable = false)
|-- double: double (nullable = false)
df.withColumn("add", multiAdd(struct($"int", $"long", $"float", $"double"))).show
+---+----+-----+------+----+
|int|long|float|double| add|
+---+----+-----+------+----+
| 1| 2| 3.0| 4.0|10.0|
| 5| 6| 7.0| 8.0|26.0|
+---+----+-----+------+----+
Sie können sogar eine hartcodierte Zahl in den Mix:
df.withColumn("add", multiAdd(struct(lit(100), $"int", $"long"))).show
+---+----+-----+------+-----+
|int|long|float|double| add|
+---+----+-----+------+-----+
| 1| 2| 3.0| 4.0|103.0|
| 5| 6| 7.0| 8.0|111.0|
+---+----+-----+------+-----+
Wenn Sie die UDF
in SQL-Syntax verwenden möchten, können Sie tun können:
sqlContext.udf.register("multiAdd", (r: Row) => {
var n = 0.0
r.toSeq.foreach(n1 => n = n + (n1 match {
case l: Long => l.toDouble
case i: Int => i.toDouble
case d: Double => d
case f: Float => f.toDouble
}))
n
})
df.registerTempTable("df")
// Note that 'int' and 'long' are column names
sqlContext.sql("SELECT *, multiAdd(struct(int, long)) as add from df").show
+---+----+-----+------+----+
|int|long|float|double| add|
+---+----+-----+------+----+
| 1| 2| 3.0| 4.0| 3.0|
| 5| 6| 7.0| 8.0|11.0|
+---+----+-----+------+----+
auch dies funktioniert:
sqlContext.sql("SELECT *, multiAdd(struct(*)) as add from df").show
+---+----+-----+------+----+
|int|long|float|double| add|
+---+----+-----+------+----+
| 1| 2| 3.0| 4.0|10.0|
| 5| 6| 7.0| 8.0|26.0|
+---+----+-----+------+----+
Ich bin neugierig - warum würden Sie Ihre eigene Implementierung von '+' schreiben? Es gibt bereits eine Plus-Funktion, z.B. 'df.select (col (" a ") + col (" b "))' –
Es tut mir leid, ich meine, dass zum Beispiel der Parameter von col ("a") IntType, Parameter von col ("b ") ist LongType, Parameter von col (" c ") ist DoubleType, jetzt möchte ich Add 1 erreichen, Jedem kann ich sqlContext.udf.register (" add ", (x: Int oder Double oder Long) = schreiben > x + 1), aber ich weiß nicht, wie man eine Funktion benutzt, um alle als generische Funktionen zu lösen. Kannst du mir helfen, Danke – yjxyjx
Nun, das kann auch ohne neue UDF gemacht werden: 'df.select (col ("a") + lit (1)) 'würde für jeden numerischen Typ der Spalte' a' funktionieren :) aber ich verstehe, dass Ihre eigentliche Frage über das Typproblem ist und wie man etwas Spezifisches implementiert (richtig?) –