2016-08-04 63 views
0

Ich versuche, ein neues Dataset mit einem alten Dataset zusammenzuführen, ich habe einen Seq [String] von Primärschlüsseln für jeden Tabellentyp und einen alten Dataframe und einen neuen Dataframe mit demselben Schema.Wie kann ich auswählen, welche doppelten Zeilen gelöscht werden sollen?

Wenn die Primärschlüsselspalte Werte übereinstimmen, möchte ich die Zeile in der alten Datenrahmen mit der Zeile in der neuen Datenrahmen ersetzen, wenn sie nicht übereinstimmen, ich will in die Zeile hinzuzufügen.

I haben dies bisher:

val finalFrame: DataFrame = oldDF.withColumn("old/new",lit("1")) 
     .union(newDF.withColumn("old/new",lit("2"))) 
     .dropDuplicates(primaryKeySet) 

ich eine wörtliche Spalte 1 der hinzufügen und 2 der Spur zu halten, von denen Reihen sind denen, union sie zusammen, und legen sie die Duplikate auf der Grundlage der Seq [Zeichenfolge] der Primärschlüsselspaltennamen . Das Problem mit dieser Lösung ist, dass ich nicht angeben kann, welche Duplikate aus der Tabelle gelöscht werden, wenn ich angeben könnte, dass Duplikate mit "1" gelöscht werden, was optimal wäre, aber ich bin offen für alternative Lösungen.

Antwort

1

Hämmerte meinen Kopf ein wenig länger und fand einen Trick. Meine primären Schlüssel waren eine Folge, und so nicht direkt in eine partitionBy in einer Fensterfunktion genommen werden können, so dass ich tat dies:

val windowFunction = Window.partitionBy(primaryKeySet.head, primaryKeySet.tail: _*).orderBy(desc("old/new")) 
    val duplicateFreeFinalDF = finalFrame.withColumn("rownum", row_number.over(windowFunction)).where("rownum = 1").drop("rownum").drop("old/new") 

Im Wesentlichen verwendet nur Vararg Expansion so partitionBy würde meine Liste nehmen und dann einem rownum window Funktion, damit ich im Falle eines Duplikats sicherstellen kann, dass die neueste Kopie erstellt wird.