2016-03-28 15 views
1

Ich versuche, mit Supervision-Status zu erholen, es funktioniert, wenn ich Flow-Phasen mit Karte schreiben, aber wenn ich Grafik-Phase verwenden, wird nie gefangen, und die gesamte Pipeline schlägtDie Verwendung der Überwachungsstrategie mit GraphStage funktioniert nicht

object test extends App{ 

     val stageSupervisionDecider: Supervision.Decider = { 
     case cEx: IllegalArgumentException => 
      println("Supervision Catch") 
      Supervision.Resume 
     case _ => Supervision.Stop 
     } 

     implicit val system = ActorSystem("system") 

     implicit val materializer = ActorMaterializer(
     ActorMaterializerSettings(system) 
      .withSupervisionStrategy(stageSupervisionDecider) 
    ) 

     Source(Vector(1,2,3,4,5,6,7)) 
     .via(new FailFlow) 
     .runWith(Sink.foreach(println)) 
    } 


    class FailFlow extends GraphStage[FlowShape[Int, Int]] { 

     val in = Inlet[Int]("FailFlow.In") 
     val out = Outlet[Int]("FailFlow.Out") 

     override def shape: FlowShape[Int, Int] = FlowShape.of(in, out) 

     override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
     new GraphStageLogic(shape) { 
      setHandler(in, new InHandler { 
      override def onPush(): Unit = { 
       val m = grab(in) 
       if(m % 2 == 0) 
       throw new IllegalArgumentException("illegal value") 
       else 
       push(out,m) 
      } 
      }) 

      setHandler(out, new OutHandler { 
      override def onPull(): Unit = { 
       pull(in) 
      } 
      }) 
     } 
     } 
    } 

Irgendwelche Ideen, was hier falsch passiert?

Antwort

1

Nach dem documentation (big red box):

ZipWith, GraphStag Kreuzung, ActorPublisher Quelle und ActorSubscriber sinken Komponenten nicht die Aufsicht Strategie ehrt Attribut noch

+0

jede Arbeit um? – Rabzu