Ich muss Nachrichten verschiedener Typen zu Ereignisstrom veröffentlichen, und diese Nachrichten sollten verschiedene Prioritäten haben, zum Beispiel wenn 10 Nachrichten vom Typ A gebucht wurden, und eine Nachricht vom Typ B wird schließlich gepostet, und Priorität von B ist höher als die Priorität von A - Nachricht B sollte vom nächsten Akteur abgeholt werden, selbst wenn 10 Nachrichten vom Typ A in der Warteschlange sind.Akka :: Verwenden von Nachrichten mit unterschiedlichen Prioritäten über Ereignisstrom in ActorSystem
Ich habe über priorisierte Nachrichten lesen here und erstellt meine einfache Implementierung dieser Mailbox:
class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
PriorityGenerator {
case ServerPermanentlyDead => println("Priority:0"); 0
case ServerDead => println("Priority:1"); 1
case _ => println("Default priority"); 10
}
)
dann konfiguriert ich es in application.conf
akka {
actor {
prio-dispatcher {
type = "Dispatcher"
mailbox-type = "mailbox.PrioritizedMailbox"
}
}
}
und in meine Schauspieler verdrahtet:
private val myActor = actors.actorOf(
Props[MyEventHandler[T]].
withRouter(RoundRobinRouter(HIVE)).
withDispatcher("akka.actor.prio-dispatcher").
withCreator(
new Creator[Actor] {
def create() = new MyEventHandler(storage)
}), name = "eventHandler")
Ich benutze ActorSystem.eventStream.publish in um Nachrichten zu senden, und mein Akteur ist es abonniert (Ich kann in Protokollen sehen, dass Nachrichten verarbeitet werden, aber in FIFO-Reihenfolge).
Allerdings sieht es so aus, als wäre es nicht genug, denn in Protokollen/Konsole habe ich nie die Nachrichten wie "Default priority" gesehen. Fehle ich hier etwas? Funktioniert der beschriebene Ansatz mit Ereignisströmen oder nur mit direkten Aufrufen von Senden einer Nachricht an den Akteur? Und wie bekomme ich priorisierte Nachrichten mit eventStream?