2016-04-01 6 views
3

Ich versuche, einen Kafka-Consumer zu WebSet Flow mit reaktiven Kafka, Akka-http und Akka-Stream zu schreiben.Kafka Nachricht an WebSocket

val publisherActor = actorSystem.actorOf(CommandPublisher.props) 
    val publisher = ActorPublisher[String](publisherActor) 
    val commandSource = Source.fromPublisher(publisher) map toMessage 
    def toMessage(c: String): Message = TextMessage.Strict(c) 

    class CommandPublisher extends ActorPublisher[String] { 
    override def receive = { 
     case cmd: String => 
     if (isActive && totalDemand > 0) 
      onNext(cmd) 
    } 
    } 

    object CommandPublisher { 
    def props: Props = Props(new CommandPublisher()) 
    } 

    // This is the route 
    def mainFlow(): Route = { 
    path("ws"/"commands") { 
     handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource)) 
    } 
    } 

Vom kafka Verbraucher (hier weggelassen), mache ich eine publisherActor ! commandString Inhalte dynamisch an die websocket hinzuzufügen.

aber ich laufe in dieser Ausnahme im Backend, wenn ich mehrere Clients auf die websocket starten:

[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12) 
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12) 
    at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35) 
    at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295) 
    ... 

Kann nicht ein Fluss für alle websocket Clients verwendet werden? Oder sollte der Flow-/Publisher-Akteur pro Client erstellt werden?

Hier habe ich vor, "aktuelle"/"Live" -Benachrichtigungen an alle Websocket-Clients zu senden. Der Verlauf der Benachrichtigungen ist irrelevant und muss für neue Kunden ignoriert werden.

Antwort

1

Es tut mir leid, schlechte Nachrichten zu tragen, aber es sieht so aus, als ob dies das explizite Design von akka in Bezug auf ist. Sie können die Instanz des Datenflusses für alle Clients nicht wie gewünscht verwenden. Der Fanout muss als Folge des Rx-Modells "explizit" sein.

Beispiele I über gekommen sind verwenden eine routee spezifische Flow:

// The flow from beginning to end to be passed into handleWebsocketMessages 
    def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] = 
    Flow[Message] 
     // First we convert the TextMessage to a ReceivedMessage 
     .collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) } 
     // Then we send the message to the dispatch actor which fans it out 
     .via(dispatchActorFlow(sender)) 
     // The message is converted back to a TextMessage for serialization across the socket 
     .map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") } 

    def route = 
    (get & path("chat") & parameter('name)) { name => 
     handleWebsocketMessages(websocketDispatchFlow(sender = name)) 
    } 

Hier ist eine Diskussion darüber ist:

Und das ist genau das, was ich weiß nicht wie in Akka Bach, Diese explizite Fan-out. Wenn ich eine Datenquelle von irgendwo, dass ich Prozess (z. B. Observable oder eine Quelle) erhalten möchte, möchte ich es abonnieren und ich will nicht kümmern, ob es kalt oder heiß ist oder ob es abonniert wurde von anderen Abonnenten oder nicht. Das ist meine Flussanalogie. Der Fluss sollte sich nicht darum kümmern, wer davon trinkt und die Trinker sollten sich nicht um die Quelle des Flusses kümmern oder darum, wie viele andere Trinker dort sind. Meine Probe, die der Mathias entspricht, teilt die Datenquelle, aber es bezieht sich einfach auf Zählen und Sie können 2 Abonnenten haben oder Sie können 100 haben, nicht Angelegenheit. Und hier habe ich Lust bekommen, aber Referenzzählung funktioniert nicht funktioniert, wenn Sie keine Ereignisse verlieren wollen oder wenn Sie sicherstellen möchten, dass der Stream immer eingeschaltet bleibt. Aber dann verwenden Sie ConnectableObservable , die connect(): Cancelable hat, und das ist eine perfekte Passform für sagen ... Play LifeCycle Plugin. Darüber hinaus können Sie ein BehaviorSubject oder ein ReplaySubject verwenden, wenn Sie vorherige Werte für neue Abonnenten wiederholen möchten. Und die Dinge funktionieren danach, keine manuelle Zeichnung dieses Verbindungsgraphen benötigt. ... ... (dies ist von https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html) ... Für Funktionen, die eine beobachtbare und zurück eine beobachtbare nehmen wir in der Tat Aufzug haben, was die nächste Sache, etwas ist, das einen Namen hat und kann wegen der LiftOperators1 für Subject oder andere beobachtbare Typen in Monifu mit großem Erfolg eingesetzt werden (und 2), was es ermöglicht, Observables zu transformieren, ohne ihren Typ zu verlieren - Dies ist eine OOP-ische Verbesserung gegenüber dem, was RxJava mit lift macht.

Aber solche Funktionen sind nicht äquivalent zu Processor/Subject. Der Unterschied ist, dass Subject gleichzeitig ein Verbraucher und ein Produzent ist. Das bedeutet, dass Abonnenten beim Start der Datenquelle nicht genau die Adresse steuern können und dass die Datenquelle im Wesentlichen hot ist (was bedeutet, dass mehrere Abonnenten dieselbe Datenquelle verwenden). In Rx ist es völlig in Ordnung, wenn Sie modellieren kalt Observablen (dh Observablen, die eine neue Datenquelle für jeden einzelnen Abonnenten starten). Auf der anderen Seite in Rx (im Allgemeinen) ist es nicht in Ordnung Datenquellen, die nur einmal abonniert werden können, und das ist es dann. Die einzige Ausnahme zu dieser Regel in Monifu sind die Observables, die von der GroupBy-Operator erzeugt werden, aber das ist wie die Ausnahme, die die -Regel bestätigt.

Was dies bedeutet, vor allem in Verbindung mit einer weiteren Einschränkung des Vertrages sowohl Monifu und der Reactive Streams Protokoll (ich soll nicht mehrfach mit dem gleichen Verbraucher abonniert hat), ist, dass eine Subject oder eine Processor Instanz ist nicht wiederverwendbar. Damit eine Instanz wiederverwendbar ist, würde das Rx-Modell eine Fabrik von Processor benötigen. Darüber hinaus bedeutet dies, dass Ihre Datenquelle automatisch hot (gemeinsam für mehrere Abonnenten) sein muss, wenn Sie eine Subject/Processor verwenden möchten.

+0

Interessanterweise nahm ich meinen Code von https://github.com/J-Technologies/akka-http-websocket-activator-template, der überraschend funktioniert. Es verwendet Source.actorPublisher anstelle von Source.fromPublisher - das ist der einzige Unterschied. Nicht in der Lage zu verstehen, warum das funktioniert, und meine tut nicht – vishr

+0

Ich bin mir ziemlich sicher, die Post, die ich zitierte, ist genau ... so bedeutet es, Sie müssen es wie ein regulärer Actor verwenden und nicht an eine ReactiveStreams konforme –

+0

Von 'ActorPublisher 'it says:/** * Erstellen Sie einen [[org.reactivestreams.Publisher]], der von einem Akteur [[ActorPublisher]] unterstützt wird. Er kann sein * an einen [[org.reactivestreams.Subscriber]] angehängt oder als Eingabequelle für einen * [[akka.stream.scaladsl.Flow]] verwendet werden. * / –