Ich versuche, einen Kafka-Consumer zu WebSet Flow mit reaktiven Kafka, Akka-http und Akka-Stream zu schreiben.Kafka Nachricht an WebSocket
val publisherActor = actorSystem.actorOf(CommandPublisher.props)
val publisher = ActorPublisher[String](publisherActor)
val commandSource = Source.fromPublisher(publisher) map toMessage
def toMessage(c: String): Message = TextMessage.Strict(c)
class CommandPublisher extends ActorPublisher[String] {
override def receive = {
case cmd: String =>
if (isActive && totalDemand > 0)
onNext(cmd)
}
}
object CommandPublisher {
def props: Props = Props(new CommandPublisher())
}
// This is the route
def mainFlow(): Route = {
path("ws"/"commands") {
handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
}
}
Vom kafka Verbraucher (hier weggelassen), mache ich eine publisherActor ! commandString
Inhalte dynamisch an die websocket hinzuzufügen.
aber ich laufe in dieser Ausnahme im Backend, wenn ich mehrere Clients auf die websocket starten:
[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
...
Kann nicht ein Fluss für alle websocket Clients verwendet werden? Oder sollte der Flow-/Publisher-Akteur pro Client erstellt werden?
Hier habe ich vor, "aktuelle"/"Live" -Benachrichtigungen an alle Websocket-Clients zu senden. Der Verlauf der Benachrichtigungen ist irrelevant und muss für neue Kunden ignoriert werden.
Interessanterweise nahm ich meinen Code von https://github.com/J-Technologies/akka-http-websocket-activator-template, der überraschend funktioniert. Es verwendet Source.actorPublisher anstelle von Source.fromPublisher - das ist der einzige Unterschied. Nicht in der Lage zu verstehen, warum das funktioniert, und meine tut nicht – vishr
Ich bin mir ziemlich sicher, die Post, die ich zitierte, ist genau ... so bedeutet es, Sie müssen es wie ein regulärer Actor verwenden und nicht an eine ReactiveStreams konforme –
Von 'ActorPublisher 'it says:/** * Erstellen Sie einen [[org.reactivestreams.Publisher]], der von einem Akteur [[ActorPublisher]] unterstützt wird. Er kann sein * an einen [[org.reactivestreams.Subscriber]] angehängt oder als Eingabequelle für einen * [[akka.stream.scaladsl.Flow]] verwendet werden. * / –