2016-04-01 13 views
2

Ich arbeite an Anwendungsfall, in dem ich interdependent Operationen ausführen muss (definiert als eine gerichtete azyklische Grafik) mit scala Future. Grundsätzlich wird jede Operation (zB Knoten der DAG) in einer Zukunft ausgeführt und nachfolgende abhängige Knoten werden ausgelöst (sie sollten auch in einer Zukunft sein), sobald der aktuelle Knoten Future fertig ist. Dies wird fortgesetzt, bis jeder Knoten die Verarbeitung beendet hat oder einer von ihnen fehlschlägt. Bisher habe ich (minimaler Code):Ausführen von DAG wie Operationen in Scala Future

def run(node: Node, result: Result): Unit = { 
    val f: Future[(Node, Result)] = Future { 
    // process current Node 
    ... 
    } 

    f onComplete { 
    case Success(x) => 
     val n = x._1 // Current Node 
     val r = x._2 // Result of current Node 
     if (!n.isLeaf()) { 
     n.children.foreach { z => 
      run(z, r) 
     } 
     } 
    case Failure(e) => throw e 
    } 
} 

Ist das der richtige Weg, um dieses Problem anzugehen (Aufruf einer anderen Zukunft in einem Rückruf)? Wieder habe ich keinen richtigen Weg, andere laufende Zukunft zu stoppen, wenn einer der Knoten die Verarbeitung nicht besteht.

Kann dies mit Future-Zusammensetzung gelöst werden? Wenn ja, wie kann ich das erreichen?

Danke,
Pravin

+1

Jede funktionale Berechnung ist eine DAG, und Sie können Ihre Berechnung einfach in 'Future's und' flatMap' verpacken, die verschachtelten 'Futures' weg. Wenn Sie die "hardcore" Funktionelle Straße gehen wollen, sollten Sie sich 'FreeAp' in' scalaz' anschauen. – ziggystar

+0

@ziggystart Ich bin nicht sicher, wie dies mit FlatMap erreicht werden kann, haben Sie eine Idee (vielleicht ein Beispielcode)? –

+0

'flatMap' funktioniert nicht, da der Rückgabetyp von' run' 'Unit' nicht' Future' ist. –

Antwort

1

Hier ist ein funktioneller Ansatz: Statt Unit als Ergebnis der Auswertung der wir run/Future unter Verwendung eines generischen Typs haben kann. Normalerweise würden Sie lieber mit den Ergebnissen des Future funktional arbeiten, als mit seinen Nebenwirkungen.

Ich habe Typ Anmerkungen und beschreibende Variablennamen hinzugefügt, so dass es leichter zu verstehen wäre. Ich habe auch ein paar Fälle hinzugefügt, um zu zeigen, wie es scheitern wird. Sie können auch festlegen, dass bei einem Fehler alles wiederhergestellt und nicht fehlgeschlagen wird. Für dieses Problem, wenn die untergeordnete Berechnung auf dem übergeordneten Wert beruht, ist es wahrscheinlich sinnvoller, einen Fehler zu verursachen. diese

import scala.concurrent.{Await, Future} 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.duration._ 
import scala.util.Try 

case class Node[T](value: T, children: List[Node[T]]) 

object DagFuture extends App { 

    def run[A, B](node: Node[A], result: B)(nodeEval: (Node[A], B) => B)(aggregator: List[B] => B): Future[B] = { 
    val nodeResult: Future[B] = Future(nodeEval(node, result)) 
    val allResults: Future[List[B]] = nodeResult.flatMap(r => Future.sequence(nodeResult :: node.children.map(x => run(x, r)(nodeEval)(aggregator)))) 
    val finalResult: Future[B] = allResults.map(cl => aggregator(cl)) 
    finalResult 
    } 

    val debugSum = (l: List[Int]) => { 
    println(s"aggregating: $l") 
    l.sum 
    } 

    def debugNodeEval(f: (Node[Int], Int) => Int)(n: Node[Int], r: Int) = { 
    val eval = Try { f(n, r) } 
    println(s"node: $n, result: $r, eval: $eval") 
    eval.get 
    } 

    val debugNodeEvalDefault = debugNodeEval((n, r) => n.value + r) _ 

    val singleNodeDag = Node(1, Nil) 
    val multiNodeDag = Node(1, List(Node(20, Nil), Node(300, Nil))) 

    println("\nSINGLE NODE DAG EXAMPLE:") 
    val singleNodeFuture = run(singleNodeDag, 0)(debugNodeEvalDefault)(debugSum) 
    val singleNodeResult = Await.result(singleNodeFuture, 5 seconds) 
    println(s"Single node result: $singleNodeResult") 

    println("\nDAG PATH LENGTH EXAMPLE:") 
    val pathLengthFuture = run(multiNodeDag, 0)(debugNodeEvalDefault)(debugSum) 
    val pathLengthResult = Await.result(pathLengthFuture, 5 seconds) 
    println(s"Path length: $pathLengthResult") 

    println("\nFAILED DAG ROOT NODE EXAMPLE:") 
    val failedRootNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => throw new Exception))(debugSum) 
    val failedRootNodePromise = Await.ready(failedRootNodeFuture, 5 seconds) 
    println(s"Failed root node: ${failedRootNodePromise.value}") 

    println("\nFAILED DAG CHILD NODE EXAMPLE:") 
    val failedChildNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => if (n.value == 300) throw new Exception else n.value + r))(debugSum) 
    val failedChildNodePromise = Await.ready(failedChildNodeFuture, 5 seconds) 
    println(s"Failed child node: ${failedChildNodePromise.value}") 
} 

Drucke:

SINGLE NODE DAG EXAMPLE: 
node: Node(1,List()), result: 0, eval: Success(1) 
aggregating: List(1) 
Single node result: 1 

DAG PATH LENGTH EXAMPLE: 
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1) 
node: Node(20,List()), result: 1, eval: Success(21) 
node: Node(300,List()), result: 1, eval: Success(301) 
aggregating: List(301) 
aggregating: List(21) 
aggregating: List(1, 21, 301) 
Path length: 323 

FAILED DAG ROOT NODE EXAMPLE: 
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Failure(java.lang.Exception) 
Failed root node: Some(Failure(java.lang.Exception)) 

FAILED DAG CHILD NODE EXAMPLE: 
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1) 
node: Node(20,List()), result: 1, eval: Success(21) 
aggregating: List(21) 
node: Node(300,List()), result: 1, eval: Failure(java.lang.Exception) 
Failed child node: Some(Failure(java.lang.Exception)) 

TL; DR

def run[A, B](node: Node[A], result: B)(nodeEval: (Node[A], B) => B)(aggregator: Traversable[B] => B): Future[B] = { 
    val nodeResult = Future(nodeEval(node, result)) 
    val allResults = nodeResult flatMap { r => Future.sequence(nodeResult :: node.children.map { x => run(x, r)(nodeEval)(aggregator) }) } 
    allResults map aggregator 
    } 

Grob gesagt, es ist nur ein Future.flatMap(result => Future.sequence(children ...)). Wenn das übergeordnete Objekt Future sein Ergebnis abgeschlossen hat, wird es unter flatMap an die Kinderberechnung übergeben. Wenn Eltern Future fehlschlägt, schlägt die gesamte Berechnung ebenfalls fehl. sequence kombiniert das Ergebnis aus der Liste Future s in eine einzige Future. Ein Kind Future ist ein Elternteil zu seinen Kindern und so weiter rekursiv. Somit gilt der gleiche Fehlermodus.