2016-05-09 10 views
0

Ich versuche, Akka HTTP für das POST auf einem Webserver zu verwenden. Wenn ein POST fehlschlägt, möchte ich, dass es aufhört und nicht mehr POSTs sendet, da sie nicht idempotent sind.Akka HTTP-Fluss wird nicht gestoppt, wenn Ausnahme ausgelöst wird

Der folgende Code erstellt POSTs und sendet sie an einen Test-Webserver. Es löst eine Ausnahme bei der ersten Antwort aus. Der Code sollte runnable sein, in dem Fall, dass Sie werden sehen, es druckt:

i = 0 
got response 
i = 1 
stopping 
Exception in thread "main" java.lang.Exception 
i = 2 
i = 3 
i = 4 
i = 5 

So ist die ‚Anhalten‘ passiert, nachdem der nächste Anfrage wurde zusammengestellt worden (i = 1), dann wird der Code einfach weiter.

Kann jemand den Fluss stoppen, sobald ein Fehler auftritt, und keine weiteren POSTs senden?

(Scala 2.11.8, Akka 2.4.4)

object FlowTest { 
    def main(args: Array[String]) { 
    val stop: Supervision.Decider = { 
     case _ => 
     println("stopping") 
     Supervision.Stop 
    } 

    implicit val system = ActorSystem() 
    import system.dispatcher 
    implicit val mat = ActorMaterializer() 
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = 
     Http().outgoingConnection(host = "posttestserver.com", port = 80) 

    val future: Future[Done] = Source(0 to 10).map { 
     i => 
     val uri = s"/post.php?dir=so_akka&i=$i" 
     println(s"i = $i") 
     HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i") 
    }.via(connectionFlow).mapAsync(1) { 
     resp => 
     Unmarshal(resp.entity).to[String] 
      .map { str => 
      println(str) 
      throw new Exception("") // Always fail 
      str 
      } 
    }.withAttributes(ActorAttributes.supervisionStrategy(stop)).runForeach(println) 

    Await.result(future, Duration.Inf) 
    } 
} 
+0

Da der Stream asynchron verarbeitet wird, glaube ich nicht, dass es möglich ist, den Stream basierend auf einer Bedingung (z. B. eine Ausnahme) abzubrechen. Es könnte sein, dass die Ergebnisse der fertigen Futures der nachfolgenden Elemente bereits stromabwärts emittiert wurden. Wenn Sie wirklich abbrechen müssen, nachdem eine Ausnahme ausgelöst wurde, müssen Sie sicherstellen, dass die Elemente sequenziell verarbeitet werden, wahrscheinlich indem Sie die Futures blockieren. – devkat

Antwort

0

Also ich denke, es zwei Probleme gab ich mit dem obigen Code aufweist, wurde.

  1. HTTP-POSTs sollten nicht im Pipeline-Modus übertragen werden. Ich hatte gehofft, Akka HTTP würde warten, bis ein POST ohne Fehler verarbeitet wurde, bevor der nächste gesendet wurde. Das passiert nicht.

  2. Ausnahmen wurden nicht im Fluss verbreitet. Das Eingeben des Verarbeitungscodes hinderte die Quelle nicht daran, mehr POSTs zu erstellen und sie zu senden.

So gibt es zwei Fixes.

  1. Ich habe die withSyncProcessingLimit auf dem ActorMaterializer auf eins gesetzt. Dadurch wird verhindert, dass die Quelle neue Nachrichten sendet, bevor sie verarbeitet wurden. Ich musste auch den .mapAsync Teil ändern, also gibt es jetzt einen , der den Statuscode und die Fehler prüft, wenn nötig, und einen .mapAsync, der den Antwortkörper ansieht. Sie können den Antworttext im .map-Teil nicht anzeigen.

  2. Ich habe eine KillSwitch hinzugefügt, um den Fluss zu stoppen. Das Auslösen einer Ausnahme sollte den gleichen Effekt haben, tut dies aber nicht. Das ist also ein schrecklicher Hack, aber funktioniert.

Ich denke, es muss eine bessere Möglichkeit, dies zu tun. Die Verwendung eines Akka-HTTP-Flusses mit HTTP-POSTs sollte nicht so schmerzhaft sein.

Hier ist der neue Code.

object FlowTest { 
    def main(args: Array[String]) { 
    implicit val system = ActorSystem() 
    import system.dispatcher 
    implicit val mat = ActorMaterializer.create(
     ActorMaterializerSettings.create(system).withSyncProcessingLimit(1), system 
    ) 
    val connectionFlow = Http().outgoingConnection(host = "posttestserver.com", port = 80) 
    val source = Source(0 to 10) 
    val killSwitch = KillSwitches.shared("HttpPostKillSwitch") 

    try { 
     val future: Future[Done] = source.via(killSwitch.flow).map { 
     i => 
      val uri = s"/post.php?dir=test&i=$i" 
      println(s"i? = $i") 
      HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i") 
     } 
     .via(connectionFlow) 
     .map { 
      resp => 
      println("got response") 
//   if(resp.status != OK) { // always fail for testing 
       val e = new Exception("") 
       killSwitch.abort(e) 
       throw e 
//   } 
      resp 
     } 
     .mapAsync(1) { 
      resp => 
      Unmarshal(resp.entity).to[String] 
       .map { str => 
       println("got " + str) 
       str 
       } 
     } 
     .runForeach(println) 

     Await.result(future, Duration.Inf) 
    } catch { 
     case NonFatal(e) => 
     system.terminate() 
    } 
    } 
}