2012-04-04 3 views
2

Ich versuche, Akka zu verwenden, um einen TCP-Server für ein benutzerdefiniertes Anwendungsprotokoll zu implementieren. Ich versuche, dem Beispiel zu folgen, das hier gegeben wird: http://doc.akka.io/docs/akka/2.0/scala/io.html, um nicht blockierende IO innerhalb einer for ... Yield-Schleife zu machen.Seltsames Versuchs-/Fangverhalten mit Scala + Akka

Ich finde, wenn ich eine Ausnahme aus dem Yield-Block werfen, kann ich es nicht von außerhalb des Blocks fangen. Ich glaube, ich habe ein fundamentales Missverständnis darüber, wie Akka oder Scala hier arbeiten und ich würde mich über alle Tipps freuen.

import akka.actor._ 
import java.net.InetSocketAddress 

class EchoServer(port: Int) extends Actor { 

    val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher) 

    override def preStart { 
    IOManager(context.system) listen new InetSocketAddress(port) 
    } 

    def receive = { 
    case IO.NewClient(server) => 
     val socket = server.accept() 
     state(socket) flatMap (_ => EchoServer.processRequest(socket)) 
    case IO.Read(socket, bytes) => 
     state(socket)(IO.Chunk(bytes)) 
    case IO.Closed(socket, cause) => 
     state(socket)(IO.EOF(None)) 
     state -= socket 
    } 
} 

object EchoServer extends App 
{ 
    def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] = 
    { 
    println("In process request") 
    try { 
     for { 
     bs <- IO take 1 
     } yield { 
     println("I'll get here") 
     throw new Exception("Hey-o!") 
     println("But not here ... as expected") 
     } 
    } catch { 
     case e: Exception => println("And not here ... wtf?"); IO.Done() // NEVER GETS HERE 
    } 
    } 

    ActorSystem().actorOf(Props(new EchoServer(8080))) 
} 

Vielleicht bequem, den Kern hier zu folgen: https://gist.github.com/2296554

Kann jemand erklären, warum Steuer meines nicht catch-Block in dieser Situation erreicht

Ich habe unten den Code diesen gekocht?

Ich habe bemerkt, dass, wenn ich auf die Debug-Protokollierung in Akka drehen, ich diese Meldung in der Ausgabe sehen:

[DEBUG] [04/03/2012 22:42:25.106] [EchoServerActorSystem-akka.actor.default-dispatcher-1] [Future] Hey-o! 

Also ich denke, die Ausnahme vom Dispatcher Akka behandelt wird immer? Kann mir jemand erklären, wie das möglich ist?

Antwort

6

Der Punkt des nicht blockierenden IO ist natürlich, dass es keine Garantie gibt, wann und wo es ausgeführt wird. Denken Sie daran, dass man das für Verständnis schreiben kann als

(IO take 1).map(bs => { 
    println("I'll get here"); throw // ... 
} 

Was macht dieser Code? IO take 1 gibt einige nicht blockierende Future-ähnliche Sache zurück, der dann eine Transformationsfunktion durch die map-Methode angehängt wird. I.e. Wann immer (und wo auch immer) IO take 1 bereit ist, wendet es das map auf das Ergebnis an.

dies geschieht in einem anderen Thread Alle (oder irgendeine andere Weise, die nicht blockierende Semantik der Implementierung verwendet wird), so gibt es keine Möglichkeit für die try-catch auf irgendwelchen Exception s geworfen zu reagieren. Auch würde die bs => println(…) … Methode Ihrer Ausnahmebehandlung nicht kennen. Alles, was es weiß, dass es einige Eingaben bs transformieren und ein Ergebnis haben soll, wenn es fertig ist.

Die Lektion zu lernen: Vermeiden Sie bei Verwendung von nicht blockierenden Code Nebenwirkungen. Vor allem, wenn die Nebenwirkungen den Ablauf der Ausführung ändern.

Um die Ausnahme tatsächlich zu fangen, ich glaube, Sie werden es zu strukturieren haben wie folgt (ungetestet, siehe API):

def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] = 
    { 
    println("In process request") 
    (
     for { 
     bs <- IO take 1 
     } yield { 
     println("I'll get here") 
     throw new Exception("Hey-o!") 
     println("But not here ... as expected") 
     } 
    ) recover { 
     case e: Exception => println("And here ...?"); IO.Done() 
    } 
    } 
+0

Vielen Dank für die sehr hilfreiche Antwort, löscht es wirklich Dinge. –