2016-07-17 12 views
2

Ich verstehe nicht, warum mein Akku wurde nicht ordnungsgemäß von Spark aktualisiert.Spark: Akkumulatoren funktioniert nicht richtig, wenn ich es im Bereich

object AccumulatorsExample extends App { 
    val acc = sc.accumulator(0L, "acc") 
    sc range(0, 20000, step = 25) map { _ => acc += 1 } count() 
    assert(acc.value == 800) // not equals 
} 

My Spark-config:

setMaster("local[*]") // should use 8 cpu cores 

Ich bin mir nicht sicher, ob Funken Berechnungen des Akkumulators verteilen auf jedem Kern und vielleicht das ist das Problem.

Meine Frage ist, wie kann ich alle acc Werte in einer einzigen Summe aggregieren und den richtigen Akku-Wert (800) bekommen?

PS

Wenn ich Kern-Nummer setMaster("local[1]") beschränken als alle gut funktioniert.

+0

Was ist 'acc0'? Du arbeitest 'acc', ist das ein Tippfehler? –

+0

Entschuldigung, ich meine 'acc'. – Aaron

Antwort

2

Es gibt zwei verschiedene Themen hier:

  • Du App statt Implementierung main Verfahren erstrecken. Es gibt einige bekannte Probleme, die mit diesem Ansatz zusammenhängen, einschließlich eines fehlerhaften Akkumulatorverhaltens und deshalb it shouldn't be used in Spark applications. Dies ist wahrscheinlich die Ursache des Problems.

    Siehe zum Beispiel SPARK-4170 für andere mögliche Probleme im Zusammenhang mit der Erweiterung App.

  • Sie verwenden Akkumulatoren in Transformationen. Dies bedeutet, dass der Akkumulator beliebig oft erhöht werden kann (mindestens einmal, wenn ein Job erfolgreich ist).

    In der Regel benötigen Sie genaue Ergebnisse, die Sie Akku nur innerhalb Aktionen wie foreach und foreachPartition obwohl es Sie werden es eher unwahrscheinlich, verwenden sollten, wie dies alle Probleme in einer Spielzeug-Anwendung erleben.