2016-05-14 3 views
2

Ich mache eine iterative Berechnung mit flink Dataset API.
Aber das Ergebnis jeder Iteration ist ein Teil meiner kompletten Lösung.
(Wenn mehr Details erforderlich sind: Ich berechne Gitterknoten Ebene von oben nach unten in jeder Iteration, siehe Formal Concept Analysis)
Wenn ich flink Dataset-API mit Bulk-Iteration ohne Speichern meines Ergebnisses, wird der Code aussehen wie unten:
Möglichkeit, Teilausgaben von Masseniteration in Flink Dataset zu speichern?

val start = env.fromElements((0, BitSet.empty)) 
val end = start.iterateWithTermination(size) { inp => 
    val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup) 
    (result,result) 
} 
end.count() 

Aber, wenn ich versuche, Teilergebnisse innerhalb Iteration (_.writeAsText()) oder eine Aktion zu schreiben, werde ich bekommen Fehler:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration? 

Die Alternative, ohne Masse Iteration scheint unten zu sein:

var start = env.fromElements((0, BitSet.empty)) 
var count = 1L 
var all = count 
while (count > 0){ 
    start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup) 
    count = start.count() 
    all = all + count 
} 
println("total nodes: " + all) 

Aber dieser Ansatz ist außerordentlich langsam auf kleinste Eingangsdaten, Iteration Version nimmt < 30 Sekunden und Schleife Version dauert> 3 Minuten.
Ich denke, flink ist nicht in der Lage, einen optimalen Plan zum Ausführen der Schleife zu erstellen.

Jede Problemumgehung, die ich versuchen sollte? Ist eine Änderung an flink möglich, um Teilergebnisse auf Hadoop etc. zu speichern?

Antwort

3

Leider ist es derzeit nicht möglich, Zwischenergebnisse aus einer Bulk-Iteration auszugeben. Sie können das Endergebnis nur am Ende der Iteration ausgeben.

Wie Sie richtig bemerkt haben, kann Flink eine While-Schleife oder For-Schleife nicht effizient ausrollen, so dass das auch nicht funktioniert.

Wenn Ihre Zwischenergebnisse nicht so groß sind, können Sie Ihre Zwischenergebnisse in die Teillösung einfügen und dann am Ende der Iteration alles ausgeben. Ein ähnlicher Ansatz ist in der TransitiveClosureNaive example implementiert, wo in einer Iteration entdeckte Pfade in der nächsten Teillösung akkumuliert werden.

+0

Vielen Dank für einen Vorschlag zur Implementierung. :) wird aktualisiert, wenn das funktioniert. Ich denke Datensatz api ist nicht genug ausgereift in flink, oft Executer Verluste. Aber Streaming funktioniert einwandfrei. –