2012-06-21 8 views
12

Ich lese in der Akka docs, dass es gefährlich ist, über Variablen von einem einschließenden Akteur zu schließen.Akka Schauspieler, Futures und Schließungen

Warnung

In diesem Fall, dass Sie sorgfältig vermeiden müssen über den Referenz des enthaltenden Schauspielers zu schließen, das heißt nicht nennen Methoden auf den umschließenden Schauspieler aus der anonymen Schauspieler-Klasse. Dies würde brechen die Actor-Kapselung und kann Synchronisierungsfehler und Race-Bedingungen einführen, weil der Code des anderen Akteurs wird gleichzeitig mit dem umgebenden Schauspieler eingeplant werden.

Jetzt habe ich zwei Schauspieler, von denen einer etwas von der Sekunde verlangt und etwas mit dem Ergebnis tut. In diesem Beispiel unten habe ich zusammengestellt, Schauspieler Akkumulator ruft Zahlen von Schauspieler NumberGenerator und fügt sie hinzu, Berichterstattung die Summe auf dem Weg.

Dies kann in mindestens zwei verschiedenen Arten durchgeführt werden, wie dies beispielsweise mit zwei unterschiedlichen Funktionen erhalten (A vs B). Der Unterschied zwischen den beiden ist, dass A nicht über die Zähler Variable schließt; Stattdessen erwartet es eine ganze Zahl und fasst es zusammen, während B eine Zukunft erstellt, die über Zähler schließt und die Summe tut. Dies geschieht innerhalb eines anonymen Akteurs, der nur zur Verarbeitung von onSuccess erstellt wurde, wenn ich richtig verstehe, wie das funktioniert.

import com.esotericsoftware.minlog.Log 

import akka.actor.{Actor, Props} 
import akka.pattern.{ask, pipe} 
import akka.util.Timeout 
import akka.util.duration._ 

case object Start 
case object Request 


object ActorTest { 
    var wake = 0 

    val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator") 
    val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator") 

    Log.info("ActorTest", "Starting !") 

    accRef ! Start 
} 

class Accumulator extends Actor { 
    var counter = 0 

    implicit val timeout = Timeout(5 seconds) 

    // A: WITHOUT CLOSURE 
    def receive = { 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self 
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start 
    } 
    // B: WITH CLOSURE 
    def receive = { 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess { 
     case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start 
    } 
    } 
} 

class NumberGenerator extends Actor { 
    val rand = new java.util.Random() 

    def receive = { 
    case Request => sender ! rand.nextInt(11)-5 
    } 
} 

Ist es absolut böse, in diesem Fall Verschlüsse zu verwenden? Natürlich könnte ich einen AtomicInteger anstelle eines Int verwenden, oder in einem Netzwerk-Szenario mit z. B. netty, eine Schreiboperation auf einem threadsafe-Kanal, aber das ist nicht mein Punkt hier.

Um das Risiko für das Lächerliche zu fragen: Gibt es einen Weg für die onSuccess Future in dieses Schauspieler anstelle eines anonymen mittleren Schauspieler auszuführen, ohne einen Fall, in dem definieren Funktion erhalten?

EDIT

Um es deutlicher, meine Frage ist: Gibt es eine Möglichkeit, eine Reihe von Futures zu zwingen, im selben Thread als gegebene Schauspieler zu laufen?

Antwort

5

Das Problem ist, dass die onSuccess in einem anderen Thread ausgeführt wird als das Gewinde der Schauspieler receive in laufen wird. Sie könnten den pipeTo Ansatz verwenden, oder ein Agent verwenden. Making counter ein AtomicInteger würde das Problem lösen, aber es ist nicht so sauber - das heißt, es bricht das Actor-Modell.

+1

+1 für die Verwendung eines Agenten vorschlagen – gsimard

5

Der einfachste Weg, eine solche Gestaltung der Umsetzung ist durch „Feuer-and-Forget“ semantische Verwendung:

class Accumulator extends Actor { 
    private[this] var counter = 0 

    def receive = { 
    case Start => ActorTest.genRef ! Request 
    case x: Int => { 
     counter += x 
     Log.info("Accumulator", "counter = " + counter) 
     self ! Start 
    } 
    } 
} 

Diese Lösung vollständig asynchron ist, und Sie keine Timeout benötigen.

+0

Ja, das funktioniert, wenn ich mit Futures aufgeben würde. Chaining Futures ist in meinem Beispiel ohne weiteres möglich, wenn sie mit _pipeTo self_ enden, aber mit der Fire-and-Forget-Semantik nicht mehr möglich sind. Stattdessen müsste ich N Zwischennachrichten in der Empfangsfunktion des Akkumulators definieren, nur um zu garantieren, dass der Code innerhalb des Threads dieses Actors läuft. Ich denke, ich kann noch einmal fragen, dieses Mal klarer: Gibt es eine Möglichkeit, eine Serie von Futures dazu zu zwingen, im gleichen Thread wie ein bestimmter Actor zu laufen? – gsimard

+0

Warum müssen Sie sicherstellen, dass der 'Accumulator' Actor immer im selben Thread läuft? Das scheint gegen die Schauspieler-Modell-Philosophie zu sein. Das Gleiche gilt für Futures: Sie sollten in einem Threadpool versandt werden, um die Leistung zu maximieren. Wenn sie alle in demselben Thread in einer Kette laufen, haben Sie nur ein einfaches sequenzielles Programm und Sie brauchen keine Futures mehr ... – paradigmatic

+0

Eigentlich ist es egal, ob es im selben Thread läuft oder nicht, die Anforderung ist mehr, dass es sequentiell ausgeführt werden sollte, genauso wie die Nachrichten eines einzelnen Akteurs verarbeitet werden. Dies geht nicht gegen das Akteurmodell, es ist das Akteurmodell. – gsimard