2012-04-10 4 views
6

Ich Modellierung eines einfachen P2P mit Scala und Akka:Akka Actor "fragen" und "Wartet" mit Timeout

class Node() extends Peer with Actor { 

    var peers: List[ActorRef] = List() 

    def receive = { 
    case _register(peer: ActorRef, p: Option[Int]) => { 
     println("registering [" + peer + "] for [" + this + "]") 
     peers = peer :: peers 
    } 
    } 

} 

sealed case class _register(val peer: ActorRef, var p: Option[Int] = None) 

und dann ein einfaches Netzwerk:

class Network() extends Actor { 

    def this(name: String) = { 

    this() 

    val system = ActorSystem(name) 

    val s1 = system.actorOf(Props(new Node()), name = "s1") 
    val s2 = system.actorOf(Props(new Node()), name = "s2") 

    val c1 = system.actorOf(Props(new Node()), name = "c1") 
    val c2 = system.actorOf(Props(new Node()), name = "c2") 
    val c3 = system.actorOf(Props(new Node()), name = "c3") 
    val c4 = system.actorOf(Props(new Node()), name = "c4") 

    implicit val timeout = Timeout(5 second) 

    s1 ? _register(c1) 
    s1 ? _register(c2) 
    s1 ? _register(c3) 
    val lastRegistered = s2 ? _register(c4) 
    Await.ready(lastRegistered, timeout.duration) 

    println("initialized nodes") 
    } 
} 

Die Ausgabe, die ich bin immer ist immer wie:

registering [Actor[akka://p2p/user/c1]] for [[email protected]] 
registering [Actor[akka://p2p/user/c2]] for [[email protected]] 
registering [Actor[akka://p2p/user/c3]] for [[email protected]] 
registering [Actor[akka://p2p/user/c4]] for [[email protected]] 
[ERROR] [04/10/2012 22:07:04.34] [main-akka.actor.default-dispatcher-1] [akka://main/user/p2p] error while creating actor 
java.util.concurrent.TimeoutException: Futures timed out after [5000] milliseconds 
    at akka.dispatch.DefaultPromise.ready(Future.scala:834) 
    at akka.dispatch.DefaultPromise.ready(Future.scala:811) 
    at akka.dispatch.Await$.ready(Future.scala:64) 
    at nl.cwi.crisp.examples.p2p.scala.Network.<init>(Node.scala:136) 
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164) 
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164) 
    at akka.actor.ActorCell.newActor(ActorCell.scala:488) 
    at akka.actor.ActorCell.create$1(ActorCell.scala:506) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:591) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:191) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:160) 
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505) 
    at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259) 
    at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:997) 
    at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1495) 
    at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104) 

ich die Dokumentation vongefolgtauf Akka Referenzdokumentation. Der Austausch von Await.ready mit Await.result hat keine Auswirkungen. Das Protokoll zeigt, dass die letzte Registrierung erfolgreich war.

Wie soll ich das beheben?

Antwort

8

Sie suchen eine Nachricht warten, von dem Node Schauspieler zurückgegeben werden, aber der Knoten Schauspieler eine Nachricht zurück an den sender actorRef, so die Zukunft [Alles] erstellt von s1 ? _register nicht wird nie eine Antwort erhalten senden, so dass das Die Zukunft wird niemals vollständig sein. Sie könnten sender ! something aus dem Node receive Methode hinzufügen, um eine Antwort zu senden, ich bin mir nicht sicher, was something in diesem Fall sinnvoll ist.

+1

Danke, das war ein schlampiger Fehler! 'Absender! Keines würde für mich funktionieren. – nobeh

8

Eintopf hat all das zu Recht, aber Sie haben einige Besorgnis erregenden Code in Ihrem Netzwerk Schauspieler bekommen:

val system = ActorSystem(name) 

val s1 = system.actorOf(Props(new Node()), name = "s1") 
val s2 = system.actorOf(Props(new Node()), name = "s2") 

val c1 = system.actorOf(Props(new Node()), name = "c1") 
val c2 = system.actorOf(Props(new Node()), name = "c2") 
val c3 = system.actorOf(Props(new Node()), name = "c3") 
val c4 = system.actorOf(Props(new Node()), name = "c4") 

Warum Sie eine neue ActorSystem zu schaffen, und warum schaffen Sie Top-Level-Akteure innerhalb dieses Aktorik ?

Wenn Sie Zugriff auf die Schauspieler des Systems benötigen, können Sie einfach Aufruf:

context.system 

Und Sie sollten die Schaffung Top-Level-Akteure „nur weil“, aus dem gleichen Grund vermeiden, dass Sie nicht die Wurzel Krempel sollte Ihres Dateisystems, indem Sie alle Ihre Dateien dort ablegen. Kind-Akteure Netzwerk zu erstellen, tut gerade:

context.actorOf(...) 

Gerade jetzt Sie, sobald ein Problem haben würden, wie Sie mehr als ein Netzwerk-Akteur in demselben System zu erstellen, wie es oben zu schaffen versucht, Schauspieler auf der gleichen Ebene.

+1

Danke für die klaren Tipps. – nobeh

+1

Sie sind herzlich willkommen. Happy hakking! –