2016-05-03 6 views
3

Ich bin immer noch ziemlich neu bei Spark, aber ich konnte die Spark App erstellen, die ich Daten von unserem SQL Server mit JDBC-Treibern verarbeiten kann (wir entfernen teure SPs), die App lädt ein paar Tabellen aus Sql Server über JDBC in Datenrahmen, dann mache ich ein paar Joins, eine Gruppe und einen Filter schließlich einige Daten zurück über JDBC die Ergebnisse in eine andere Tabelle. All dies funktioniert gut bei Spark EMR in Amazon Web Services in einer m3.xlarge mit 2 Kernen in etwa einer Minute.Warum läuft meine Spark App nur in einem Executor?

Meine Frage ist folgende: 1. gerade jetzt habe ich 1 Master und 2 Kerne auf dem Cluster, aber jedes Mal, wenn ich einen neuen Schritt starte, scheint es von dem, was ich vom History Server sehen kann, nur 1 Executor wird verwendet, da ich 2 Executoren aufgelistet habe, Treiber ohne jegliche Verwendung, einen Executor mit ID 1, der ungefähr 1410 Tasks verarbeitet. Und ich bin völlig unsicher, wie es weitergehen soll.

Auch dies ist spezifisch für AWS, aber ich wollte nicht 2 Fragen posten, da sie irgendwie verwandt sind, gibt es eine Möglichkeit, dass ich 2 Schritte gleichzeitig ausführen kann? Das bedeutet, dass wir zwei Prozesse gleichzeitig ausführen können, da wir diesen Prozess viele Male am Tag ausführen (er verarbeitet Kundendaten). Ich weiß, dass ich mit diesem Schritt einen neuen Cluster starten kann, aber ich möchte die Verarbeitung schnell durchführen können und das Starten eines neuen Clusters dauert zu lange. Danke !!!

+0

Laufen Sie auf Garn (AWS EMR 4.x)? Wie sieht der Startbefehl (Spark-Submit) aus? –

+0

Ja, ich betreibe EMR 4.6. Ich startete es viele Male mit verschiedenen Argumenten und wenig Veränderung. "spark-submit --deploy-mode cluster --executor-memory 10g --total-executor-cores 8 ..." oder "spark-submit --master gam --deploy-mode cluster --num-executors 3. .. "und ein paar andere Änderungen. –

+0

Ist es beim Lesen der Daten langsam (läuft nur auf einem Executor)? Beachten Sie, dass Spark anscheinend nur parallel liest, wenn direkt von HDFS gelesen wird. Wenn es von einer anderen Datenquelle liest, wird nur ein einziger Executor verwendet (stolperte über [dies] (https://www.dataiku.com/learn/guide/spark/tips-and-troubleshooting.html), als ich es bemerkte das gleiche Problem beim Lesen von S3). –

Antwort

1

Für Ihre erste Frage:

Ich bin nicht sicher, ob dies der Fall ist, aber etwas Ähnliches passierte uns und vielleicht kann es helfen.

Wenn Sie von der JDBC-Quelle mit sqlContext.read.format("jdbc").load() (oder ähnlich) lesen, wird der resultierende Datenrahmen standardmäßig nicht partitioniert. Wenn es für Sie der Fall ist, würde die Anwendung von Transformationen in dem resultierenden Datenframe ohne vorherige Partitionierung dazu führen, dass nur ein Executor in der Lage wäre, diesen zu verarbeiten. Wenn es nicht Ihr Fall ist, wird die folgende Lösung wahrscheinlich Ihr Problem nicht lösen.

Unsere Lösung bestand also darin, eine numerische Spalte mit Wertewerten von 1 bis 32 (unsere gewünschte Anzahl an Partitionen) in den Daten zu erstellen und sie als Partitionsspalte zu verwenden, indem Sie die Partitionsoptionen des jdbc-Lesers einstellen (siehe this link)):

val connectionOptions = Map[String, String] (... <connection options> ...) 
val options = connectionOptions ++ Map[String, String] (
    "partitionColumn" -> "column name", 
    "lowerBound" -> "1", 
    "upperBound" -> "32", 
    "numPartitions" -> "32" 
) 

val df = sqlContext.read.format("jdbc").options(options).load() 

Also, mit diesem Ansatz war nicht nur die Leseaufgabe der Lage, parallel verarbeitet werden (wirklich die Leistung zu verbessern und OOM-Fehler) zu vermeiden, aber der resultierende Datenrahmen wurde aufgeteilt und für all parallel verarbeitet nachfolgende Transformationen.

Ich hoffe, dass hilft.