Ich möchte eine SourceQueue verwenden, um Elemente dynamisch in eine Akka Stream Quelle zu schieben. Play Controller benötigt eine Quelle, um ein Ergebnis mit der Methode chuncked
streamen zu können.
Da Play seinen eigenen Akka Stream Sink unter der Haube verwendet, kann ich die Source Queue selbst nicht mit einem Sink materialisieren, da die Quelle verbraucht wird, bevor sie von der chunked
Methode verwendet wird (außer wenn ich den folgenden Hack verwende).Wie verwende ich einen Akka Streams SourceQueue mit PlayFramework
Ich bin in der Lage, damit es funktioniert, wenn ich die Quellen-Warteschlange mit einem reaktiven Strom Verlag vorge materialisieren, aber es ist eine Art von ‚schmutziger Hack‘:
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
Gibt es einen einfacheren Weg zu Verwenden Sie eine Akka Streams SourceQueue mit PlayFramework?
Dank
Warum, wenn ich 'queueSource.map {_.toUpperCase}' zum Beispiel I bekomme keine Quelle [String, NotUsed]? Stattdessen gibt es den Fehler zurück: Ausdruck des Typs queueSource.Repr [String] entspricht nicht dem erwarteten Typ Quelle [String, NotUsed] .' Wo werden Sie Transformationen an den Elementen der Quelle vornehmen? Wie Ticks in [Ihrem Beispiel] (http://loicdescotte.github.io/posts/play-akka-streams-queue/) – gabrielgiussi