2016-07-07 19 views
2

mit früheren Versionen von Akka Streams erstellen, groupBy ein Source von Source s zurück, die in ein Source[Seq[A]] verwirklicht werden könnte.Wie eine Akka Stream-Quelle [Seq [A]] von Quelle [A]

Mit Akka Streams 2.4 sehe ich, dass groupBy eine zurückgibt - es ist mir nicht klar, wie Sie das verwenden. Die Transformationen, die ich auf den Flow anwenden muss, müssen das ganze Seq haben, also kann ich nicht einfach map über die SubFlow (glaube ich).

Ich habe eine Klasse geschrieben, die extends GraphStage, die die Aggregation über eine veränderbare Sammlung in der GraphStageLogic tut, aber gibt es eingebaute Funktionalität dafür? Fehle ich den Punkt SubFlow?

Antwort

0

Ich landete das Schreiben eines GraphStage:

class FlowAggregation[A, B](f: A => B) extends GraphStage[FlowShape[A, Seq[A]]] { 
    val in: Inlet[A] = Inlet("in") 
    val out: Outlet[Seq[A]] = Outlet("out") 
    override val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 

     private var counter: Option[B] = None 
     private var aggregate = scala.collection.mutable.ArrayBuffer.empty[A] 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val element = grab(in) 

      counter.fold({ 
      counter = Some(f(element)) 
      aggregate += element 
      pull(in) 
      }) { p => 
      if (f(element) == p) { 
       aggregate += element 
       pull(in) 
      } else { 
       push(out, aggregate) 
       counter = Some(f(element)) 
       aggregate = scala.collection.mutable.ArrayBuffer(element) 
      } 
      } 
     } 
     override def onUpstreamFinish(): Unit = { 
      emit(out, aggregate) 
      complete(out) 
     } 
     }) 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
    } 
}