2013-05-30 17 views
11

Ich spiele mit Spark. Es ist die voreingestellte, vordefinierte Distribution (0.7.0) von der Website, mit Standardkonfiguration, Cluster-Modus, einem Arbeiter (mein localhost). Ich habe die Dokumentation zur Installation gelesen und alles scheint in Ordnung.Spark-Cluster schlägt auf größeren Eingang, funktioniert gut für kleine

Ich habe eine CSV-Datei (verschiedene Größen, 1000 - 1 Millionen Zeilen). Wenn ich meine App mit einer kleinen Eingabedatei (zum Beispiel den 1000 Zeilen) starte, ist alles in Ordnung, das Programm ist in Sekunden erledigt und produziert die erwartete Ausgabe. Aber wenn ich eine größere Datei (100.000 Zeilen oder 1 Million) liefern, schlägt die Ausführung fehl. Ich habe versucht, in den Protokollen zu graben, aber half nicht viel (es wiederholt den ganzen Prozess ungefähr 9-10 mal und exitst mit dem Scheitern danach. Außerdem gibt es einen Fehler, der mit dem Abrufen von einer Nullquelle verbunden ist, ist gescheitert).

Das Ergebnis Iterable von der ersten JavaRDD zurückgegeben ist verdächtig für mich. Wenn ich eine hartcodierte Singleton-Liste (wie res.add ("something"); return res;) zurückgebe, ist alles in Ordnung, sogar mit einer Million Zeilen. Aber wenn ich alle meine Schlüssel hinzufügen möchte ich (28 Zeichenfolgen von Länge 6-20 Zeichen), schlägt der Prozess nur mit dem großen Eingang fehl. Das Problem ist, ich brauche alle diese Schlüssel, das ist die eigentliche Geschäftslogik.

Ich benutze Linux Amd64, Quad-Core, 8 GB RAM. Letztes Oracle Java7 JDK. Spark-config:

SPARK_WORKER_MEMORY=4g 
SPARK_MEM=3g 
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar 

Ich muss erwähnen, dass, wenn ich das Programm starten, heißt es:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1) 
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 

Hier ist mein Programm. Es basiert auf dem minimal modifizierten JavaWordCount-Beispiel.

public final class JavaWordCount 
{ 
    public static void main(final String[] args) throws Exception 
    { 
     final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", 
      System.getenv("SPARK_HOME"), new String[] {"....jar" }); 

     final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() { 

      @Override 
      public Iterable<String> call(final String s) 
      { 
       // parsing "s" as the line, computation, building res (it's a List<String>) 
       return res; 
      } 
     }); 

     final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { 

      @Override 
      public Tuple2<String, Integer> call(final String s) 
      { 
       return new Tuple2<String, Integer>(s, 1); 
      } 
     }); 
     final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { 

      @Override 
      public Integer call(final Integer i1, final Integer i2) 
      { 
       return i1 + i2; 
      } 
     }); 

     counts.collect(); 

     for (Tuple2<?, ?> tuple : counts.collect()) { 
      System.out.println(tuple._1 + ": " + tuple._2); 
     } 
    } 
} 
+0

Vor dem Wechsel Die Eigenschaften des Spark-Systems, mit welcher Ausnahme/welchem ​​Fehler ist Ihr Job fehlgeschlagen? –

+0

In der Funke-Benutzer-Gruppe habe ich die Antwort bekommen, dass .collect() die Sammlung von jeder (temp) RDDs auslösen wird. Das war das eigentliche Problem. Thread mit Lösung hier: http://stackoverflow.com/questions/16832429/spark-cluster-fails-on-bigger-input-works-well-for-small?noredirect=1#comment24468201_16832429 – gyorgyabraham

+1

Ich googelte seit Ewigkeiten und versucht Um eine Lösung für mein Problem zu finden, löst die Antwort auf diese Frage mein Problem. Bitte bearbeiten Sie Ihre Frage so, dass "org.apache.spark.SparkException: Fehler beim Kommunizieren mit MapOutputTracker" in Ihrer Frage enthalten ist, damit Google in Zukunft einfacher googeln kann . – samthebest

Antwort

13

ich es geschafft, es zu beheben, indem Sie die Eigenschaft festlegen auf true spark.mesos.coarse. Weitere Informationen here.

Update: Ich habe mit Spark für ein paar Stunden gespielt. Diese Einstellungen haben mir ein wenig geholfen, aber es scheint fast unmöglich zu sein, ~ 10 Millionen Textzeilen auf einer einzigen Maschine zu verarbeiten.

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster 
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects 
System.setProperty("spark.mesos.coarse", "true"); // link provided 
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages 
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load 

Hinweis: Eine Erhöhung der Rahmengröße scheint besonders hilfreich bei der Verhinderung: org.apache.spark.SparkException: Error communicating with MapOutputTracker

+1

Die 'spark.akka.frameSize' löste auch mein Problem 'org.apache.spark.SparkException: Fehler beim Kommunizieren mit MapOutputTracker'. – samthebest

+0

Führt System aus.setProperty() funktioniert auch in Spark-Shell? Ich kann den frameSize nicht setzen –