2015-01-24 6 views
11

ich meine Cluster auf diese Weise gestartet haben:Funke: Repartition Strategie nach Textdatei lesen

/usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster--num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar 

Das erste, was ich tue, ist eine große Textdatei zu lesen, und es zählen:

val file = sc.textFile("/path/to/file.txt.gz") 
println(file.count()) 

Wenn Dabei sehe ich, dass nur einer meiner Knoten die Datei liest und die Zählung ausführt (weil ich nur eine Aufgabe sehe). Wird das erwartet? Sollte ich meine RDD nachpartitionieren oder wenn ich Map Reduce-Funktionen verwende, wird Spark das für mich tun?

+0

Was sind Ihre "defaultMinPartitions"? Wie das Dokument klar sagt, nimmt textFile eine optionale Anzahl von Partitionen Parameter, die standardmäßig auf diese. –

+0

Meine defaultMinPartitions ist größer als eins. Es scheint, dass ich eine bestimmte Anzahl von Partition nicht erzwingen kann, weil es nur eine Textdatei ist ... läuft .... val file = sc.textFile ("/ path/to/file.txt.gz", 8) println (file.partitions.length) gibt 1 – Stephane

+0

zurück Nun, es muss das Lesen an einem Ort tun, weil das inhärent seriell ist. Aber ich kann nicht sehen, warum es diesen optionalen Parameter hätte, wenn es _something_ nicht tun würde. –

Antwort

20

Es sieht so aus, als ob Sie mit einer gezippten Datei arbeiten.

Zitiert aus my answer here:

Ich glaube, Sie haben ein ziemlich typisches Problem mit gzip-Dateien in dem getroffen haben, können sie nicht gleichzeitig geladen werden. Genauer gesagt, eine einzelne gezippte Datei kann nicht parallel von mehreren Aufgaben geladen werden, so Spark wird es mit 1 Aufgabe laden und Ihnen so eine RDD mit 1 Partition geben.

Sie müssen die RDD nach dem Laden explizit neu partitionieren, damit mehr Aufgaben parallel ausgeführt werden können.

Zum Beispiel:

val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3) 
println(file.count()) 

In Bezug auf die Bemerkungen zu Ihrer Frage, der Grund minPartitions Einstellung hier nicht helfen, weil a gzipped file is not splittable, so Funke wird immer 1 Aufgabe verwenden, um die Datei zu lesen.

Wenn Sie beim Lesen einer normalen Textdatei oder einer Datei mit komprimierbarem Komprimierungsformat wie bzip2 minPartitions setzen, wird Spark diese Anzahl von Aufgaben parallel bereitstellen (bis zur Anzahl verfügbarer Kerne) in Ihrem Cluster), um die Datei zu lesen.

+0

Danke! Würden Sie bzip2 dann über gzip empfehlen? Wenn ich diese Datei häufig lade, was sollte meine Strategie sein, um jeden Lauf zu optimieren? – Stephane

+0

@Stephane - Es hängt davon ab, wie viele Daten eintreffen und wie viel Zeit der Cluster für die Neupartitionierung der Daten aufwendet. Eine einzelne gezippte Datei könnte in Ordnung sein. Wenn die eine Datei zu groß ist, könnten Sie wahrscheinlich auch mehrere gezippte Dateien verwenden (d. H. Vor der Komprimierung aufteilen), da jede gezippte Datei parallel in dieselbe RDD geladen werden kann (eine Aufgabe pro Datei). Das ist wahrscheinlich der Weg des geringsten Widerstands. –

+0

sehr, sehr interessant, danke! Also .gz.001 geteilte Dateien oder bzip2 ... Ich werde mit beiden experimentieren!Ich denke, ja, der große Engpass ist die erste Neuaufteilung. Wenn ich es also schaffen würde, meine Dateien bereits zu teilen, könnte das ein wenig Zeit sparen. – Stephane