2014-12-17 11 views
12

Ich habe einen 10-Knoten-Cluster mit dem ec2-Skript im Standalone-Modus für Spark gestartet. Ich greife auf Daten in s3-Buckets innerhalb der PySpark-Shell zu, aber wenn ich Transformationen auf der RDD durchführe, wird immer nur ein Knoten verwendet. Zum Beispiel wird die unten in Daten aus dem CommonCorpus lesen:Wie können alle Spark-Knoten im Cluster vollständig genutzt werden?

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/" 
      "/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10" 
      "-180-212-248.ec2.internal.warc.gz") 

data = sc.textFile(bucket) 
data.count() 

Als ich das laufen, nur eine meiner 10-Slaves die Daten verarbeitet. Ich weiß das, weil nur ein Slave (213) Protokolle von der Aktivität hat, wenn er von der Spark-Webkonsole aus betrachtet wird. Wenn ich die Aktivität in Ganglia betrachte, ist derselbe Knoten (213) der einzige Slave mit einer Spike in der Mem-Nutzung, wenn die Aktivität ausgeführt wurde. enter image description here

Außerdem habe ich genau die gleiche Leistung, wenn ich das gleiche Skript mit einem ec2-Cluster von nur einem Slave ausführen. Ich benutze Spark 1.1.0 und jede Hilfe oder Beratung wird sehr geschätzt.

+5

Ich denke, Sie haben ein ziemlich typisches Problem mit gziped Dateien getroffen, in denen sie nicht parallel geladen werden können, so dass Spark Ihnen standardmäßig eine RDD mit 1 Partition gibt und somit nur 1 Task auf dieser RDD ausgeführt werden kann Zeit. Versuchen Sie, Ihre RDD wie folgt neu zu partitionieren: 'data = sc.textFile (bucket) .repartition (sc.defaultParallelism * 3)' –

+0

Das scheint es zu sein. Warum hast du dich dazu entschieden, mit 3 zu multiplizieren? War das nur ein Beispiel oder war es zweckmäßig? Als ich mit 3 multiplizierte, schien es alle Knoten zu aktivieren. –

+2

Der Spark Tuning Guide empfiehlt [2-3 Aufgaben pro Kern] (http://spark.apache.org/docs/1.1.1/tuning.html#level-of-parallelism). 'sc.defaultParalellism' gibt Ihnen die Anzahl der Kerne in Ihrem Cluster. –

Antwort

17

...ec2.internal.warc.gz

Ich glaube, Sie haben ein ziemlich typisches Problem mit gzip-Dateien in dem getroffen haben, können sie nicht gleichzeitig geladen werden. Genauer gesagt, eine einzelne gezippte Datei kann nicht parallel von mehreren Aufgaben geladen werden, so Spark wird es mit 1 Aufgabe laden und Ihnen so eine RDD mit 1 Partition geben.

(Beachten Sie jedoch, dass Funken 10 gzip-Dateien parallel laden kann nur gut, es ist nur, dass jeder dieser 10 Dateien nur von 1 Task geladen werden können Sie noch Parallelität über Dateien bekommen können, einfach nicht . . in eine Datei)

können Sie bestätigen, dass Sie nur durch Überprüfung der Anzahl der Partitionen in Ihrem RDD explizit 1 Partition:

data.getNumPartitions() 

die Obergrenze für die Anzahl von Aufgaben, die in ausführen können parallel auf einem RD D ist die Anzahl der Partitionen in der RDD oder die Anzahl der Slave-Kerne in Ihrem Cluster, je nachdem, welcher Wert niedriger ist.

In Ihrem Fall ist es die Anzahl der RDD-Partitionen. Sie können, dass erhöhen, indem Ihr RDD repartitioning wie folgt:

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3) 

Warum sc.defaultParallelism * 3?

Der Spark Tuning Guide empfiehlt 2-3 tasks per core und sc.defaultParalellism gibt Ihnen die Anzahl der Kerne in Ihrem Cluster.