2010-03-15 12 views
13

Ich habe eine asynchrone Steuerfluss wie folgt aus:Wie kann man einen asynchronen Kontrollfluss unter Verwendung von Fortsetzungen teilen und versenden?

ActorA ! DoA(dataA, callback1, callbackOnErrorA) 

def callback1() = { 
    ... 
    ActorB ! DoB(dataB, callback2, callbackOnErrorB) 
} 

def callback2() = { 
    ActorC ! DoC(dataC, callback3, callbackOnErrorC) 
} 

... 

Wie würde ich diese Strömung in mehrere Teile (Fortsetzungen) unterteilen und zu versenden nacheinander diese verschiedenen Akteure (oder Fäden/tasks), während der Gesamtzustand beibehalten ?

Jeder Hinweis geschätzt, Dank

Antwort

7

Dies ist sehr vereinfacht, aber zeigt, wie eine einzige Steuerfluss zwischen drei Akteuren aufzuteilen, zusammen zu jeder den Zustand vorbei:

package blevins.example 

import scala.continuations._ 
import scala.continuations.ControlContext._ 
import scala.actors.Actor._ 
import scala.actors._ 

object App extends Application { 

    val actorA, actorB, actorC = actor { 
    receive { 
     case f: Function1[Unit,Unit] => { f() } 
    } 
    } 

    def handle(a: Actor) = shift { k: (Unit=>Unit) => 
    a ! k 
    } 

    // Control flow to split up 
    reset { 
     // this is not handled by any actor 
     var x = 1 
     println("a: " + x) 

     handle(actorA) // actorA handles the below 
     x += 4 
     println("b: " + x) 

     handle(actorB) // then, actorB handles the rest 
     var y = 2 
     x += 2 
     println("c: " + x) 

     handle(actorC) // and so on... 
     y += 1 
     println("d: " + x + ":" + y) 
    } 

} 
+0

Großartig, danke! – hotzen

9

Ich mag scalaz.concurrent.Promise verwenden. Dieses Beispiel ist nicht genau wie das in Ihrer Frage, aber es gibt Ihnen die Idee.

object Async extends Application { 
    import scalaz._ 
    import Scalaz._ 
    import concurrent._ 
    import concurrent.strategy._ 
    import java.util.concurrent.{ExecutorService, Executors} 

    case class ResultA(resultb: ResultB, resulta: ResultC) 
    case class ResultB() 
    case class ResultC() 

    run 

    def run { 
    implicit val executor: ExecutorService = Executors.newFixedThreadPool(8) 
    import Executor.strategy 

    val promiseA = doA 
    println("waiting for results") 
    val a: ResultA = promiseA.get 
    println("got " + a) 
    executor.shutdown  
    } 

    def doA(implicit s: Strategy[Unit]): Promise[ResultA] = { 
    println("triggered A") 
    val b = doB 
    val c = doC 
    for {bb <- b; cc <- c} yield ResultA(bb, cc) 
    } 

    def doB(implicit s: Strategy[Unit]): Promise[ResultB] = { 
    println("triggered B") 
    promise { Thread.sleep(1000); println("returning B"); ResultB() } 
    } 

    def doC(implicit s: Strategy[Unit]): Promise[ResultC] = { 
    println("triggered C") 
    promise { Thread.sleep(1000); println("returning C"); ResultC() } 
    } 
} 

Ausgang:

triggered A 
triggered B 
triggered C 
waiting for results 
returning B 
returning C 
got ResultA(ResultB(),ResultC()) 

Sie erhalten eine Einführung in die Scalaz Gleichzeitigkeit in diesem presentation von Runar finden.

Dieser Ansatz ist nicht so flexibel wie Actors, aber komponiert besser und kann nicht blockieren.

+0

Nice Scalaz Promise-Beispiel, danke. Ich möchte jedoch ein tieferes Verständnis für die neue Scala2.8-CPS-Sache bekommen und würde mich über eine CPS-spezifische Antwort freuen. – hotzen

+0

+1 für die Vorteile von Futures over Actors und die Verwendung von impliziten Vals für die Definition der Strategien. – thSoft