2015-10-20 15 views
8

Ich habe den folgenden Code:Funken zu verlieren println() auf stdout

val blueCount = sc.accumulator[Long](0) 
val output = input.map { data => 
    for (value <- data.getValues()) { 
    if (record.getEnum() == DataEnum.BLUE) { 
     blueCount += 1 
     println("Enum = BLUE : " + value.toString() 
    } 
    } 
    data 
}.persist(StorageLevel.MEMORY_ONLY_SER) 

output.saveAsTextFile("myOutput") 

Dann wird der blueCount nicht Null ist, aber ich habe keine println() -Ausgabe! Fehle ich hier etwas? Vielen Dank!

Antwort

3

Ich war in der Lage, es zu umgehen, indem ein UtilityFunction machen:

object PrintUtiltity { 
    def print(data:String) = { 
     println(data) 
    } 
} 
+5

Warum funktioniert es? – angelcervera

+0

Da Spark denkt, dass es eine Dienstprogrammfunktion aufruft, anstatt die Druckfunktion aufzurufen. Spark hat anscheinend (und konnte praktisch nicht) jede Zeile in ihrer Dienstprogrammfunktion überprüft. – Edamame

+1

Was Sie tun, ist ein Objekt in Ihrem Treiberprogramm instanziieren. Ich würde nicht auf dieses Verhalten zählen ohne ein klares Modell von genau, was vor sich geht. Erwarten Sie, dass sich das Verhalten bei jeder Änderung an Ihrem Programm oder beim Aufrufen des PrintUtility-Objekts unvorhersehbar ändert. Wenn du Logs sammeln willst, benutze Standardmethoden, erfinde keine zufälligen Mechanismen, die du nicht verstehst. Deine Erklärung dafür, warum es funktioniert, ist gefährlich falsch - es gibt kein Verbot, das zu tun, was du getan hast; Es gibt keinen Code-Checker, um sicherzustellen, dass Sie nicht betrügen: Alle Verhaltensweisen folgen dem Systemdesign – David

13

Dies ist eine konzeptionelle Frage ...

Stellen Sie einen großen Cluster haben, der sich aus vielen Arbeitern sagen lassen n Arbeiter und jene Arbeiter speichern eine Partition eines RDD oder DataFrame, stellen Sie starten eine map Aufgabe über dass Daten, und innerhalb dieser map haben Sie eine print Aussage, vor allem:

  • Wo werden die Daten ausgedruckt werden?
  • Welcher Knoten hat Priorität und welche Partition?
  • Wenn alle Knoten parallel laufen, wer wird zuerst gedruckt?
  • Wie wird diese Druckwarteschlange erstellt?

Das sind zu viele Fragen, so die Designer/Maintainer von apache-spark logisch jede Unterstützung print Anweisungen in jedem map-reduce Betrieb zu fallen entschieden (diese accumulators und sogar broadcast Variablen enthalten).

Dies macht auch Sinn, weil Spark eine Sprache ist entworfen für sehr große Datensätze. Während das Drucken zum Testen und Debuggen nützlich sein kann, möchten Sie nicht jede Zeile eines DataFrames oder RDDs drucken, da sie so konstruiert sind, dass sie Millionen oder Milliarden von Zeilen haben! Warum also mit diesen komplizierten Fragen umgehen, wenn Sie gar nicht erst drucken wollen?

Um dies zu beweisen, Sie können diesen scala Code zum Beispiel laufen:

// Let's create a simple RDD 
val rdd = sc.parallelize(1 to 10000) 

def printStuff(x:Int):Int = { 
    println(x) 
    x + 1 
} 

// It doesn't print anything! because of a logic design limitation! 
rdd.map(printStuff) 

// But you can print the RDD by doing the following: 
rdd.take(10).foreach(println) 
+6

I println funktioniert gut glauben: Es geht einfach zu dem stdout/stderr auf dem Computer, auf dem ein Spark Executor ausgeführt wird. Wenn Sie also nicht wissen, was in diesen Protokollen enthalten ist, werden Sie es nie sehen. Wenn Sie Garn verwenden, gibt es einen Befehl, um es für Sie auszudrucken. – David

+0

Während die Argumentation gültig ist, führt Spark keine statische Analyse durch, um Code zu löschen. Die Ausgabe geht einfach nicht zum Treiber 'STDOUT', wie von @David erklärt –