Ich bin neu bei AKKA Streams. (Mit Akka v 2.4.4) Ich versuche, einen Websocket zu erstellen, der neue Benachrichtigungen an abonnierte Clients senden kann. Meine Strategie besteht darin, einen ActorPublisher zu implementieren, an den ich später eine Nachricht senden kann, und ihn dann an die Kunden weiterleiten zu lassen.Actorpublisher als Quelle in handleMessagesWithSinkSource
case class Tick()
class TickActor extends ActorPublisher[Int] {
import scala.concurrent.duration._
implicit val ec = context.dispatcher
val tick = context.system.scheduler.schedule(1 second, 1 second, self, `Tick())`
var cnt = 0
var buffer = Vector.empty[Int]
override def receive: Receive = {
case Tick() => {
cnt = cnt + 1
if (buffer.isEmpty && totalDemand > 0) {
onNext(cnt)
}
else {
buffer :+= cnt
if (totalDemand > 0) {
val (use,keep) = buffer.splitAt(totalDemand.toInt)
buffer = keep
use foreach onNext
}
}
}
}
override def postStop() = tick.cancel()
}
Mein Problem ist, dass ich weiß nicht, wie es als Quelle zu verwenden:
Um Ich kopierte ein Beispiel eines ActorPublisher loszulegen.
Ich habe versucht, die folgenden:
val source: Source[Strict, ActorRef] = Source.actorPublisher(Props[TickActor]).map(i => TextMessage(i.toString))
optionalHeaderValueByType[akka.http.scaladsl.model.ws.UpgradeToWebSocket]() {
case Some(upgrade) =>
complete(
upgrade.handleMessagesWithSinkSource(Sink.ignore,source))
case None =>
reject(akka.http.scaladsl.server.ExpectedWebSocketRequestRejection)
}
Aber wenn ich mit einem Klienten ich folgendes Classcast erhalten verbinden: java.lang.ClassCastException: java.lang.Integer kann nicht auf scala.runtime.Nothing gegossen werden $
Wenn ich die Quelle zu ändern:
val src: Source[Strict, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current.nextInt()))
.filter(i => i > 0 && i % 2 == 0).map(i => TextMessage(i.toString))
Es läuft ganz gut.
Ich kämpfe ein bisschen die Punkte verbinden, so hoffentlich können Sie mich in die richtige Richtung führen.