2016-04-04 10 views
13

Ich möchte eine SourceQueue verwenden, um Elemente dynamisch in eine Akka Stream Quelle zu schieben. Play Controller benötigt eine Quelle, um ein Ergebnis mit der Methode chuncked streamen zu können.
Da Play seinen eigenen Akka Stream Sink unter der Haube verwendet, kann ich die Source Queue selbst nicht mit einem Sink materialisieren, da die Quelle verbraucht wird, bevor sie von der chunked Methode verwendet wird (außer wenn ich den folgenden Hack verwende).Wie verwende ich einen Akka Streams SourceQueue mit PlayFramework

Ich bin in der Lage, damit es funktioniert, wenn ich die Quellen-Warteschlange mit einem reaktiven Strom Verlag vorge materialisieren, aber es ist eine Art von ‚schmutziger Hack‘:

def sourceQueueAction = Action{ 

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run() 

    //stupid example to push elements dynamically 
    val tick = Source.tick(0 second, 1 second, "tick") 
    tick.runForeach(t => queue.offer(t)) 

    Ok.chunked(Source.fromPublisher(pub)) 
    } 

Gibt es einen einfacheren Weg zu Verwenden Sie eine Akka Streams SourceQueue mit PlayFramework?

Dank

Antwort

19

Die Lösung ist mapMaterializedValue auf der Quelle zu verwenden, um eine Zukunft der Warteschlange Materialisierung zu erhalten:

def sourceQueueAction = Action { 
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail)) 

    futureQueue.map { queue => 
     Source.tick(0.second, 1.second, "tick") 
     .runForeach (t => queue.offer(t)) 
    } 
    Ok.chunked(queueSource) 

    } 

    //T is the source type, here String 
    //M is the materialization type, here a SourceQueue[String] 
    def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = { 
    val p = Promise[M] 
    val s = src.mapMaterializedValue { m => 
     p.trySuccess(m) 
     m 
    } 
    (s, p.future) 
    } 
+0

Warum, wenn ich 'queueSource.map {_.toUpperCase}' zum Beispiel I bekomme keine Quelle [String, NotUsed]? Stattdessen gibt es den Fehler zurück: Ausdruck des Typs queueSource.Repr [String] entspricht nicht dem erwarteten Typ Quelle [String, NotUsed] .' Wo werden Sie Transformationen an den Elementen der Quelle vornehmen? Wie Ticks in [Ihrem Beispiel] (http://loicdescotte.github.io/posts/play-akka-streams-queue/) – gabrielgiussi