Ich versuche, einen akka streams basierten Flow in meine Play 2.5 App zu integrieren. Die Idee ist, dass Sie ein Foto streamen und dann als rohe Datei, eine Thumbnail-Version und eine mit Wasserzeichen versehene Version auf die Festplatte schreiben können.Wie man eine Akka-Streams-Senke aus mehreren Dateischreibvorgängen zusammenbaut?
ich es geschafft, diese Arbeit mit einem Diagramm, um so etwas zu bekommen:
val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
.map(_.result().toArray)
def toByteArray = Flow[ByteString].map(b => b.toArray)
val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
val streamFan = builder.add(Broadcast[ByteString](3))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
val output = builder.add(Flow[ByteString].map(x => Success(Done)))
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
streamFan.out(0) ~> rawFileSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
streamFan.out(2) ~> output.in
byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
FlowShape(streamFan.in, output.out)
})
graph
}
Dann habe ich es einen Akkumulator wie dies in meinem Spiel-Controller Draht:
val sink = Sink.head[Try[Done]]
val photoStorageParser = BodyParser { req =>
Accumulator(sink).through(graph).map(Right.apply)
}
Das Problem ist, dass meine zwei verarbeiteten Dateisenken nicht abgeschlossen werden und ich für beide verarbeiteten Dateien keine Größen, aber nicht die rohe Größe bekomme. Meine Theorie ist, dass der Akkumulator nur auf einen der Ausgänge meines Fächers wartet, also wenn der Eingabestrom vollendet ist und mein byteAccumulator die komplette Datei ausspuckt, hat die Wiedergabe den materialisierten Wert von der Ausgabe, wenn die Verarbeitung beendet ist .
Also, meine Fragen sind:
Bin ich damit auf dem richtigen Weg soweit mein Ansatz geht? Wie lautet das erwartete Verhalten für das Ausführen eines Diagramms? Wie kann ich alle meine Waschbecken zusammenbringen, um eine letzte Spüle zu bilden?
Ich denke auch, dass der Grund ist, dass die Flüsse nicht nach der Verarbeitung zusammengeführt werden. Haben Sie 'Sink.combine' versucht (http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-graphs.html#Combining_Sources_and_Sinks_with_simplified_API)? – devkat
Ja, ich habe Sink.combine ein Go, aber das vereinheitlicht mehrere Senken, um _to_ wie ein Fan zu senden. Ich denke, ich suche einen Fan, aber es scheint, dass Sie das nicht mit Senken nur Quellen tun können! – Tompey
Dies scheint ein ähnliches Beispiel zu sein: http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-quickstart.html#Broadcasting_a_stream. Vielleicht musst du eine 'SinkShape' anstelle einer' FlowShape' zurückgeben, um zu erklären, dass dein Stream fertig ist? – devkat