2016-07-20 10 views
0

Ich habe einen Akka Actor, an den ich "control" Nachrichten senden möchte. Die Hauptaufgabe dieses Actors besteht darin, eine Kafka-Warteschlange anzuhören, bei der es sich um einen Abrufprozess in einer Schleife handelt.Kann ich sicher einen Thread in einem Akka Actor erstellen?

Ich habe festgestellt, dass die folgenden einfach den Schauspieler sperrt und es wird nicht erhalten die „Stop“ (oder irgendeine andere) Nachricht: Gestartet von

class Worker() extends Actor { 
    private var done = false 

    def receive = { 
    case "stop" => 
     done = true 
     kafkaConsumer.close() 
    // other messages here 
    } 

    // Start digesting messages! 
    while (!done) { 
    kafkaConsumer.poll(100).iterator.map { cr: ConsumerRecord[Array[Byte], String] => 
     // process the record 
    ), null) 
    } 
    } 
} 

ich die Schleife in einem Thread wickeln könnte der Actor, aber ist es ok/sicher, einen Thread aus einem Actor heraus zu starten? Gibt es einen besseren Weg?

+0

Absolut nicht! Sie möchten wahrscheinlich etwas wie "Verbraucherschauspieler" erstellen. Werfen Sie einen Blick auf [Reactive Kafka] (https://github.com/akka/reactive-kafka) –

Antwort

2

Grundsätzlich kann man aber bedenken, dass dieser Schauspieler blockiert und ein Daumen der Regel ist, niemals innerhalb von Schauspielern zu blockieren. Wenn Sie dies dennoch tun möchten, stellen Sie sicher, dass dieser Akteur in einem separaten Threadpool als dem nativen ausgeführt wird, sodass Sie die Leistung des Actor-Systems nicht beeinflussen. Eine andere Möglichkeit wäre, Nachrichten an sich selbst zu senden, um neue Nachrichten abzufragen.

1) erhalten, um eine Nachricht von kafka abzufragen

2) Hand über die Nachricht an den relevanten Akteur

3) An sich eine Nachricht bestellen eine neue Nachricht ziehen

4) Hand es über ...

-Code weise:

case object PollMessage 

class Worker() extends Actor { 
    private var done = false 

    def receive = { 
    case PollMessage ⇒ { 
     poll() 
     self ! PollMessage 
    } 
    case "stop" => 
     done = true 
     kafkaConsumer.close() 
    // other messages here 
    } 

    // Start digesting messages! 

    def poll() = { 
    kafkaConsumer.poll(100).iterator.map { cr: ConsumerRecord[Array[Byte], String] => 
     // process the record 
    ), null) 
    } 
    } 

} 

Ich bin mir jedoch nicht sicher, dass Sie jemals die Stop-Nachricht erhalten werden, wenn Sie den Schauspieler kontinuierlich blockieren.

0

Hinzufügen @Louis F. Antwort; Abhängig von der Konfiguration Ihrer Akteure werden sie entweder alle Nachrichten, die sie erhalten, löschen, wenn sie gerade beschäftigt sind, oder sie in eine Mailbox aka Warteschlange legen, und die Nachrichten werden später verarbeitet (normalerweise auf FIFO-Art). In diesem Fall überschwemmen Sie den Actor jedoch mit PollMessage und Sie können nicht garantieren, dass Ihre Nachricht nicht gelöscht wird - was in Ihrem Fall zu geschehen scheint.