2016-08-03 41 views
3

Ich bin nicht in der Lage, herauszufinden, wie akka stream Runnable Graph sofort zu stoppen? Wie benutzt man Killswitch um das zu erreichen? Ich habe erst ein paar Tage mit Akka Streams angefangen. In meinem Fall lese ich Zeilen aus einer Datei und mache einige Operationen im Fluss und schreibe in die Senke. Was ich tun möchte, ist, zu stoppen Datei sofort zu lesen, wann immer ich will, und ich hoffe, dass dies möglicherweise den gesamten laufenden Graph stoppen sollte. Irgendwelche Ideen dazu würden sehr geschätzt werden.Wie man einen akka-Strom abrupt stoppt Runable Graph?

Vielen Dank im Voraus.

Antwort

2

Da Akka Streams 2.4.3, da sein ist ein elegantes Möglichkeit, den Strom von außen über KillSwitch zu stoppen.

Betrachten Sie das folgende Beispiel, das den Stream nach 10 Sekunden stoppt.

object ExampleStopStream extends App { 

    implicit val system = ActorSystem("streams") 
    implicit val materializer = ActorMaterializer() 

    import system.dispatcher 

    val source = Source. 
    fromIterator(() => Iterator.continually(Random.nextInt(100))). 
    delay(500.millis, DelayOverflowStrategy.dropHead) 
    val square = Flow[Int].map(x => x * x) 
    val sink = Sink.foreach(println) 

    val (killSwitch, done) = 
    source.via(square). 
    viaMat(KillSwitches.single)(Keep.right). 
    toMat(sink)(Keep.both).run() 

    system.scheduler.scheduleOnce(10.seconds) { 
    println("Shutting down...") 
    killSwitch.shutdown() 
    } 

    done.foreach { _ => 
    println("I'm done") 
    Await.result(system.terminate(), 1.seconds) 
    } 

} 
+0

Siehe Beispiel-Code unten: Wie das Diagramm zu stoppen, und brechen Sie die große Datei zu lesen? RunnableGraph.fromGraph (GraphDSL.create() { implizite Builder => Import GraphDSL.Implicits._ // Quelle aus der Datei val A: Outlet [String] = builder.add (readFileLineByLine) .out // jeweils Zeilen schreiben val E in Datei:. Inlet [Zeichenfolge] = builder.add (writeLinesToFile) .in A ~> E ClosedShape }) run() – PainPoints

+0

Sie bieten beispielsweise so, dass Sie wissen, wann um das System zu stoppen, bevor Sie beginnen, aber in meinem Fall muss ich das laufende System stoppen, wann immer ich will. – PainPoints

+0

ich sehe, aus dem akka Stream Test gehe ich tthis Arbeits: val switch1 = KillSwitches.shared ("switch") val = Downstream RunnableGraph.fromGraph (GraphDSL.create() { implizite Builder => Import GraphDSL. Implicits._ // Quelle aus der Datei val A: Outlet [String] = builder.add (readFileLineByLine) .out // jeweils Zeilen schreiben val E in Datei: Inlet [Zeichenfolge] = builder.add (writeLinesToFile) .in A.via (switch1.flow) ~> E Geschlossene Form }). run() switch1.shutdown() – PainPoints

1

Der eine Weg, um einen Dienst oder shutdownhookup haben, die

Graph cancellable nennen
val graph= 
    Source.tick(FiniteDuration(0,TimeUnit.SECONDS), FiniteDuration(1,TimeUnit.SECONDS), Random.nextInt).to(Sink.foreach(println)) 
    val cancellable=graph.run() 

    cancellable.cancel 

Die cancellable.cancel kann Teil ActorSystem.registerOnTermination