2016-05-10 5 views
8

Ich versuche, einen akka streams basierten Flow in meine Play 2.5 App zu integrieren. Die Idee ist, dass Sie ein Foto streamen und dann als rohe Datei, eine Thumbnail-Version und eine mit Wasserzeichen versehene Version auf die Festplatte schreiben können.Wie man eine Akka-Streams-Senke aus mehreren Dateischreibvorgängen zusammenbaut?

ich es geschafft, diese Arbeit mit einem Diagramm, um so etwas zu bekommen:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray}) 
            .map(_.result().toArray) 

def toByteArray = Flow[ByteString].map(b => b.toArray) 

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder => 
    import GraphDSL.Implicits._ 
    val streamFan = builder.add(Broadcast[ByteString](3)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 
    val output = builder.add(Flow[ByteString].map(x => Success(Done))) 

    val rawFileSink = FileIO.toFile(file) 
    val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
    val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

    streamFan.out(0) ~> rawFileSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 
    streamFan.out(2) ~> output.in 

    byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink 
    byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink 

    FlowShape(streamFan.in, output.out) 
}) 

graph 

}

Dann habe ich es einen Akkumulator wie dies in meinem Spiel-Controller Draht:

val sink = Sink.head[Try[Done]] 

val photoStorageParser = BodyParser { req => 
    Accumulator(sink).through(graph).map(Right.apply) 
} 

Das Problem ist, dass meine zwei verarbeiteten Dateisenken nicht abgeschlossen werden und ich für beide verarbeiteten Dateien keine Größen, aber nicht die rohe Größe bekomme. Meine Theorie ist, dass der Akkumulator nur auf einen der Ausgänge meines Fächers wartet, also wenn der Eingabestrom vollendet ist und mein byteAccumulator die komplette Datei ausspuckt, hat die Wiedergabe den materialisierten Wert von der Ausgabe, wenn die Verarbeitung beendet ist .

Also, meine Fragen sind:
Bin ich damit auf dem richtigen Weg soweit mein Ansatz geht? Wie lautet das erwartete Verhalten für das Ausführen eines Diagramms? Wie kann ich alle meine Waschbecken zusammenbringen, um eine letzte Spüle zu bilden?

+0

Ich denke auch, dass der Grund ist, dass die Flüsse nicht nach der Verarbeitung zusammengeführt werden. Haben Sie 'Sink.combine' versucht (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat

+0

Ja, ich habe Sink.combine ein Go, aber das vereinheitlicht mehrere Senken, um _to_ wie ein Fan zu senden. Ich denke, ich suche einen Fan, aber es scheint, dass Sie das nicht mit Senken nur Quellen tun können! – Tompey

+0

Dies scheint ein ähnliches Beispiel zu sein: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Vielleicht musst du eine 'SinkShape' anstelle einer' FlowShape' zurückgeben, um zu erklären, dass dein Stream fertig ist? – devkat

Antwort

7

Ok, nach einer wenig Hilfe (Andreas auf dem richtigen Weg war), habe ich an dieser Lösung gekommen, die den Trick:

val rawFileSink = FileIO.toFile(file) 
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail)) 
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked)) 

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) { 
    implicit builder => (rawSink, thumbSink, waterSink) => { 
    val streamFan = builder.add(Broadcast[ByteString](2)) 
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2)) 

    streamFan.out(0) ~> rawSink 
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in 

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink 
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink 

    SinkShape(streamFan.in) 
    } 
}) 

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done))) 

Nach dem es tot ist jeder diese aus dem Spiel zu nennen:

val photoStorageParser = BodyParser { req => 
    Accumulator(theSink).map(Right.apply) 
} 

def createImage(path: String) = Action(photoStorageParser) { req => 
    Created 
} 
+0

danke Mann, ich hatte gerade eine ähnliche Aufgabe und konnte nicht herausfinden, wie man auf alle materialisierten Futures wartet. Ihre Lösung hat viel geholfen und es funktioniert! –

+0

Hallo! Wie wäre es mit variabler Anzahl der Spüle für die Kombination? – Alexander