2

Ist es möglich, eine in Scala geschriebene UDF (oder Funktion) in PySpark zu verwenden? ZB:Registrierung UDF zu SqlContext von Scala zur Verwendung in PySpark

val mytable = sc.parallelize(1 to 2).toDF("spam") 
mytable.registerTempTable("mytable") 
def addOne(m: Integer): Integer = m + 1 
// Spam: 1, 2 

In Scala ist folgendes möglich:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _) 
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam"))) 
// Spam: 1, 2 
// moreSpam: 2, 3 

Ich möchte "UDFaddOne" in PySpark verwenden wie

%pyspark 

mytable = sqlContext.table("mytable") 
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work 
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work 

Hintergrund: Wir sind ein Team von Entwicklern, von denen einige in Scala und einige in Python kodieren, und möchten bereits geschriebene Funktionen teilen. Es wäre auch möglich, es in einer Bibliothek zu speichern und zu importieren.

Antwort

3

Soweit ich weiß, bietet PySpark kein Äquivalent zur callUDF-Funktion, und deshalb ist es nicht möglich, direkt auf registrierte UDF zuzugreifen.

Die einfachste Lösung ist hier rohen SQL-Ausdruck verwenden:

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam"))) 

## OR 
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable") 

## OR 
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam") 

Dieser Ansatz eher so begrenzt ist, wenn Sie komplexere Workflows zur Unterstützung benötigen, sollten Sie ein Paket erstellen und kompletten Python-Wrapper bieten. Sie finden und Beispiel UDAF Wrapper in meiner Antwort auf Spark: How to map Python with Scala or Java User Defined Functions?

+0

Vielen Dank für diese und Ihre anderen Antworten - ich löste es wie Sie vorgeschlagen! – Andarin

3

Die folgenden für mich gearbeitet (im Grunde eine Zusammenfassung von mehreren Stellen einschließlich der Verbindung von zero323 zur Verfügung gestellt):

In scala:

package com.example 
import org.apache.spark.sql.functions.udf 

object udfObj extends Serializable { 
    def createUDF = { 
    udf((x: Int) => x + 1) 
    } 
} 

in python (nehmen sc der Funke Zusammenhang Wenn Sie Funken 2.0 verwenden können Sie es von der Funken Sitzung erhalten.):

from py4j.java_gateway import java_import 
from pyspark.sql.column import Column 

jvm = sc._gateway.jvm 
java_import(jvm, "com.example") 
def udf_f(col): 
    return Column(jvm.com.example.udfObj.createUDF().apply(col)) 

Und natürlich machen s ure das Glas in scala erstellt wird hinzugefügt --jars und --driver-Klasse-Pfad

Also, was hier passiert:

Wir haben eine Funktion in einem serializable-Objekt erstellen, das die UDF in scala zurück (ich bin nicht 100% sicher, dass Serializable benötigt wird, war es für mich für komplexere UDF erforderlich, so könnte es sein, weil es Java-Objekte übergeben musste).

In Python verwenden wir Zugriff auf die interne jvm (dies ist ein privates Mitglied, so könnte es in der Zukunft geändert werden, aber ich sehe es nicht herum) und importieren Sie unser Paket mit java_import. Wir greifen auf die createUDF-Funktion zu und rufen sie auf. Dies erzeugt ein Objekt, das über die Methode apply verfügt (Funktionen in scala sind tatsächlich Java-Objekte mit der Methode apply). Die Eingabe für die Methode apply ist eine Spalte. Das Ergebnis der Anwendung der Spalte ist eine neue Spalte, daher müssen wir sie mit der Column-Methode umbrechen, um sie mit Column verfügbar zu machen.

+0

Das sieht in der Tat ein bisschen hacky aus, und ich bin mir nicht sicher, ob ich es in anderen als Testcode verwenden würde, aber es gibt mir mehr Verständnis für die inneren Abläufe, also danke dafür! – Andarin

+0

Versucht, auf Pysaprk 2.1.1, zu laufen Ich erhalte den folgenden Fehler: 'TypeError: 'Spalte' Objekt ist nicht aufrufbar ' in der Spark Codebase scheint scheint, dass Sie Spalte Objekt nicht instanziieren –

+0

Dies bedeutet im Grunde Es gibt ein Problem mit dem Import.Entweder sind die Gläser nicht im Klassenpfad oder der Name ist falsch oder ähnlich. –