2016-04-07 3 views
1

Neu bei Akka and Actors - Ich muss ein paar Schauspieler starten, die ihr Leben damit verbringen werden, von Kafka-Themen zu lesen und in einen Ignite-Cache zu schreiben. Ich baute mein Dispatcher wie folgt aus:Akka Darsteller läuft Endlosschleife nach Initialisierungsmeldung

kafka-dispatcher { 
    executor = "thread-pool-executor" 
    type  = PinnedDispatcher 
} 

Meine Schauspieler mit .withDispatcher("kafka-dispatcher") geschaffen, und meine Vermutung ist, dass jeder Akteur ein einzelner Thread zugeordnet wird.

Diese Akteure verbringen im Grunde ihr Leben wie folgt aus:

override def receive: Receive = LoggingReceive { 
    case InitWorker => { 
    initialize() 
    pollTopic() // This never returns 
    } 
} 

Mit anderen Worten, sie erhalten eine Nachricht Initialisierung und rufen dann die pollTopic() Methode, die nie zurückgibt - eine Schleife läuft das Lesen (die bis blockieren es gibt Daten) und dann schreiben Sie die Daten.

Meine Fragen:

  1. Ist das koscher?
  2. Gibt es einen besseren, d. H. Mehr idiomatischen Weg, dies zu tun? Beachten Sie, dass der Leseaufruf innerhalb von pollTopic() blockiert.

Antwort

3

zu Ihrem Punkt Answering 2 und aus der Beschreibung von dem, was Sie versuchen, vielleicht zu tun, mögen Sie mit Akka streams mit der reactive-kafka Bibliothek betrachten. Akka-Streams verwenden Akteure unter der Haube, die jedoch all dies für Sie verwalten, sodass Sie sich nur auf die Implementierung wiederverwendbarer kleiner Komponenten konzentrieren können, die genau eine Sache erledigen.

Sie können dann Datenverarbeitungspipelines schreiben, indem Sie Kafka als Source für Ihren Datenfluss verwenden. Ich weiß nicht viel über Ignite-Cache, aber die Chancen stehen gut, dass Sie entweder eine Sink dafür schreiben oder - wenn Sie über eine blockierende API sprechen, wird mapAsync Ihr Freund sein.