Ich versuche, Akka HTTP für das POST auf einem Webserver zu verwenden. Wenn ein POST fehlschlägt, möchte ich, dass es aufhört und nicht mehr POSTs sendet, da sie nicht idempotent sind.Akka HTTP-Fluss wird nicht gestoppt, wenn Ausnahme ausgelöst wird
Der folgende Code erstellt POSTs und sendet sie an einen Test-Webserver. Es löst eine Ausnahme bei der ersten Antwort aus. Der Code sollte runnable sein, in dem Fall, dass Sie werden sehen, es druckt:
i = 0
got response
i = 1
stopping
Exception in thread "main" java.lang.Exception
i = 2
i = 3
i = 4
i = 5
So ist die ‚Anhalten‘ passiert, nachdem der nächste Anfrage wurde zusammengestellt worden (i = 1
), dann wird der Code einfach weiter.
Kann jemand den Fluss stoppen, sobald ein Fehler auftritt, und keine weiteren POSTs senden?
(Scala 2.11.8, Akka 2.4.4)
object FlowTest {
def main(args: Array[String]) {
val stop: Supervision.Decider = {
case _ =>
println("stopping")
Supervision.Stop
}
implicit val system = ActorSystem()
import system.dispatcher
implicit val mat = ActorMaterializer()
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
Http().outgoingConnection(host = "posttestserver.com", port = 80)
val future: Future[Done] = Source(0 to 10).map {
i =>
val uri = s"/post.php?dir=so_akka&i=$i"
println(s"i = $i")
HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i")
}.via(connectionFlow).mapAsync(1) {
resp =>
Unmarshal(resp.entity).to[String]
.map { str =>
println(str)
throw new Exception("") // Always fail
str
}
}.withAttributes(ActorAttributes.supervisionStrategy(stop)).runForeach(println)
Await.result(future, Duration.Inf)
}
}
Da der Stream asynchron verarbeitet wird, glaube ich nicht, dass es möglich ist, den Stream basierend auf einer Bedingung (z. B. eine Ausnahme) abzubrechen. Es könnte sein, dass die Ergebnisse der fertigen Futures der nachfolgenden Elemente bereits stromabwärts emittiert wurden. Wenn Sie wirklich abbrechen müssen, nachdem eine Ausnahme ausgelöst wurde, müssen Sie sicherstellen, dass die Elemente sequenziell verarbeitet werden, wahrscheinlich indem Sie die Futures blockieren. – devkat