Ich habe einen Strom von Ereignissen und ich möchte eine Funktion aufrufen, die ein Versprechen für jedes dieser Ereignisse zurückgibt, das Problem ist, dass diese Funktion sehr teuer ist, also möchte ich höchstens n Ereignisse auf einmal verarbeiten.Wie verarbeitet man RxJS stream n Elemente gleichzeitig und sobald ein Element fertig ist, automatisch wieder auf n zurück?
Dieser kiesel Diagramm ist wahrscheinlich falsch, aber das ist, was würde Ich mag:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
Dies ist der Code ist, dass ich so weit:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
Es gibt zwei Probleme mit diesem Code:
- Es ist mit dem
inProgressCount
var, die mit Seiten Effekt Funktion aktualisiert s. Das $ done-Abonnement wird nur einmal aufgerufen, wenn ich mehr als 1 Element aus dem kontrollierten Stream anfordere. Dies führt dazu, dass die VariableinProgressCount
nicht korrekt aktualisiert wird. Dadurch wird die Warteschlange möglicherweise auf einen einzelnen beschränkt.
Sie können es sehen, in hier zu arbeiten: http://jsbin.com/wivehonifi/1/edit?js,console,output
Fragen:
- Gibt es einen besseren Ansatz?
- Wie kann ich die Variable
inProgressCount
loswerden? Warum wird das done $ -Abonnement nur einmal aufgerufen, wenn mehrere Elemente angefordert werden?
Update:
Antwort auf Frage 3 #: switchMap ist die gleiche wie flatMapLatest, ist also, warum ich nur die letzte bekommen. Der Code wurde in FlatMap anstelle von SwitchMap aktualisiert.
In RxJs 5 müssen Sie .flatMapWithMaxConcurrent mit .mergeMap ersetzen (Mapper (resultSelectorFunction | null), Parallelität) [mergeMap] (http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap) – user3717718