2016-04-28 6 views
3

In meinem Projekt möchte ich ADD (+) Funktion erreichen, aber mein Parameter vielleicht LongType, DoubleType, IntType. Ich benutze sqlContext.udf.register("add",XXX), aber ich weiß nicht, wie man XXX schreibt, die allgemeine Funktionen machen soll.Wie können Sie in Spark SQL eine generische UDF registrieren und verwenden?

+0

Ich bin neugierig - warum würden Sie Ihre eigene Implementierung von '+' schreiben? Es gibt bereits eine Plus-Funktion, z.B. 'df.select (col (" a ") + col (" b "))' –

+0

Es tut mir leid, ich meine, dass zum Beispiel der Parameter von col ("a") IntType, Parameter von col ("b ") ist LongType, Parameter von col (" c ") ist DoubleType, jetzt möchte ich Add 1 erreichen, Jedem kann ich sqlContext.udf.register (" add ", (x: Int oder Double oder Long) = schreiben > x + 1), aber ich weiß nicht, wie man eine Funktion benutzt, um alle als generische Funktionen zu lösen. Kannst du mir helfen, Danke – yjxyjx

+0

Nun, das kann auch ohne neue UDF gemacht werden: 'df.select (col ("a") + lit (1)) 'würde für jeden numerischen Typ der Spalte' a' funktionieren :) aber ich verstehe, dass Ihre eigentliche Frage über das Typproblem ist und wie man etwas Spezifisches implementiert (richtig?) –

Antwort

0

Ich glaube nicht, dass Sie eine generische UDF registrieren können.

Wenn wir einen Blick auf den signature der register Methode nehmen (tatsächlich, es ist nur eine der 22 register Überlastungen, mit einem Argumente für benutzerdefinierte Funktionen verwendet, die andere sind äquivalent):

def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction 

Wir können sehen, dass es mit einem A1: TypeTag Typ parametriert ist - der TypeTag bedeutet, dass zum Zeitpunkt der Registrierung müssen wir Beweise des tatsächlichen Typs der UDF-Argument haben. Also - eine generische Funktion übergeben func ohne sie explizit eingeben kann nicht kompilieren.

Für Ihren Fall könnten Sie in der Lage sein, die Vorteile von Spark Fähigkeit zu nehmen numerische Typen automatisch zu werfen - nur schreiben ein UDF für Double s, und Sie können es auch auf Int s gelten (die Ausgabe Double sein würde, obwohl) :

sqlContext.udf.register("add", (i: Double) => i + 1) 

// creating a table with Double and Int types: 
sqlContext.createDataFrame(Seq((1.5, 4), (2.2, 5))).registerTempTable("table1") 

// applying UDF to both types: 
sqlContext.sql("SELECT add(_1), add(_2) FROM table1").show() 

// output: 
// +---+---+ 
// |_c0|_c1| 
// +---+---+ 
// |2.5|5.0| 
// |3.2|6.0| 
// +---+---+ 
+0

danke, Aber wenn Ihr Typ LongType ist, ist es ein Fehler, sagt, dass lange nicht auf Double zu werfen ist.Jetzt verwende ich java.lang.Number, die Super-Klasse von Double/Long/Int, implementiert es. – yjxyjx

4

Sie können eine generische UDF durch eine StructType mit struct($"col1", $"col2") schaffen, die Ihre Werte hält und Ihre UDF Arbeit weg von dieser erstellen. Es wird in Ihrem UDF als Objekt übergeben, so dass Sie etwas tun können:

val multiAdd = udf[Double,Row](r => { 
    var n = 0.0 
    r.toSeq.foreach(n1 => n = n + (n1 match { 
    case l: Long => l.toDouble 
    case i: Int => i.toDouble 
    case d: Double => d 
    case f: Float => f.toDouble 
    })) 
    n 
}) 

val df = Seq((1.0,2),(3.0,4)).toDF("c1","c2") 
df.withColumn("add", multiAdd(struct($"c1", $"c2"))).show 
+---+---+---+ 
| c1| c2|add| 
+---+---+---+ 
|1.0| 2|3.0| 
|3.0| 4|7.0| 
+---+---+---+ 

Sie können auch interessante Dinge wie eine variable Anzahl von Spalten als Eingabe. In der Tat, unsere UDF oben bereits definiert ist, dass:

val df = Seq((1, 2L, 3.0f,4.0),(5, 6L, 7.0f,8.0)).toDF("int","long","float","double") 

df.printSchema 
root 
|-- int: integer (nullable = false) 
|-- long: long (nullable = false) 
|-- float: float (nullable = false) 
|-- double: double (nullable = false) 

df.withColumn("add", multiAdd(struct($"int", $"long", $"float", $"double"))).show 
+---+----+-----+------+----+ 
|int|long|float|double| add| 
+---+----+-----+------+----+ 
| 1| 2| 3.0| 4.0|10.0| 
| 5| 6| 7.0| 8.0|26.0| 
+---+----+-----+------+----+ 

Sie können sogar eine hartcodierte Zahl in den Mix:

df.withColumn("add", multiAdd(struct(lit(100), $"int", $"long"))).show 
+---+----+-----+------+-----+ 
|int|long|float|double| add| 
+---+----+-----+------+-----+ 
| 1| 2| 3.0| 4.0|103.0| 
| 5| 6| 7.0| 8.0|111.0| 
+---+----+-----+------+-----+ 

Wenn Sie die UDF in SQL-Syntax verwenden möchten, können Sie tun können:

sqlContext.udf.register("multiAdd", (r: Row) => { 
    var n = 0.0 
    r.toSeq.foreach(n1 => n = n + (n1 match { 
    case l: Long => l.toDouble 
    case i: Int => i.toDouble 
    case d: Double => d 
    case f: Float => f.toDouble 
    })) 
    n 
}) 
df.registerTempTable("df") 

// Note that 'int' and 'long' are column names 
sqlContext.sql("SELECT *, multiAdd(struct(int, long)) as add from df").show 
+---+----+-----+------+----+ 
|int|long|float|double| add| 
+---+----+-----+------+----+ 
| 1| 2| 3.0| 4.0| 3.0| 
| 5| 6| 7.0| 8.0|11.0| 
+---+----+-----+------+----+ 

auch dies funktioniert:

sqlContext.sql("SELECT *, multiAdd(struct(*)) as add from df").show 
+---+----+-----+------+----+ 
|int|long|float|double| add| 
+---+----+-----+------+----+ 
| 1| 2| 3.0| 4.0|10.0| 
| 5| 6| 7.0| 8.0|26.0| 
+---+----+-----+------+----+