2016-07-20 14 views
15

Ich möchte bestimmte Partitionen anstelle von alle in Spark zu überschreiben. Ich versuche, den folgenden Befehl ein:Überschreiben bestimmter Partitionen in Spark Dataframe Schreibmethode

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4') 

wo df Datenrahmen ist die inkrementellen Daten mit überschrieben werden.

hdfs-base-path enthält die Stammdaten.

Wenn ich den obigen Befehl versuche, löscht er alle Partitionen und fügt die in df vorhandenen im hdfs-Pfad ein.

Was meine Anforderung ist überschreiben nur die Partitionen in df am angegebenen hdfs Pfad. Kann mir bitte jemand dabei helfen?

Antwort

13

Dies ist ein häufiges Problem. Die einzige Lösung, mit Funken bis zu 2,0 ist direkt in die Trennwand Verzeichnis zu schreiben, zum Beispiel

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value") 

Wenn Sie Funken verwenden vor 2.0, werden Sie Spark stoppen müssen von Metadatendateien emittieren (weil sie wollen brechen Entdeckung automatische Partition) mit:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

Wenn Sie Funken vor 1.6.2 verwenden, müssen Sie auch die _SUCCESS Datei in /root/path/to/data/partition_col=value oder seine Anwesenheit löschen wird die automatische Partition Entdeckung brechen. (Ich empfehle dringend, 1.6.2 oder höher zu verwenden.)

Sie können ein paar weitere Details über die Verwaltung von großen partitionierten Tabellen von meinem Spark Summit sprechen unter Bulletproof Jobs.

+0

Dank viel Sim für die Beantwortung . Nur ein paar Zweifel mehr, wenn der anfängliche Datenrahmen Daten für ungefähr 100 Partitionen hat, dann muss ich diesen Datenrahmen in weitere 100 Datenrahmen mit dem entsprechenden Partitionswert teilen und direkt in das Partitionsverzeichnis einfügen. Kann das Speichern dieser 100 Partitionen parallel erfolgen? Ich benutze auch Spark 1.6.1 Wenn ich das orc-Dateiformat verwende, wie kann ich aufhören, Metadaten-Dateien dafür auszugeben, ist es dasselbe, was du für Parkett erwähnt hast? – yatin

+0

Re: Metadaten, nein, ORC ist ein anderes Format und ich glaube nicht, dass es Nicht-Datendateien erzeugt. Mit 1.6.1 benötigen Sie nur ORC-Dateien in den Unterverzeichnissen der Partitionsstruktur. Sie müssen daher '_SUCCESS' von Hand löschen. Sie können parallel zu mehr als einer Partition schreiben, aber nicht von demselben Job. Starten Sie mehrere Jobs basierend auf Ihren Plattformfunktionen, z. B. mithilfe der REST-API. – Sim

+3

Irgendwelche Updates darüber? Setzt saveToTable() nur bestimmte Partitionen überschreiben? Ist Spark intelligent genug, um herauszufinden, welche Partitionen überschrieben wurden? –

4

Mit Spark-1.6 ...

Die HiveContext diesen Prozess erheblich vereinfachen kann. Der Schlüssel ist, dass Sie zuerst die Tabelle in Hive erstellen müssen, indem Sie eine CREATE EXTERNAL TABLE Anweisung mit Partitionierung definieren. Zum Beispiel:

# Hive SQL 
CREATE EXTERNAL TABLE test 
(name STRING) 
PARTITIONED BY 
(age INT) 
STORED AS PARQUET 
LOCATION 'hdfs:///tmp/tables/test' 

Von hier aus lassen Sie uns sagen, dass Sie einen Datenrahmen mit neuen Datensätzen in es für eine bestimmte Partition (oder mehrere Partitionen). Sie können eine HiveContext SQL-Anweisung verwenden, um ein INSERT OVERWRITE mit diesem Datenrahmen durchzuführen, die in die Tabelle nur überschrieben werden für die Partitionen im Datenrahmen enthalten ist:

# PySpark 
hiveContext = HiveContext(sc) 
update_dataframe.registerTempTable('update_dataframe') 

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age) 
        SELECT name, age 
        FROM update_dataframe""") 

Hinweis: update_dataframe in diesem Beispiel hat ein Schema, das die, dass die Spiele Ziel test Tabelle.

Ein einfacher Fehler bei diesem Ansatz besteht darin, den Schritt CREATE EXTERNAL TABLE in Hive zu überspringen und die Tabelle einfach mit den Schreibmethoden der Dataframe-API zu erstellen. Insbesondere bei Parquet-basierten Tabellen wird die Tabelle nicht korrekt definiert, um Hives INSERT OVERWRITE... PARTITION-Funktion zu unterstützen.

Hoffe, das hilft.

+0

Ich habe den obigen Ansatz versucht, ich bekomme den Fehler wie 'dynamische Partition strengen Modus erfordert mindestens eine statische Partition Spalte. Um dies zu deaktivieren, setze hive.exec.dynamic.partition.mode = nonstrict' – Shankar

+0

Ich habe keine statischen Partitionsspalten – Shankar

0

Wenn Sie DataFrame verwenden, möchten Sie möglicherweise die Hive-Tabelle über Daten verwenden. In diesem Fall, dass Sie Methode

df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name) 

Es Partitionen überschrieben werden, den Datenrahmen enthält nur anrufen müssen.

Es gibt keine Notwendigkeit, das Format (orc) anzugeben, da Spark das Hive-Tabellenformat verwendet.

Es funktioniert gut in Spark-Version 1.6

0

Sie so etwas tun könnte den Job einspringende (idempotent) zu machen: (versucht, dies auf Funken 2,2)

# drop the partition 
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition) 
print drop_query 
spark.sql(drop_query) 

# delete directory 
dbutils.fs.rm(<partition_directoy>,recurse=True) 

# Load the partition 
df.write\ 
    .partitionBy("partition_col")\ 
    .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)