Ich versuche, mit Supervision-Status zu erholen, es funktioniert, wenn ich Flow-Phasen mit Karte schreiben, aber wenn ich Grafik-Phase verwenden, wird nie gefangen, und die gesamte Pipeline schlägtDie Verwendung der Überwachungsstrategie mit GraphStage funktioniert nicht
object test extends App{
val stageSupervisionDecider: Supervision.Decider = {
case cEx: IllegalArgumentException =>
println("Supervision Catch")
Supervision.Resume
case _ => Supervision.Stop
}
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system)
.withSupervisionStrategy(stageSupervisionDecider)
)
Source(Vector(1,2,3,4,5,6,7))
.via(new FailFlow)
.runWith(Sink.foreach(println))
}
class FailFlow extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("FailFlow.In")
val out = Outlet[Int]("FailFlow.Out")
override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val m = grab(in)
if(m % 2 == 0)
throw new IllegalArgumentException("illegal value")
else
push(out,m)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
}
Irgendwelche Ideen, was hier falsch passiert?
jede Arbeit um? – Rabzu