2016-06-17 9 views
2

Ich verarbeite eine JSON-Datei, um zwei JSON-Dateien mit Spark (Version 1.6.1) zu generieren. Die Größe der Eingabedatei beträgt ca. 30 ~ 40G (100M Datensätze). Für die generierten Dateien ist der größere etwa 10G ~ 15G (30M Datensätze), der kleinere ist etwa 500M ~ 750M (1,5M Datensätze). Beide Ergebnisdateien sind mit den folgenden Problemen konfrontiert:Spark-Datenframe ist nicht sortiert nach der Sortierung

Ich rief die "sort" -Methode für den Datenframe auf, danach führte "repartition" die Ergebnisse in eine einzige Datei zusammenführen. Dann überprüfte ich die generierte Datei, gefunden in einem Intervall, in dem die Datensätze bestellt werden, aber die gesamte Datei wird nicht global bestellt. z.B. der Schlüssel (aus 3 Spalten aufgebaut) des letzten Datensatzes (Zeile Nr. 1.9M) in der Datei ist "(ou7QDj48c, 014, 075)", aber der Schlüssel eines mittleren Datensatzes in der Datei (Zeile Nr. 375K) ist "(pzwzh5vm8, 003, 023)“

pzwzh5vm8 003 023 
... 
ou7QDj48c 014 075 

Wenn ich Code lokal getestet, um eine relativ kleine Eingangsquelle (Input-Datei 400K Zeilen) verwenden, wird einem solchen Fall nicht passieren.

Mein konkreter Code ist unten dargestellt:

big_json = big_json.sort($"col1", $"col2", $"col3", $"col4") 
big_json.repartition(1).write.mode("overwrite").json("filepath") 

Könnte jemand einen Rat geben? Vielen Dank.

(Ich habe auch bemerkt, dass this thread ein ähnliches Problem diskutiert, aber es gibt keine gute Lösung bis jetzt. Wenn dieses Phänomen wirklich aus Repartition Operation resultiert, könnte mir jemand helfen, effektiv Dataframe in eine einzige JSON-Datei zu transformieren ohne sie in RDD verwandeln, während die sortierten Reihenfolge halten? Dank)

=========================== SOLUTION ==== ======================

Wirklich zu schätzen für die Hilfe von @manos @eliasah und @pkrishna. Ich hatte über die Verwendung von Coalesce nach dem Lesen Ihrer Kommentare nachgedacht, aber nachdem ich seine Leistung untersucht hatte, gab ich die Idee auf.

Die endgültige Lösung ist: Sortieren Sie den Datenrahmen und schreiben Sie in Json, ohne jede Neuverteilung oder Koaleszenz. Nachdem die ganze Arbeit getan ist, rufen Sie den Befehl hdfs unter

hdfs dfs -getmerge /hdfs/file/path/part* ./local.json 

Dieser Befehl ist viel besser als meine vorstellen. Es braucht weder zu viel Zeit noch zu viel Platz und gibt mir eine gute einzelne Datei. Ich habe gerade head und tail auf der riesigen Ergebnisdatei verwendet und es scheint total geordnet.

+0

Nicht neu partitionieren. Lassen Sie es mehrere Dateien erstellen und lesen Sie sie nacheinander, sollten sie in der richtigen Sortierreihenfolge sein. – marios

Antwort

5

Wie marios in seinem Kommentar angegeben hat, werden Sie nach Ihrer Sortieraktion neu partitioniert.

Das ist also, was Aufteilung tun: Es mischt die Daten in der RDD nach dem Zufallsprinzip zufällig, um entweder mehr oder weniger Partitionen zu erstellen und sie über sie auszugleichen. Dies mischt immer alle Daten über das Netzwerk.

Unter der Haube verwendet es Koaleszenz und Shuffle, um Daten neu zu verteilen. [Reference]

Damit sind Ihre Daten nicht mehr sortiert!

1

Da die Partitionsanzahl auf 1 in Ihrem Beispiel gesetzt, was bedeutet, dass die Partition 1.

Um die Anzahl der Partitionen in der rdd zu verringern reduziert wird, Funke stellt eine Transformation coalesce (mit Shuffle = false) was die Ordnung erhält.

Wie Eliasah, erwähnt, dass die Aufteilung unter der Haube mit Koaleszenz. Es ruft coalesce mit shuffle = true auf. Daher kann die Koaleszenzumwandlung anstelle einer Neupartitionierung mit shuffle = false verwendet werden.