2015-05-20 8 views
6

Ich habe Schwierigkeiten mit Schritt, wo ich jede RDD-Partition zu separieren Parkett-Datei mit einem eigenen Verzeichnis schreiben möchte. Beispiel ist:Schreiben RDD-Partitionen in einzelne Parkett-Dateien in einem eigenen Verzeichnis

<root> 
     <entity=entity1> 
      <year=2015> 
       <week=45> 
        data_file.parquet 

Vorteil dieses Formats ist, es diese direkt in SparkSQL als Spalten verwenden kann, und ich werde diese Daten nicht in der tatsächlichen Datei wiederholen. Dies wäre ein guter Weg, um zu einer bestimmten Partition zu gelangen, ohne an anderer Stelle separate Partitionierungs-Metadaten zu speichern.

Als einen vorhergehenden Schritt habe ich alle Daten aus einer großen Anzahl von Gzip-Dateien geladen und basierend auf dem oben genannten Schlüssel partitioniert.

Möglicher Weg wäre, jede Partition als separate RDD zu bekommen und dann zu schreiben, obwohl ich keine gute Möglichkeit finden konnte, es zu tun.

Jede Hilfe wird geschätzt. Übrigens bin ich neu in diesem Stapel.

Antwort

2

Ich denke, es ist möglich, indem Sie foreachPartition(f: Iterator[T] => Unit) auf der RDD aufrufen, die Sie speichern möchten.

In der Funktion gelieferten man in foreachPartition:

  1. dem hdfs://localhost:9000/parquet_data/year=x/week=y
  2. einem ParquetWriter
  3. Abgaspfad den Iterator Bereiten jede Zeile in die recordWriter durch Einfügen.
  4. reinigen
+0

Danke für die Antwort. Ich verstehe Ihre Antwort konzeptionell und versuchte sie zu implementieren, konnte aber keinen Weg finden, ParquetRecord writer zu konstruieren? Jedes Codebeispiel in Scala wird sehr hilfreich sein. –

+0

@RajeevPrasad, Ich habe die Antwort mit einem Parkett Schriftsteller Beispiel bearbeitet, überprüfen Sie bitte die verwandten src Code von Funken, um zu sehen, wie es funktioniert :) –

+0

Danke für Sie helfen. Das funktioniert gut. Weißt du, was die Leistung dieser Implementierung im Vergleich zum direkten Schreiben mit saveAsParquetFile wäre? –

42

up Ich denke nicht, dass die akzeptierte Antwort in geeigneter Weise die Frage beantwortet.

versuchen, etwas wie folgt aus:

df.write.partitionBy("year", "month", "day").parquet("/path/to/output") 

Und Sie werden die partitioniert Verzeichnisstruktur erhalten.

+0

Zustimmen, dies sollte die akzeptierte Antwort sein. –

+0

Ja, diese Antwort ist einfacher und zukunftssicher. – Sim