2016-07-12 14 views
10

Ich habe erfolgreich FileIO verwendet, um den Inhalt einer Datei zu streamen, einige Transformationen für jede Zeile zu berechnen und die Ergebnisse zu aggregieren/zu reduzieren.Ordnungsgemäße Art, Akka-Streams zu stoppen

Jetzt habe ich einen ziemlich spezifischen Anwendungsfall, wo ich den Strom stoppen möchte, wenn eine Bedingung erreicht ist, so dass es nicht notwendig ist, die ganze Datei zu lesen, aber der Prozess so schnell wie möglich beendet. Was ist der empfohlene Weg, dies zu erreichen?

+2

Wenn die Bedingung auf dem Stream-Inhalt basiert, "Source.takewhile" (http://doc.akka.io/api/akka/2.4.8/index.html#[email protected] (p: Out => Boolean): FlowOps.this.Repr [Out]) sollte funktionieren. – devkat

Antwort

16

Wenn die Stopp-Bedingung „auf der Außenseite des Baches“

Es gibt einen erweiterte Gebäude-Block genannt KillSwitch, dass Sie diese nutzen könnten tun: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html Der Strom abgeschaltet bekommen würde, sobald der Kill-Schalter ist benachrichtigt.

Es verfügt über Methoden wie abort(reason)/shutdown usw., siehe hier für sie API ist: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

Referenzdokumentation ist hier: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

Beispiel der Verwendung wäre:

val countingSrc = Source(Stream.from(1)).delay(1.second, 
    DelayOverflowStrategy.backpressure) 
val lastSnk = Sink.last[Int] 

val (killSwitch, last) = countingSrc 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(lastSnk)(Keep.both) 
    .run() 

doSomethingElse() 

killSwitch.shutdown() 

Await.result(last, 1.second) shouldBe 2 

Wenn die Stoppbedingung ist innerhalb des Stroms

Sie können takeWhile verwenden, um wirklich jede Bedingung auszudrücken, obwohl manchmal auch take oder limit genug sein kann "10 lnes".

Wenn Ihre Logik sehr fortgeschritten ist, könnten Sie eine spezielle Stufe bauen, die diese spezielle Logik unter Verwendung von statefulMapConcat verarbeitet, die buchstäblich alles ausdrücken kann - so können Sie den Stream abschließen, wann immer Sie wollen "von innen".

+0

Was ist, wenn ich für den kontinuierlichen Strom ein- und ausschalten muss, damit mein reduzierter Funktionsaufruf die aggregierten (gruppierten) Daten an Sink sendet? Kann ich takeWhile (p) und swithc p true und false verwenden? – zt1983811