2016-04-15 6 views
0

Ich versuche, den folgenden Code auszuführen, basierend auf der akka Strom Kurzanleitung:Akka Stream - Source.fromPublisher

implicit val system = ActorSystem("QuickStart") 
implicit val materializer = ActorMaterializer() 

val songs = Source.fromPublisher(SongsService.stream) 

val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1) 

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) 

val counterGraph: RunnableGraph[Future[Int]] = 
    songs 
    .via(count) 
    .toMat(sumSink)(Keep.right) 

val sum: Future[Int] = counterGraph.run() 

sum.foreach(c => println(s"Total songs processed: $c")) 

Das Problem hierbei ist, dass die Zukunft ein Ergebnis nie wieder zurückkehren. Der größte Unterschied zum Dokumentationsbeispiel ist meine Quelle.

Ich habe ein Spiel enumerator, die ich es zu einem Akka Verlag bin Umwandlung, was in diesem SongsService.stream

Wenn eine definierte Liste als Quelle wie mit:

val songs = Source(list) 

Es funktioniert , aber mit dem Source.fromPublisher nicht.

Aber das Problem ist hier nicht der Verlag in der Tat, kann ich eine einfache Bedienung tun und es funktioniert:

val songs = Source.fromPublisher(SongsService.stream) 
songs.runForeach(println) 

Es die Datenbank durchläuft, erstellen Sie das Spiel enumerator, wandelt es in einen Verlag und ich kann iterieren über.

Irgendwelche Ideen?

+1

Liefert 'songs.runForeach (println)' 'Done', wenn Sie darauf warten? – Mullefa

+0

@Mullefa hat gerade erkannt, dass es nicht so ist. –

Antwort

4

Ihr Publisher wird wahrscheinlich nie abgeschlossen.