2015-07-06 7 views
30

Wie gibt man mehr Spaltenbedingungen vor, wenn zwei Datenrahmen verbunden werden. Zum Beispiel möchte ich Folgendes ausführen:Spark gibt mehrere Spaltenbedingungen für den Dataframe-Join an

val Lead_all = Leads.join(Utm_Master, 
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") == 
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"), 
"left") 

Ich möchte nur beitreten, wenn diese Spalten übereinstimmen. Aber obige Syntax ist nicht gültig, da Spalten nur eine Zeichenkette benötigt. Wie bekomme ich, was ich will?

Antwort

53

Es wird ein Funke column/expression API join für einen solchen Fall:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource") 
     && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") 
     && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") 
     && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"), 
    "left" 
) 

Der <=> Operator im Beispiel bedeutet "Equality test that is safe for null values".

Der Hauptunterschied zu einfachen Equality test (===) ist, dass die erste sicher verwendet werden kann, wenn eine der Spalten Nullwerte haben darf.

+3

Können Sie erklären, was der Unterschied zwischen '===' und '<=>' ist? – zero323

+5

Aktualisiert mit mehr Informationen über den Unterschied zwischen diesen Gleichheitstests. – rchukh

+0

Aha, konnte das in der Dokumentation nicht finden. Woher weißt du das? – user568109

3

Eine Sache, die Sie tun können, ist roh SQL zu verwenden:

case class Bar(x1: Int, y1: Int, z1: Int, v1: String) 
case class Foo(x2: Int, y2: Int, z2: Int, v2: String) 

val bar = sqlContext.createDataFrame(sc.parallelize(
    Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") :: 
    Bar(3, 1, 2, "bar") :: Nil)) 

val foo = sqlContext.createDataFrame(sc.parallelize(
    Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") :: 
    Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil)) 

foo.registerTempTable("foo") 
bar.registerTempTable("bar") 

sqlContext.sql(
    "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2") 
+0

Dies ist die Methode, die ich gerade benutze. Ich hatte gehofft, ich könnte es tun, ohne mich als temporäre Tabellen zu registrieren. Wenn dies mit der Dataframe-API nicht möglich ist, akzeptiere ich die Antwort. – user568109

+0

Wenn ja, @ Rchukhs Antwort ist viel besser. – zero323

7

Ab der Spark-Version 1.5.0 (die derzeit nicht veröffentlicht ist) können Sie an mehreren DataFrame-Spalten teilnehmen. Siehe SPARK-7990: Add methods to facilitate equi-join on multiple join keys.

Python

Leads.join(
    Utm_Master, 
    ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"], 
    "left_outer" 
) 

Scala

Die Frage nach einer Antwort Scala gefragt, aber ich verwende Scala nicht. Hier ist meine beste Vermutung ....

Leads.join(
    Utm_Master, 
    Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"), 
    "left_outer" 
) 
+0

Wie lassen wir den Join die Werte case ignorieren (d. H. Es macht Groß- und Kleinschreibung unempfindlich)? Ich habe es versucht und nicht funktioniert. sqlContext.sql ("set spark.sql.caseSensitive = false") –

1

Scala:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource") 
     && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") 
     && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") 
     && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"), 
    "left" 
) 

Um es Groß- und Kleinschreibung, machen

import org.apache.spark.sql.functions.{lower, upper} 

dann benutzen Sie einfach lower(value) im Zustand der Join-Methode.

ZB: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))

2

In Pyspark Sie einfach jede Bedingung separat angeben:

val Lead_all = Leads.join(Utm_Master, 
    (Leaddetails.LeadSource == Utm_Master.LeadSource) & 
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) & 
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) & 
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign)) 

Nur sicher sein, richtig Operatoren und Klammern zu verwenden.

0

Funken SQL unterstützt auf Tupel von Spalten, wenn in Klammern verbinden, wie

... WHERE (list_of_columns1) = (list_of_columns2) 

, die einen Weg kürzer als Angabe gleich Ausdrücke (=) für jedes Paar von Spalten, die durch einen Satz von „und“ s kombinierte .

Zum Beispiel:

SELECT a,b,c 
FROM tab1 t1 
WHERE 
    NOT EXISTS 
    ( SELECT 1 
     FROM t1_except_t2_df e 
     WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c) 
    ) 

statt

SELECT a,b,c 
FROM tab1 t1 
WHERE 
    NOT EXISTS 
    ( SELECT 1 
     FROM t1_except_t2_df e 
     WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c 
    ) 

, die zu weniger lesbar ist vor allem, wenn die Liste der Spalten groß ist und Sie wollen einfach mit NULL-Werten beschäftigen.

+0

funktioniert es wirklich? Wird dies in Version 1.6 unterstützt? – Shankar

+0

Ich habe nicht am 1.6 getestet. Dies funktioniert am 2.x. – Tagar

+0

funktioniert nicht in 1.6. – Shankar