2015-08-25 10 views
5

ich etwas wirklich ähnlich wie diese https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scalaWie gruppiert Teilströme mit mapAsync in akka verbrauchen Ströme

mein Problem tun müssen, ist, dass ich eine unbekannte Anzahl von Gruppen haben und wenn die Anzahl der Parallelität der mapAsync weniger der Anzahl der Gruppen I got und Fehler in der letzten Senke

Reissen unten SynchronousFileSink (/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) wegen vorgeschalteten Fehler (akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)

Ich habe versucht, einen Puffer in der Mitte zu setzen, wie in der Musterführung von akka

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html Strömen vorgeschlagen
groupBy { 
    case LoglevelPattern(level) => level 
    case other     => "OTHER" 
}.buffer(1000, OverflowStrategy.backpressure). 
    // write lines of each group to a separate file 
    mapAsync(parallelism = 2) {.... 

aber mit dem gleichen Ergebnis

+0

Ich frage mich, ob die Verwendung von 'mapAsync' irgendeinen Zweck an erster Stelle erfüllt? Was passiert, wenn Sie stattdessen 'map' verwenden? – jrudolph

+0

mit Karte die Gruppen sind nicht in parallel/async konsumiert, was ist mein gewünschtes Verhalten – Sammyrulez

+2

Ich denke, das ist ein Missverständnis. Alle Gruppen werden durch eine 'Source [Something]' dargestellt (nach 'groupBy' hast du eine' Source [Source [Something]] ', richtig?). Also, das einzige, was Sie tun müssen innerhalb der 'map' (' foreach' sollte genauso gut funktionieren) wäre, die Subflows auszuführen, was eine asynchrone Operation ist. Die Subflows würden dann von selbst laufen und Ihr 'map' Element wäre frei, um die nächste' Source [Something] 'zu akzeptieren. – jrudolph

Antwort

3

auf jrudolph Kommentar Erweiterung ist absolut korrekt ist .. .

Sie benötigen in diesem Fall keine mapAsync. Als einfaches Beispiel: Angenommen, Sie eine Quelle von Tupeln haben

import akka.stream.scaladsl.{Source, Sink} 

def data() = List(("foo", 1), 
        ("foo", 2), 
        ("bar", 1), 
        ("foo", 3), 
        ("bar", 2)) 

val originalSource = Source(data) 

Sie können dann eine groupBy durchführen, um eine

Jeder der gruppierten Quellen Source of Sources

def getID(tuple : (String, Int)) = tuple._1 

//a Source of (String, Source[(String, Int),_]) 
val groupedSource = originalSource groupBy getID 
zu erstellen mit nur parallel verarbeitet werden map, keine Notwendigkeit für etwas Phantasie. Hier ist ein Beispiel für jede Gruppierung in einem unabhängigen Strom summiert werden:

import akka.actor.ActorSystem 
import akka.stream.ACtorMaterializer 

implicit val actorSystem = ActorSystem() 
implicit val mat = ActorMaterializer() 
import actorSystem.dispatcher 

def getValues(tuple : (String, Int)) = tuple._2 

//does not have to be a def, we can re-use the same sink over-and-over 
val sumSink = Sink.fold[Int,Int](0)(_ + _) 

//a Source of (String, Future[Int]) 
val sumSource = 
    groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream 
    } 

nun alle der "foo" Zahlen werden parallel zusammengefaßt, mit all den "bar" Zahlen.

mapAsync wird verwendet, wenn Sie eine gekapselte Funktion haben, die Future[T] zurückgibt und Sie versuchen, stattdessen eine T zu emittieren; was bei dir nicht der Fall ist. Außerdem beinhaltet mapAsync waiting for results, was nicht reactive ist ...

+1

Nun, da akka streams mit akka zusammengeführt wurden und die Source Source-Semantik durch SubFlow ersetzt wurde, wie würde man ein ähnliches Verhalten erreichen? – AlphaGeek

+0

@AlphaGeek Ich habe festgestellt, dass sich die groupBy-Funktionalität vor ein paar Monaten geändert hat. Das Kochbuch hat ein aktualisiertes Beispiel basierend auf dem neuen Teilstreams-Ansatz: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Implementing_reduce-by-key –

+0

@RamonJRomeroyVigil Cookbook erklärt es nicht. Sie verwenden Reduzieren, was unausführbar ist, wenn wir in jedem Teilstrom Objekte mit Punkten haben. Was ich (und auch der Autor) möchte, ist "Quelle" pro Gruppe. Wie würden wir das machen? – expert