2016-07-28 24 views
5

Ich habe eine einfache UDF zum Konvertieren oder Extrahieren einiger Werte aus einem Zeitfeld in einem Temptabl in Spark. Ich registriere die Funktion, aber wenn ich die Funktion mit sql aufrufen, löst es eine NullPointerException aus. Unten ist meine Funktion und Prozess der Ausführung. Ich benutze Zeppelin. Schlimm war das gestern, aber es hat heute Morgen aufgehört zu arbeiten.Scala und Spark UDF-Funktion

Funktion

def convert(time:String) : String = { 
    val sdf = new java.text.SimpleDateFormat("HH:mm") 
    val time1 = sdf.parse(time) 
    return sdf.format(time1) 
} 

die Funktion Register

sqlContext.udf.register("convert",convert _) 

Test, um die Funktion ohne SQL - Dies funktioniert

convert(12:12:12) -> returns 12:12 

Testen Sie die Funktion mit SQL in Zeppelin dies nicht gelingt.

%sql 
select convert(time) from temptable limit 10 

Struktur der temptable

root 
|-- date: string (nullable = true) 
|-- time: string (nullable = true) 
|-- serverip: string (nullable = true) 
|-- request: string (nullable = true) 
|-- resource: string (nullable = true) 
|-- protocol: integer (nullable = true) 
|-- sourceip: string (nullable = true) 

Ein Teil der Stacktrace, die ich immer bin.

java.lang.NullPointerException 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643) 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652) 
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54) 
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44) 

Antwort

7

Verwenden UDF stattdessen eine Funktion direkt von definieren

import org.apache.spark.sql.functions._ 

val convert = udf[String, String](time => { 
     val sdf = new java.text.SimpleDateFormat("HH:mm") 
     val time1 = sdf.parse(time) 
     sdf.format(time1) 
    } 
) 

Eingabeparameter des UDF ist Spalte (oder Spalten). Und der Rückgabetyp ist Spalte.

case class UserDefinedFunction protected[sql] (
    f: AnyRef, 
    dataType: DataType, 
    inputTypes: Option[Seq[DataType]]) { 

    def apply(exprs: Column*): Column = { 
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil))) 
    } 
}