2016-04-29 15 views
0

Hallo Ich versuche, Daten in einer Datei zu verarbeiten.Scala warten auf die Liste der Zukunft zu führen

Dies ist der Code, den ich unten verwende.

Ich habe eine Liste von Futures und versuche, die Ausgabe von diesen Futures zu bekommen.

Alles ist gut, aber die letzte Zeile der Rückkehr wird vor OnSuccess ausgeführt.

Wie kann ich dieses Verhalten ändern, ohne eine Blockierung durchzuführen?

def processRow(rowNumber: Int, row: String, delimiter: String, rules: List[Rule]): RowMessage = { 
var cells = row.split(delimiter) 
var passedRules = new ListBuffer[RuleResult]() 
val failedRules = new ListBuffer[RuleResult]() 
val rulesFuture = rules.map { 
    i => Future { 
    val cells = row.split(delimiter); 
    //some processing.... 
    } 
} 
val f1 = Future.sequence(rulesFuture) 
f1 onComplete { 
    case Success(results) => for (result <- results) (result.map(x => { 
    if (x.isPassFailed) { 
     passedRules += x 
    } 
    else { 
     failedRules += x 
    } 
    })) 
    case Failure(t) => println("An error has occured: " + t.getMessage) 
} 
return new RowMessage(passedRules.toList, failedRules.toList) 
} 
+0

was hat es mit akka strömen zu tun? –

Antwort

1

Sie können nicht vermeiden, blockiert und eine einfache RowMessage zurückzukehren. Sie müssen auch eine Future zurückgeben.

Denken Sie auch an Ihren Algorithmus, um einen veränderlichen Zustand zu vermeiden, besonders wenn Sie ihn von verschiedenen Futures ändern.

Future.traverse entspricht Ihrem map + Future.sequence. Dann, anstatt onComplete, mappen Sie einfach Ihre Future um die Liste zu ändern. Sie können es einfach teilen mit partition statt was Sie getan haben.

Sie müssen nicht return verwenden, in der Tat sollten Sie nicht, wenn Sie nicht wissen, was Sie tun.

Btw isPassFailed klingt nicht wie eine vernünftige Methode Name für mich, vor allem wenn man bedenkt, dass, wenn es wahr ist, Sie es zu übergebenen Regeln hinzufügen.