14

Gegeben ist Tabelle 1 mit einer Spalte "x" vom Typ String. Ich möchte Tabelle 2 mit einer Spalte "y" erstellen, die eine ganzzahlige Darstellung der in "x" angegebenen Datumszeichenfolgen ist.SparkSQL: Wie mit Nullwerten in benutzerdefinierten Funktion umgehen?

Essential ist zu halten null Werte in Spalte "y".

Tabelle 1 (Datenrahmen DF1):

+----------+ 
|   x| 
+----------+ 
|2015-09-12| 
|2015-09-13| 
|  null| 
|  null| 
+----------+ 
root 
|-- x: string (nullable = true) 

Tabelle 2 (Datenrahmen DF2):

+----------+--------+                 
|   x|  y| 
+----------+--------+ 
|  null| null| 
|  null| null| 
|2015-09-12|20150912| 
|2015-09-13|20150913| 
+----------+--------+ 
root 
|-- x: string (nullable = true) 
|-- y: integer (nullable = true) 

Während die benutzerdefinierte Funktion (UDF) auf Werte aus der Spalte "X" Umwandlung in die der Spalte „y“ ist:

val extractDateAsInt = udf[Int, String] (
    (d:String) => d.substring(0, 10) 
     .filterNot("-".toSet) 
     .toInt) 

und arbeitet, mit Nullwerten zu tun ist nicht möglich.

Obwohl, ich kann so etwas tun

val extractDateAsIntWithNull = udf[Int, String] (
    (d:String) => 
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt 
    else 1) 

Ich habe keine Möglichkeit gefunden, um null Werte über udfs (natürlich als Int s nicht null sein kann) „produzieren“.

Meine aktuelle Lösung für die Erstellung von DF2 (Tabelle 2) ist wie folgt:

// holds data of table 1 
val df1 = ... 

// filter entries from df1, that are not null 
val dfNotNulls = df1.filter(df1("x") 
    .isNotNull) 
    .withColumn("y", extractDateAsInt(df1("x"))) 
    .withColumnRenamed("x", "right_x") 

// create df2 via a left join on df1 and dfNotNull having 
val df2 = df1.join(dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter").drop("right_x") 

Fragen:

  • Die aktuelle Lösung scheint umständlich (und wahrscheinlich nicht effizient WRT Leistung.) . Gibt es einen besseren Weg?
  • @ Spark-Entwickler: Gibt es einen Typ NullableInt geplant/available, so dass das folgende udf möglich ist (siehe Code-Auszug)?

Codeauszug

val extractDateAsNullableInt = udf[NullableInt, String] (
    (d:String) => 
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt 
    else null) 
+0

Einfache Lösung ist geschachtelte Typen zu verwenden: http://stackoverflow.com/questions/42791912/how-to-deal-with-spark-udf-input-output-of-primitive-nullable-type/42791913 # 42791913 –

+0

Einfach Die Lösung besteht darin, geboxte Typen zu verwenden: http://stackoverflow.com/questions/42791912/how-to-deal-with-spark-udf-input-output-of-primitive-nullable-type/42791913#42791913 –

Antwort

31

Dies ist, wo Option praktisch ist:

import scala.util.Try 

val extractDateAsOptionInt = udf((d: String) => Try(
    d.substring(0, 10).filterNot("-".toSet).toInt 
).toOption) 

Alle Kreditkarten:

val extractDateAsOptionInt = udf((d: String) => d match { 
    case null => None 
    case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt) 
}) 

oder es sicherer im allgemeinen Fall leicht zu machen geht zu Dmitriy Selivanov wer po haben habe diese Lösung als (fehlende?) Bearbeitung here eingegeben.

Alternative null außerhalb der UDF zu handhaben ist:

import org.apache.spark.sql.functions.{lit, when} 
import org.apache.spark.sql.types.IntegerType 

val extractDateAsInt = udf(
    (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt 
) 

df.withColumn("y", 
    when($"x".isNull, lit(null)) 
    .otherwise(extractDateAsInt($"x")) 
    .cast(IntegerType) 
) 
+0

Hallo zero323, hört sich toll an. Werde das ausprobieren und belohne dich, sobald es funktioniert! BTW, danke für die schnelle Antwort !!! –

+0

Die Option Variante funktioniert und ist wirklich ordentlich !!! Danke für das Aufzeigen! –

6

Ergänzungscode

Mit der schön Antwort von @ zero323, habe ich den folgenden Code, benutzerdefinierte Funktionen zur Verfügung, den Griff haben Nullwerte wie beschrieben. Hoffe, es ist hilfreich für andere!

/** 
* Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that 
* handle `null` values. 
*/ 
object NullableFunctions { 

    import org.apache.spark.sql.functions._ 
    import scala.reflect.runtime.universe.{TypeTag} 
    import org.apache.spark.sql.UserDefinedFunction 

    /** 
    * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that 
    * * if fnc input is null, None is returned. This will create a null value in the output Spark column. 
    * * if A1 is non null, Some(f(input) will be returned, thus creating f(input) as value in the output column. 
    * @param f function from A1 => RT 
    * @tparam RT return type 
    * @tparam A1 input parameter type 
    * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above 
    */ 
    def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = { 
    udf[Option[RT],A1]((i: A1) => i match { 
     case null => None 
     case s => Some(f(i)) 
    }) 
    } 

    /** 
    * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that 
    * * if on of the function input parameters is null, None is returned. 
    *  This will create a null value in the output Spark column. 
    * * if both input parameters are non null, Some(f(input) will be returned, thus creating f(input1, input2) 
    *  as value in the output column. 
    * @param f function from A1 => RT 
    * @tparam RT return type 
    * @tparam A1 input parameter type 
    * @tparam A2 input parameter type 
    * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above 
    */ 
    def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = { 
    udf[Option[RT], A1, A2]((i1: A1, i2: A2) => (i1, i2) match { 
     case (null, _) => None 
     case (_, null) => None 
     case (s1, s2) => Some((f(s1,s2))) 
    }) 
    } 
} 
8

Scala hat eigentlich eine schöne Fabrik Funktion, Option(), dass diese noch prägnanter machen:

val extractDateAsOptionInt = udf((d: String) => 
    Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt)) 

intern die Methode anwenden die Option Objekt wird die Null-Check für Sie gerade tun:

def apply[A](x: A): Option[A] = if (x == null) None else Some(x)