ich etwas wirklich ähnlich wie diese https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scalaWie gruppiert Teilströme mit mapAsync in akka verbrauchen Ströme
mein Problem tun müssen, ist, dass ich eine unbekannte Anzahl von Gruppen haben und wenn die Anzahl der Parallelität der mapAsync weniger der Anzahl der Gruppen I got und Fehler in der letzten Senke
Reissen unten SynchronousFileSink (/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) wegen vorgeschalteten Fehler (akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)
Ich habe versucht, einen Puffer in der Mitte zu setzen, wie in der Musterführung von akka
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html Strömen vorgeschlagengroupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....
aber mit dem gleichen Ergebnis
Ich frage mich, ob die Verwendung von 'mapAsync' irgendeinen Zweck an erster Stelle erfüllt? Was passiert, wenn Sie stattdessen 'map' verwenden? – jrudolph
mit Karte die Gruppen sind nicht in parallel/async konsumiert, was ist mein gewünschtes Verhalten – Sammyrulez
Ich denke, das ist ein Missverständnis. Alle Gruppen werden durch eine 'Source [Something]' dargestellt (nach 'groupBy' hast du eine' Source [Source [Something]] ', richtig?). Also, das einzige, was Sie tun müssen innerhalb der 'map' (' foreach' sollte genauso gut funktionieren) wäre, die Subflows auszuführen, was eine asynchrone Operation ist. Die Subflows würden dann von selbst laufen und Ihr 'map' Element wäre frei, um die nächste' Source [Something] 'zu akzeptieren. – jrudolph