7

Spark bietet jetzt vordefinierte Funktionen, die in Datenrahmen verwendet werden können, und es scheint, dass sie stark optimiert sind. Meine ursprüngliche Frage sollte schneller sein, aber ich habe selbst einige Tests durchgeführt und festgestellt, dass die Funkenfunktionen zumindest in einem Fall etwa 10 mal schneller sind. Weiß jemand, warum dies so ist, und wann würde ein udf schneller sein (nur für Fälle, in denen eine identische Funkenfunktion existiert)?Spark-Funktionen vs UDF-Leistung?

Hier mein Testcode (lief auf Databricks Gemeinschaft ed):

# UDF vs Spark function 
from faker import Factory 
from pyspark.sql.functions import lit, concat 
fake = Factory.create() 
fake.seed(4321) 

# Each entry consists of last_name, first_name, ssn, job, and age (at least 1) 
from pyspark.sql import Row 
def fake_entry(): 
    name = fake.name().split() 
    return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1) 

# Create a helper function to call a function repeatedly 
def repeat(times, func, *args, **kwargs): 
    for _ in xrange(times): 
     yield func(*args, **kwargs) 
data = list(repeat(500000, fake_entry)) 
print len(data) 
data[0] 

dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age')) 
dataDF.cache() 

UDF-Funktion:

concat_s = udf(lambda s: s+ 's') 
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name')) 
udfData.count() 

Spark-Funktion:

spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name')) 
spfData.count() 

beide mehrmals Ran, die udf benötigte normalerweise etwa 1,1 - 1,4 s, und die Spark Concat-Funktion nahm immer weniger als 0,15 s in Anspruch.

Antwort

15

wenn wäre ein UDF schnell

Wenn Sie Python UDF fragen, die Antwort ist wahrscheinlich nie. Da SQL-Funktionen relativ einfach sind und nicht für komplexe Aufgaben ausgelegt sind, ist es nahezu unmöglich, die Kosten der wiederholten Serialisierung, Deserialisierung und Datenverschiebung zwischen Python-Interpreter und JVM zu kompensieren.

Weiß jemand, warum dies so ist

Die Hauptgründe oben bereits aufgezählt und kann auf eine einfache Tatsache reduziert werden, dass Funken DataFrame durch einfache nativ eine JVM Struktur und Standard-Zugriffsverfahren implementiert ist Aufrufe an Java-API. UDF hingegen sind in Python implementiert und erfordern das Verschieben von Daten hin und her.

Während PySpark im Allgemeinen Datenbewegungen zwischen JVM und Python erfordert, erfordert es im Falle einer Low-Level-RDD-API in der Regel keine teure Serde-Aktivität. Spark SQL fügt zusätzliche Kosten für Serialisierung und Serialisierung sowie Kosten für das Verschieben von Daten aus und in unsichere Darstellung in JVM hinzu. Der letztere ist spezifisch für alle UDFs (Python, Scala und Java), aber der erste ist spezifisch für nicht-muttersprachliche Sprachen.

Im Gegensatz zu UDFs funktionieren Spark SQL-Funktionen direkt auf JVM und sind in der Regel gut in Catalyst und Tungsten integriert. Dies bedeutet, dass diese im Ausführungsplan optimiert werden können und in den meisten Fällen von den Optimierungen von codgen und anderen Tungsten profitieren können. Darüber hinaus können diese Daten in ihrer "nativen" Darstellung bearbeiten.

In gewisser Weise besteht das Problem hier darin, dass Python UDF Daten in den Code bringen muss, während SQL-Ausdrücke umgekehrt laufen.

+0

Fantastische Antwort, genau das, was ich suchte. Ich vermutete, dass es aufgrund von Datenverschiebungen zwischen Python-Java war, war nur nicht sicher. Ich schätze die zusätzlichen Informationen, dass diese auch von Catalyst und Tungsten profitieren können, daher wird es für mich viel wichtiger sein, sie so weit wie möglich in meinen Code zu implementieren und UDFs zu minimieren. Ein bisschen abseits von Thema, aber würden Sie zufällig wissen, ob numpy Fähigkeiten zu Spark Dataframes bald kommen werden? Dies hat eines meiner Projekte weitgehend auf RDDs gehalten. – alfredox

+0

Ich bin nicht sicher, was genau Sie mit "numpy Fähigkeiten" meinen. – zero323

+0

Sie können ein numpiges Array nicht als Zeilenelement hinzufügen. Derzeit unterstützen Spark Rows verschiedene Datentypen wie StringType, BoolType, FloatType, aber Sie können dort kein numpy Array speichern. – alfredox

0

Seit dem 30. Oktober 2017 hat Spark vectorized pdfs für pyspark eingeführt.

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Der Grund, dass Python UDF langsam ist, ist wahrscheinlich die PySpark UDF in einer optimierten Art und Weise nicht implementiert ist:

Nach dem Absatz aus dem Link.

Spark hinzugefügt eine Python-API in Version 0.7 mit Unterstützung für benutzerdefinierte Funktionen. Diese benutzerdefinierten Funktionen arbeiten One-row-at-time und leiden daher unter hohem Serialisierungs- und Aufrufoverhead.

jedoch die neu vektorisiert pdfs scheint die Leistung viel zu verbessern:

im Bereich von 3x auf über 100x.

enter image description here