2015-07-10 21 views
15

Ich habe eine Beispielanwendung arbeiten zum Lesen von CSV-Dateien in einen Datenrahmen. Der Datenrahmen kann mit der Methode df.saveAsTable(tablename,mode) in einer Hive-Tabelle im Parkettformat gespeichert werden.Speichern Sie Spark-Datenframe als dynamische partitionierte Tabelle in Hive

Der obige Code funktioniert gut, aber ich habe so viele Daten für jeden Tag, dass ich die Hive-Tabelle basierend auf dem creationdate (Spalte in der Tabelle) dynamisch partitionieren möchte.

gibt es eine Möglichkeit, den Datenrahmen dynamisch zu partitionieren und ihn im Stock-Warehouse zu speichern. Unterlassen Sie die Hard-Codierung der INSERT-Anweisung unter Verwendung von hivesqlcontext.sql(insert into table partittioin by(date)....). How to save DataFrame directly to Hive?

jede Hilfe ist sehr willkommen:

Frage kann als Erweiterung zu berücksichtigen.

Antwort

12

Ich glaube, es so etwas wie dies funktioniert:

df ist ein Datenrahmen mit Jahr, Monat und anderen Spalten

df.write.partitionBy('year', 'month').saveAsTable(...) 

oder

df.write.partitionBy('year', 'month').insertInto(...) 
+0

diese Partitionby Methode versucht. Es funktioniert nur auf RDD-Ebene. Sobald der Datenrahmen erstellt ist, sind die meisten Methoden DBMS-gestylt, z. groupby, orderby, aber sie dienen nicht dazu, in verschiedenen Partitionsordnern auf Hive zu schreiben. – Chetandalal

+4

Ok, also konnte ich es mit 1.4 Version ausarbeiten. df.write(). mode (SaveMode.Append) .partitionBy ("Datum"). saveAsTable ("Tabellenname"); . Dies ändert jedoch mein Datumsfeld in einen ganzzahligen Wert und entfernt das tatsächliche Datum. z.B. Es gibt 9 eindeutige Daten in der Spalte, aber sie sind jetzt als 1,2,3 gespeichert .... und Ordnername ist date = 1,2,3, ... statt date = 20141121. Lass mich wissen, ob es einen Weg gibt, dies zu tun. – Chetandalal

+0

@ subramaniam-ramasubramanian: pls antworten auf OP s frage als antwort statt editieren bestehende antwort –

22

konnte ich partitioniert hive schreiben Tabelle mit df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

Ich musste die Fo aktivieren Eigenschaften, um es zum Laufen zu bringen.

 
hiveContext.setConf("hive.exec.dynamic.partition", "true") 
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") 
+0

Wo sollte ich die oben genannten 2 Parameter einstellen? Ich habe versucht, Hive-Shell anzumelden und über Befehle ausführen, es ist fehlgeschlagen. Ich bin mir sicher, dass ich es falsch mache. Könnten Sie bitte sagen, wo kann ich diese Eigenschaften einstellen? –

+2

@VrushankDoshi Sie würden es in das Spark-Programm setzen, direkt nachdem Sie Ihren HiveContext erstellt haben. val sparkConf = neu SparkConf() val sc = neu SparkContext (sparkConf) val hiveContext = neu org.apache.spark.sql.hive.HiveContext (sc) hiveContext.setConf ("hive.exe.dynamic.partition" , "wahr") hiveContext.setConf ("hive.exec.dynamic.partition.mode "," nonstrict ") – MV23

3

Ich sah auch die gleiche Sache, aber mit folgenden Tricks, die ich gelöst.

  1. Wenn wir eine Tabelle als partitioniert und dann partitionierte Spalte Groß-und Kleinschreibung beachten.

  2. Partitionierte Spalte sollte in DataFrame mit demselben Namen vorhanden sein (Groß-/Kleinschreibung beachten). Code:

    var dbName="your database name" 
    var finaltable="your table name" 
    
    // First check if table is available or not.. 
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) { 
        //If table is not available then it will create for you.. 
        println("Table Not Present \n Creating table " + finaltable) 
        sparkSession.sql("use Database_Name") 
        sparkSession.sql("SET hive.exec.dynamic.partition = true") 
        sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ") 
        sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400") 
        sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID  string,EMP_Name   string,EMP_Address    string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)") 
        //Table is created now insert the DataFrame in append Mode 
        df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable) 
    } 
    
+0

df.write.mode (SaveMode.Append) .insertInto (empDB +". "+ finalTable) müssen Sie partitionBy nicht erwähnen? Beispiel df.write.mode (SaveMode.Append). partitionBy ("EMP_DEP") .insertInto (empDB + "." + finalTable) –

+0

Keine Notwendigkeit .. seine optionale –

+0

haben für mich nicht funktioniert, Tisch zählen ist –