2016-04-08 6 views
1

Ist es möglich, wiederholte Senden/Antworten zu demselben Dart-Isolat innerhalb einer einzigen asynchronen Funktion einzukapseln?verkapselte wiederholte Senden/Antworten an das gleiche Dart-Isolat innerhalb einer einzigen asynchronen Funktion

Hintergrund:

Um eine günstige API zu entwerfen, würde Ich mag eine Funktion asynchron das Ergebnis mit einem Isolat erzeugte Rückkehr haben, z.B.

var ans = await askIsolate(isolateArgs); 

Dies funktioniert gut, wenn ich direkt die durch einen Aufruf spawnUri erzeugte Antwort verwenden, zB

Future<String> askIsolate(Map<String,dynamic> isolateArgs) { 

ReceivePort response = new ReceivePort(); 
var uri = Uri.parse(ISOLATE_URI); 

Future<Isolate> remote = Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort); 
return remote.then((i) => response.first) 
       .catchError((e) { print("Failed to spawn isolate"); }) 
       .then((msg) => msg.toString()); 
} 

Der Nachteil des obigen Ansatzes ist jedoch, dass, wenn ich brauche askIsolate wiederholt zu nennen, die Isolat muss jedes Mal hervorgebracht werden.

Ich möchte stattdessen mit einem laufenden Isolat kommunizieren, was sicherlich möglich ist, indem das Isolat einen sendPort an den Aufrufer zurückgibt. Aber ich glaube, seit dem 2013 Isolate refactoring erfordert dies, dass der Anrufer nachfolgende Nachrichten auf dem Empfangsanschluss abhören muss, was eine Kapselung innerhalb einer einzigen asynchronen Funktion unmöglich macht.

Gibt es einen Mechanismus, um dies zu erreichen, die ich vermisse?

+1

Es ist ein, während ich mit Isolaten gespielt. https://pub.dartlang.org/packages/isolate soll eine nette API zum Isolieren bereitstellen. Ich nehme an, es lohnt sich, genauer hinzusehen. –

+1

Der 'IsolateRunner' in' package: isolate' ist zum mehrmaligen Aufruf einer Funktion in einem erzeugten Isolat vorgesehen. Ich denke, es wird für dieses Problem funktionieren: 'var runner = warten IsolateRunner.spawn(); for (var arg in etwas)} {... aware runner.run (queryFunction, arg); ...} awarner runner.close(); ' – lrn

+1

Eine weitere Möglichkeit besteht darin, dass ein Service-Isolat ausgeführt wird, aber anstatt das Ergebnis jedes Mal an denselben Port zurückgeben zu müssen, kann jede Anforderung ihren eigenen SendPort senden. Dann kann jede Anfrage einen "ResponsePort" erzeugen und den "ersten" Getter davon für die Antwort zurückgeben: Zukünftige askIsolate (isolateArgs) {var p = new ReceivePort(); runningIsolatePort.send ([isolateArgs, p.sendPort]); Rückkehr p.first; } '. – lrn

Antwort

1

Ein schnelles Arbeitsbeispiel basierend auf lrn's Kommentar oben folgt. Das Beispiel initialisiert ein Isolat über spawnURI und kommuniziert dann mit dem Isolat, indem es einen neuen Empfangsanschluss übergibt, auf den eine Antwort erwartet wird. Dadurch kann askIsolate eine Antwort von einem laufenden spawnURI-Isolat direkt zurückgeben.

Hinweis Fehlerbehandlung wurde aus Gründen der Übersichtlichkeit weggelassen.

Isolate Code:

import 'dart:isolate'; 
import 'dart:convert' show JSON; 

main(List<String> initArgs, SendPort replyTo) async { 
    ReceivePort receivePort = new ReceivePort(); 
    replyTo.send(receivePort.sendPort); 

    receivePort.listen((List<dynamic> callArgs) async { 
    SendPort thisResponsePort = callArgs.removeLast(); //last arg must be the offered sendport 
    thisResponsePort.send("Map values: " + JSON.decode(callArgs[0]).values.join(",")); 
    }); 
} 

Telefonvorwahl:

import 'dart:async'; 
import 'dart:isolate'; 
import 'dart:convert'; 


const String ISOLATE_URI = "http://localhost/isolates/test_iso.dart"; 
SendPort isolateSendPort = null; 

Future<SendPort> initIsolate(Uri uri) async { 
    ReceivePort response = new ReceivePort(); 
    await Isolate.spawnUri(uri, [], response.sendPort, errorsAreFatal: true); 
    print("Isolate spawned from $ISOLATE_URI"); 
    return await response.first; 
} 


Future<dynamic> askIsolate(Map<String,String> args) async { 
    if (isolateSendPort == null) { 
    print("ERROR: Isolate has not yet been spawned"); 
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); //try again 
    } 

    //Send args to the isolate, along with a receiveport upon which we listen for first response 
    ReceivePort response = new ReceivePort(); 
    isolateSendPort.send([JSON.encode(args), response.sendPort]); 
    return await response.first; 
} 

main() async { 
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); 

    askIsolate({ 'foo':'bar', 'biz':'baz'}).then(print); 
    askIsolate({ 'zab':'zib', 'rab':'oof'}).then(print); 
    askIsolate({ 'One':'Thanks', 'Two':'lrn'}).then(print); 
} 

Ausgabe

Isolate spawned from http://localhost/isolates/test_iso.dart 
Map values: bar,baz 
Map values: zib,oof 
Map values: Thanks,lrn 
2

Die Antwort hängt davon ab, wie Sie das Isolat

  • Haben Sie es auf unbestimmte Zeit am Laufen zu halten beabsichtigen, zu verwenden, es gibt das Senden und erwartete Antworten asynchron zu empfangen?

  • Möchten Sie das Isolat viele (aber endliche) Eingaben auf einmal senden, erwarten Sie, asynchron Antworten zu erhalten, und schließen Sie dann das Isolat?

Ich vermute das letztere, und Ihre askIsolate() Funktion muss sofort eine Future Rückkehr als abgeschlossen, wenn es alle Antworten erhält.

Die await for-Schleife kann verwendet werden, um auf einen Stream zu hören und Ereignisse bis zum Schließen zu verarbeiten.

Ich bin nicht vertraut mit Isolaten, also hoffe ich, das ist in Ordnung, ich habe es nicht getestet. Ich habe angenommen, dass das Isolat endet und die Antwort schließt.

String askIsolate(Map<String,dynamic> isolateArgs) async { 

    ReceivePort response = new ReceivePort(); 
    var uri = Uri.parse(ISOLATE_URI); 

    Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort) 
    .catchError((e)) { 
    throw ...; 
    }); 

    List<String> answers = new List<String>; 

    await for(var answer in response) { 
    out.add(answer.toString()); 
    } 

    return answers; 
} 

Hinweis:

  • response ist der Strom Sie nach Antworten hören. Es ist erstellt vor Laich das Isolat, so dass Sie nicht warten müssen (und wahrscheinlich nicht), dass die Zukunft isolieren, bevor Sie es hören.

  • machte ich askIsolate() Asynchron, weil, dass es sehr einfach macht, sofort eine Zukunft zurückzukehren, die abgeschlossen ist, wenn die Funktion zurückgibt - ohne dass alle mühsam über mit .then(...) Ketten Ausmisten, die ich persönlich verwirrend und schwer zu lesen.

BTW, Ihre ursprüngliche then(...).catchError(...) Artcode besser wie folgt geschrieben werden würde:

Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort) 
    .catchError((e) { ... }); 

    return response.first) 
    .then((msg) => msg.toString()); 

Ich glaube, dass Verzögerung der Linie einen catchError Handler Anbringen nach der Gründung des Isolat könnte die Zukunft erlauben, mit zu vervollständigen ein Fehler vor der Handler ist vorhanden.

Siehe: https://www.dartlang.org/articles/futures-and-error-handling/#potential-problem-failing-to-register-error-handlers-early.

+0

Eigentlich ist das ehemalige in deinen zwei Kugeln, was ich möchte. Danke für den Haken auf dem dann (...) .catchError (...) Problem! – ilikerobots

+0

Möchten Sie eine Funktion implementieren, die eine Eingabe an ein Isolat sendet, und eine Zukunft zurückgibt, die abgeschlossen wird, wenn das Isolat ein Ergebnis zurücksendet, das Sie wiederholt aufrufen können? –

2

auch bei IsolateRunner in package:isolate suchen empfehle ich, ist es beabsichtigt, Probleme wie diese zu lösen - Aufruf eine Funktion in dem gleichen Isolat mehrmals statt nur einmal, wenn das Isolat erstellt wird.

Wenn Sie nicht wollen, dass, gibt es andere, primitivere, Optionen

Async-Funktionen können auf Futures oder Streams warten und ein ReceivePort ist ein Strom. Für einen schnellen Hack, können Sie wahrscheinlich etwas mit einem await for auf dem Antwort-Stream tun, aber es wird nicht sehr praktisch sein.

Einpacken der ReceivePort in eine StreamQueue von package:async ist eine bessere Wahl. Damit können Sie die einzelnen Ereignisse in Futures umwandeln. Etwas wie:

myFunc() async { 
    var responses = new ReceivePort(); 
    var queue = new StreamQueue(responses); 
    // queryFunction sends its own SendPort on the port you pass to it. 
    var isolate = await isolate.spawn(queryFunction, [], responses.sendPort); 
    var queryPort = await queue.next(); 
    for (var something in somethingToDo) { 
    queryPort.send(something); 
    var response = await queue.next(); 
    doSomethingWithIt(response); 
    } 
    queryPort.send("shutdown command"); 
    // or isolate.kill(), but it's better to shut down cleanly. 
    responses.close(); // Don't forget to close the receive port. 
}