2012-08-30 10 views
5

Ich muss Nachrichten verschiedener Typen zu Ereignisstrom veröffentlichen, und diese Nachrichten sollten verschiedene Prioritäten haben, zum Beispiel wenn 10 Nachrichten vom Typ A gebucht wurden, und eine Nachricht vom Typ B wird schließlich gepostet, und Priorität von B ist höher als die Priorität von A - Nachricht B sollte vom nächsten Akteur abgeholt werden, selbst wenn 10 Nachrichten vom Typ A in der Warteschlange sind.Akka :: Verwenden von Nachrichten mit unterschiedlichen Prioritäten über Ereignisstrom in ActorSystem

Ich habe über priorisierte Nachrichten lesen here und erstellt meine einfache Implementierung dieser Mailbox:

class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

    PriorityGenerator { 
     case ServerPermanentlyDead => println("Priority:0"); 0 
     case ServerDead => println("Priority:1"); 1 
     case _ => println("Default priority"); 10 
    } 

) 

dann konfiguriert ich es in application.conf

akka { 

    actor { 

     prio-dispatcher { 
      type = "Dispatcher" 
      mailbox-type = "mailbox.PrioritizedMailbox" 
     } 

    } 

} 

und in meine Schauspieler verdrahtet:

private val myActor = actors.actorOf(
    Props[MyEventHandler[T]]. 
    withRouter(RoundRobinRouter(HIVE)). 
    withDispatcher("akka.actor.prio-dispatcher"). 
    withCreator(
    new Creator[Actor] { 
     def create() = new MyEventHandler(storage) 
    }), name = "eventHandler") 

Ich benutze ActorSystem.eventStream.publish in um Nachrichten zu senden, und mein Akteur ist es abonniert (Ich kann in Protokollen sehen, dass Nachrichten verarbeitet werden, aber in FIFO-Reihenfolge).

Allerdings sieht es so aus, als wäre es nicht genug, denn in Protokollen/Konsole habe ich nie die Nachrichten wie "Default priority" gesehen. Fehle ich hier etwas? Funktioniert der beschriebene Ansatz mit Ereignisströmen oder nur mit direkten Aufrufen von Senden einer Nachricht an den Akteur? Und wie bekomme ich priorisierte Nachrichten mit eventStream?

Antwort

10

Ihr Problem besteht darin, dass Ihre Darsteller wahnsinnig schnell sind, sodass Nachrichten verarbeitet werden, bevor sie Zeit haben, sich anzustellen. Daher kann das Postfach nicht priorisiert werden. Das folgende Beispiel zeigt den Punkt:

trait Foo 
    case object X extends Foo 
    case object Y extends Foo 
    case object Z extends Foo 

    class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config) 
extends UnboundedPriorityMailbox( 
    PriorityGenerator { 
     case X ⇒ 0 
     case Y ⇒ 1 
     case Z ⇒ 2 
     case _ ⇒ 10 
    }) 

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString( 
     """ prio-dispatcher { 
     type = "Dispatcher" 
      mailbox-type = "%s" 
     }""".format(classOf[PrioritizedMailbox].getName))) 
     val latch = new java.util.concurrent.CountDownLatch(1) 
     val a = s.actorOf(Props(new akka.actor.Actor { 
     latch.await // Just wait here so that the messages are queued up 
inside the mailbox 
     def receive = { 
      case any ⇒ /*println("Processing: " + any);*/ sender ! any 
     } 
     }).withDispatcher("prio-dispatcher")) 
     implicit val sender = testActor 
     a ! "pig" 
     a ! Y 
     a ! Z 
     a ! Y 
     a ! X 
     a ! Z 
     a ! X 
     a ! "dog" 

     latch.countDown() 

     Seq(X, X, Y, Y, Z, Z, "pig", "dog") foreach { x => expectMsg(x) } 
     s.shutdown() 

Dieser Test geht mit Bravour