Ich versuche, den folgenden Code auszuführen, basierend auf der akka Strom Kurzanleitung:Akka Stream - Source.fromPublisher
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val songs = Source.fromPublisher(SongsService.stream)
val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
songs
.via(count)
.toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total songs processed: $c"))
Das Problem hierbei ist, dass die Zukunft ein Ergebnis nie wieder zurückkehren. Der größte Unterschied zum Dokumentationsbeispiel ist meine Quelle.
Ich habe ein Spiel enumerator, die ich es zu einem Akka Verlag bin Umwandlung, was in diesem SongsService.stream
Wenn eine definierte Liste als Quelle wie mit:
val songs = Source(list)
Es funktioniert , aber mit dem Source.fromPublisher nicht.
Aber das Problem ist hier nicht der Verlag in der Tat, kann ich eine einfache Bedienung tun und es funktioniert:
val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)
Es die Datenbank durchläuft, erstellen Sie das Spiel enumerator, wandelt es in einen Verlag und ich kann iterieren über.
Irgendwelche Ideen?
Liefert 'songs.runForeach (println)' 'Done', wenn Sie darauf warten? – Mullefa
@Mullefa hat gerade erkannt, dass es nicht so ist. –