2016-04-26 7 views
2

Ich probiere einige Sachen mit Akka Streams für eines meiner Projekte, wo ich bereits Rx Scala verwenden. Ich war versucht zu sehen, wie Akka Streams die Rx Scala-Bibliothek, die wir haben, ersetzen könnte. Eine Sache, die ich mit Akka Streams nicht sehen kann, ist die Möglichkeit, eine Quelle und viele Sinks zu haben. Sag mal, in diesem Beispiel gerade aus Akka Streams Dokumentation entnommen:Eine Quelle zu vielen sinken mit Akka Streams

val source = Source(1 to 10) 
val sink = Sink.fold[Int, Int](0)(_ + _) 

// connect the Source to the Sink, obtaining a RunnableGraph 
val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // how could I materialize to a Seq of Sinks? 

// materialize the flow and get the value of the FoldSink 
val sum: Future[Int] = runnable.run() 

Wenn eine Rx-Bibliothek, ich habe sowohl die Quelle (beobachtbare) und Sink (Observer) völlig entkoppelt, was mir die Flexibilität gibt, 1 Quelle zur Karte (Observable) und haben n Sinks (Beobachter). Wie kann ich das mit den Akka Streams erreichen? Alle Hinweise wären hilfreich!

Antwort

2

Dieses mit Graphs verfügbar ist, spezifisch Broadcast:

broadcast [T] - (1-Eingang, N Ausgänge) Element ein Eingang gegeben emittiert zu jeden Ausgang

einige Code Probe aus die Dokumentation:

val in = Source(1 to 10) 
val out = Sink.ignore 

val bcast = builder.add(Broadcast[Int](2)) 
val merge = builder.add(Merge[Int](2)) 

val f1, f2, f3, f4 = Flow[Int].map(_ + 10) 

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
      bcast ~> f4 ~> merge 
ClosedShape