Wie

2015-08-11 11 views
30

arbeitet mit HiveQL Dieses Befehl in der CSV-Daten von Spark-SQL exportieren:Wie

java.lang.RuntimeException: Unsupported language features in query: 
    insert overwrite directory '/data/home.csv' select * from testtable 

Bitte leiten: Ich erhalte eine Fehlermeldung mit einem org.apache.spark.sql.hive.HiveQl Stack-Trace

insert overwrite directory '/data/home.csv' select * from testtable; 

Aber mit Spark-SQL Ich schreibe Export in CSV-Funktion in Spark SQL.

Antwort

45

Sie unter Anweisung verwenden können, um den Inhalt von Datenrahmen im CSV-Format df.write.csv("/data/home/csv")

zu schreiben Wenn Sie den gesamten Datenrahmen in eine einzige CSV-Datei schreiben müssen, verwenden Sie dann df.coalesce(1).write.csv("/data/home/sample.csv")

Für Funken 1 .x, können Sie spark-csv verwenden, um die Ergebnisse in CSV-Dateien zu schreiben

Below scala Schnipsel hel würde p

import org.apache.spark.sql.hive.HiveContext 
// sc - existing spark context 
val sqlContext = new HiveContext(sc) 
val df = sqlContext.sql("SELECT * FROM testtable") 
df.write.format("com.databricks.spark.csv").save("/data/home/csv") 

Um den Inhalt in eine einzige Datei

import org.apache.spark.sql.hive.HiveContext 
// sc - existing spark context 
val sqlContext = new HiveContext(sc) 
val df = sqlContext.sql("SELECT * FROM testtable") 
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv") 
+0

Ich habe das Koalesce-Ding ausprobiert, das du erwähnt hast. Es erstellt ein Verzeichnis am angegebenen Pfad mit einer "Teil" -Datei und einer Datei namens "_SUCCESS". Weißt du einen Weg, um tatsächlich nur die eine Datei zu bekommen? –

+0

Nein, ich denke es gibt keinen Weg es zu tun. – sag

1

Die Fehlermeldung schlägt vor, dass dies keine unterstützte Funktion in der Abfragesprache ist. Sie können einen DataFrame jedoch wie gewohnt über die RDD-Schnittstelle (df.rdd.saveAsTextFile) in einem beliebigen Format speichern. Oder Sie können https://github.com/databricks/spark-csv auschecken.

+0

scala> df.write.format ("com.databricks.spark.csv") speichern ("/ data/home.csv") . 18: Fehler: Wert Schreib ist kein Mitglied der org.apache.spark.sql.SchemaRDD Muss ich das aktuelle jar mit dem databricks-Paket erneut erstellen? – shashankS

+0

'DataFrame.write' wurde in Apache Spark 1.4.0 hinzugefügt. –

8

Der einfachste Weg ist, zu schreiben über den Datenrahmen des RDD zur Karte und verwendet mkString:

df.rdd.map(x=>x.mkString(",")) 

Als Spark 1.5 (oder schon vorher) df.map(r=>r.mkString(",")) würde das gleiche tun Wenn Sie CSV-Escaping möchten, können Sie Apache Commons lang dafür verwenden. z.B. hier ist der Code, den wir

def DfToTextFile(path: String, 
        df: DataFrame, 
        delimiter: String = ",", 
        csvEscape: Boolean = true, 
        partitions: Int = 1, 
        compress: Boolean = true, 
        header: Option[String] = None, 
        maxColumnLength: Option[Int] = None) = { 

    def trimColumnLength(c: String) = { 
     val col = maxColumnLength match { 
     case None => c 
     case Some(len: Int) => c.take(len) 
     } 
     if (csvEscape) StringEscapeUtils.escapeCsv(col) else col 
    } 
    def rowToString(r: Row) = { 
     val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters 
     st.split("~-~").map(trimColumnLength).mkString(delimiter) 
    } 

    def addHeader(r: RDD[String]) = { 
     val rdd = for (h <- header; 
        if partitions == 1; //headers only supported for single partitions 
        tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1) 
     rdd.getOrElse(r) 
    } 

    val rdd = df.map(rowToString).repartition(partitions) 
    val headerRdd = addHeader(rdd) 

    if (compress) 
     headerRdd.saveAsTextFile(path, classOf[GzipCodec]) 
    else 
     headerRdd.saveAsTextFile(path) 
    } 
+2

Während dies die einfachste Antwort (und eine gute) ist, wenn Sie Text doppelte Anführungszeichen haben, müssen Sie sie berücksichtigen. – devonlazarus

+0

Einfach den Fehler nach dem Erstellen RDD für die Tabelle erhalten scala> df.rdd.map (x => x.mkString (",")); : 18: Fehler: Wert rdd ist kein Mitglied von org.apache.spark.sql.SchemaRDD df.rdd.map (x => x.mkString (",")); – shashankS

22

Die Antwort oben mit Funken csv korrekt verwenden, aber es gibt ein Problem - die Bibliothek mehrere Dateien auf der Grundlage der Datenrahmenpartitionierung erstellt. Und das brauchen wir normalerweise nicht. So können Sie alle Partitionen zu einem kombinieren:

df.coalesce(1). 
    write. 
    format("com.databricks.spark.csv"). 
    option("header", "true"). 
    save("myfile.csv") 

und benennen Sie die Ausgabe des lib (Namen „Teil-00000“) zu einem Wunsch Dateinamen.

Dieser Blog-Eintrag liefert weitere Details: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

+2

Sollte es df.repartition.write anstelle von df.write.repartition sein? –

+0

@Cedric du hast Recht, danke! Neupartition zuerst! Bearbeitet. –

+2

Man kann auch ein Modell hinzufügen, wenn man in eine bestehende Datei schreiben möchte. 'resultDF.repartition (1) .write.mode (" Anhängen "). format (" com.databricks.spark.csv "). option (" header "," true ") .save (" s3: // .. . ') ' – Pramit

24

Da Funken 2.Xspark-csv als native datasource integriert ist. Daher vereinfacht die notwendige Anweisung (Fenster)

df.write 
    .option("header", "true") 
    .csv("file:///C:/out.csv") 

oder UNIX

df.write 
    .option("header", "true") 
    .csv("/var/out.csv") 
+1

Dies sollte jetzt die akzeptierte Antwort sein. –

+0

Hi all, Gibt es eine Möglichkeit, die Datei zu ersetzen, wie es fehlschlägt, wenn es versucht, die Datei neu zu schreiben. – user3341078

+0

Sicher! '.mode (" überschreiben "). csv ("/var/out.csv ")' – Boern

0

Mit Hilfe von Funken csv wir in eine CSV-Datei schreiben kann.

val dfsql = sqlContext.sql("select * from tablename") 
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`