2012-07-26 4 views
5

Hier ist mein Szenario:Wird Akka onReceive-Methode gleichzeitig ausgeführt?

Ich habe einen Hauptdarsteller, der Nachrichten von mehreren untergeordneten Akteuren empfängt. Diese Nachrichten enthalten zu aggregierende Daten. Muss ich in dieser Aggregationslogik auf Synchronisationsprobleme achten, wenn ich eine geteilte Datenstruktur verwende, um die Aggregation zu sammeln?

else if(arg0 instanceof ReducedMsg){ 

          ReducedMsg reduced = (ReducedMsg)arg0; 
     counter.decrementAndGet(); 

     synchronized(finalResult){ 

      finalResult.add((KeyValue<K, V>) reduced.getReduced()); 

      if(counter.get() == 0){ 
            if(checkAndReduce(finalResult)){ 

        finalResult.clear(); 
       } 
       else{ 
        stop(); 
        latch.countDown(); 
       } 

      } 

     } 



    } 

So wie man sehen kann ich einen finalResult haben, zu dem jede Nachricht wird aggregiert und nach einer Verarbeitungslogik muss die Sammlung als auch gelöscht werden.

Eigentlich was ich versuche zu implementieren ist eine rekursive (assoziative) Reduktion mapreduce. Also muss ich den synchronisierten Block behalten, den ich vermute? Oder ist es zufällig, dass Akka den onReceive Thread gleichzeitig ausführt?

Diese Logik liefert genaue und vorhersehbare Ergebnisse für kleine Datenmengen. Mein Problem ist, wenn mein Eingabedatensatz ein wenig groß ist, hängt der Code. Ich will sicher sein, dass das wegen der Kontextwechsel für meinen Synchronisationsblock ist, so dass ich mich vielleicht in ein anderes Design verliebte.

Antwort

14

onReceive() ist nie gleichzeitig aufgerufen. Dies ist die grundlegendste Garantie, die Akka Ihnen gibt. Diese

bedeutet, dass, wenn Ihr counter Variable ein Feld in einem Schauspieler ist und kein anderes Stück Code kann das Feld direkt zugreifen zu können, sicher normalen int/long statt AtomicInteger/AtomicLong verwenden können. Auch die Synchronisation auf finalResult ist nicht notwendig, vorausgesetzt, es handelt sich um ein Feld, das in einem Aktor verkapselt und verborgen ist.

Schließlich ist die Verwendung von CountDownLatch verdächtig. In Akka-Anwendungen sollten Sie keine Synchronisierungsgrundelemente verwenden. Akteure sind im Wesentlichen single-threaded und alle Kommunikation (einschließlich aufwachen und Weitergabe von Daten) sollte über Message-Passing implementiert werden.

Dies alles wird in der Dokumentation erklärt: http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model

+0

Sie Tomasz danken. Deine erste Zeile löscht viele meiner Zweifel! Registrieren Sie die Verwendung des Latch, ich muss das tun, um eine Client-Schnittstelle zu geben, die warten sollte, bis die Aktor-Verarbeitung abgeschlossen ist. Mein Ziel ist es, ein Java-Framework zu entwickeln, das intern Akka/Scala zur Verarbeitung nutzt. –

+0

@sutanudalui: Sie können actor * synchron * aufrufen, was im Wesentlichen bedeutet, dass Akka auf eine temporäre Warteschlange wartet. Keine Notwendigkeit, dies manuell zu tun. Konsultieren Sie Akka Dokumente über 'ask' (im Gegensatz zu' tell') Nachrichtenmuster. –

+0

Okay. Ich werde ein bisschen tiefer gehen. Ich habe einen Round-Robin-Router mit N Slave-Schauspielern. Was ich beabsichtige, ist eine Parallelverarbeitung und dann ansammeln das Ergebnis. So leitet der Master-Aktor bei Empfang jedes Eingangs zu einem der Slaves. Die Slaves senden die Nachricht bei der Verarbeitung an den Master zurück, der aggregieren muss. Diese Aggregationsphase, in der ich über Synchronisationsprobleme nachdachte. Über den bereitgestellten Doc-Link kann ich nicht garantieren, dass Akka diesen gemeinsamen Speicher garantiert (in meinem Fall wird das "finalResult" geschützt). Verstehe ich das richtig? –