Ich bin relativ neu zu entfachen und zur Zeit meinen Kopf Ich Einwickeln darüber, wie die (Wieder-) Partitionierung von Daten auf Protokolldateien von S3 importieren zu entfachen (Parkett-Dateien).Partitionierung auf Import/verschiedene
Ich habe eine Reihe von gzip-Dateien in S3 log in folgendem Format {bucket}/{YYYY-MM-DD}/{CustomerId}.log.gz
. Die Größe der Protokolldateien reicht von < 1 MB bis zu 500 MB.
Auf den Import Ich bin ein pyspark Skript ausgeführt wird, die die folgenden:
# load, unpack and parse file from S3 into an rdd of Rows
# using just python modules: boto, cStringIO and gzip
# then:
rdd = rdd.distinct()
df = sqlContext.createDataFrame(rdd, schema)
df = df.withColumn("timeDay", F.date_format(df.time, "yyyy-MM-dd"))
df.write.parquet("PATH/", mode="append", partitionBy=["timeDay"])
Die Probleme, die ich sind, haben (glaube ich):
distinct
wird alles mit allem vergleichen. Dies ist logisch, kein Problem, aber es produziert eine Menge Shuffle. Wie kann doppelte Reihen WITHIN ein Kunde und ein Tag entfernen?- auch auf
distinct
schafft es (für mich) genau 200 Partitionen mit gemischten Daten pro Kunde und Tag. Also, wenn ich einen Tag importieren bekomme ich 200 Partitionen mit sehr kleinen Dateien, aber wenn ich versuche, einen Monat zu importieren ich auch 200 Partitionen und laufe in AusnahmenMissing an output location for shuffle 0
- ich die Anzahl der Partitionen auf
distinct
definieren könnte, aber wie würde diese mein Problem lösen?
- ich die Anzahl der Partitionen auf
- Ohne
distinct
bekomme ich eine Partition pro Eingabedatei, die bedeutet, dass ich ein paar sehr große Partitionen und einige sehr kleine, die zur Parallelisierung meine Aufgaben bereits schlecht ist
Es wäre wirklich sehr hilfreich sein, wenn jemand Sie könnten mir helfen, meine Dateien beim Import zu teilen/zusammenzuführen, um eine bessere Parallelisierung zu erreichen und das Problem zu lösen, dass ich eindeutige Zeilen pro Kunde und Tag brauche.
PS: Ich bin mit Funken 1.5.2 mit Python 2.
Ich würde wirklich so manchmal muss ich wieder importieren (überschreiben) nur einige Tage später mag diese partitionBy=["timeDay"]
als erste Partition haben.
Vielen Dank im Voraus!
Danke für Ihre Antwort! Ich habe bemerkt, dass die Aufteilung nach Spalten in Spark 1.6 hinzugefügt wurde :( –
Mit '' distinct'' Ich meine, ist es möglich, es auf nur einer Eingabedatei auszuführen, anstatt über alle Eingabedateien zu vergleichen, da Zeilen verschiedener Dateien niemals dupliziert werden? Wenn ich distinct + repartition mit 1.6 laufen lasse, wird es eine Menge Daten zweimal mischen. –
Wenn Sie distinct nur für eine Eingabedatei aufrufen möchten, laden Sie sie in eine separate RDD, rufen distinct auf und verbinden sie dann mit der anderen RDD Rest der Dateien.Wenn Sie alle Zeilen in den anderen Dateien, die in der ersten Datei auftreten, entfernen möchten, könnten Sie wahrscheinlich eine Art reduceByKey tun.Wie beim Mischen gibt es einen Grund, warum Sie auch neu partitionieren möchten? Am besten versuchen Sie, Ihren Code auszuführen, um zu sehen, ob er so schnell ausgeführt wird, dass keine Optimierung erforderlich ist. –