Wenn Sie wirklich warten müssen für die Zukunft vor der Verarbeitung die nächste Nachricht zu vervollständigen, können Sie so etwas wie dies versuchen:
object SimpleMessageHandler{
case class SimpleMessage()
case class FinishSimpleMessage(i:Int)
}
class SimpleMessageHandler extends Actor with Stash{
import SimpleMessageHandler._
import context._
import akka.pattern.pipe
def receive = waitingForMessage
def waitingForMessage: Receive = {
case SimpleMessage() =>
val futData:Future[Int] = ...
futData.map(FinishSimpleMessage(_)) pipeTo self
context.become(waitingToFinish(sender))
}
def waitingToFinish(originalSender:ActorRef):Receive = {
case SimpleMessage() => stash()
case FinishSimpleMessage(i) =>
//Do whatever you need to do to finish here
...
unstashAll()
context.become(waitingForMessage)
case Status.Failure(ex) =>
//log error here
unstashAll()
context.become(waitingForMessage)
}
}
Bei diesem Ansatz verarbeiten wir eine SimpleMessage
und dann Logik wechseln Handhabung zu verstauen alle nachfolgenden SimpleMessage
s erhalten, bis wir ein Ergebnis aus der Zukunft bekommen. Wenn wir ein Ergebnis bekommen, Versagen oder nicht, stürzen wir alle anderen SimpleMessage
s ab, die wir erhalten haben, während wir auf die Zukunft warten, und gehen auf unseren fröhlichen Weg.
Dieser Aktor schaltet einfach zwischen zwei Zuständen hin- und her und ermöglicht es Ihnen, nur eine SimpleMessage
gleichzeitig zu verarbeiten, ohne die Zukunft zu blockieren.
Was genau suchen Sie mit den Daten, die aus der Zukunft stammen? – cmbaxter
Dies sind Daten aus meiner Datenbank (Mongo), und ich möchte sie filtern und nur einen Teil davon in einer anderen Sammlung speichern. Aber im Grunde sind das db-Daten, ich kann das nicht im Hintergrund ausführen, und ich muss warten, um diese Aktion vor dem nächsten * SimpleMessage * zu beenden. –
Sendet es eine Nachricht zurück an den Absender oder nein? –