2016-07-26 17 views
2

Ich habe einen Strom von Ereignissen und ich möchte eine Funktion aufrufen, die ein Versprechen für jedes dieser Ereignisse zurückgibt, das Problem ist, dass diese Funktion sehr teuer ist, also möchte ich höchstens n Ereignisse auf einmal verarbeiten.Wie verarbeitet man RxJS stream n Elemente gleichzeitig und sobald ein Element fertig ist, automatisch wieder auf n zurück?

Dieser kiesel Diagramm ist wahrscheinlich falsch, aber das ist, was würde Ich mag:

---x--x--xxxxxxx-------------x-------------> //Events 
---p--p--pppp------p-p-p-----p-------------> //In Progress 
-------d--d--------d-d-dd------dddd--------> //Promise Done 

---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES 

Dies ist der Code ist, dass ich so weit:

var n = 4; 
var inProgressCount = 0; 

var events$ = Rx.Observable.fromEvent(produceEvent, 'click') 
    .map((ev) => new Date().getTime()); 

var inProgress$ = events$.controlled(); 

var done$ = inProgress$  
    .tap(() => inProgressCount++) 
    .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp))); 

done$.subscribeOnNext((timestamp) => { 
    inProgressCount--; 
    inProgress$.request(Math.max(1, n - inProgressCount)); 
}); 

inProgress$.request(n); 

Es gibt zwei Probleme mit diesem Code:

  1. Es ist mit dem inProgressCount var, die mit Seiten Effekt Funktion aktualisiert s.
  2. Das $ done-Abonnement wird nur einmal aufgerufen, wenn ich mehr als 1 Element aus dem kontrollierten Stream anfordere. Dies führt dazu, dass die Variable inProgressCount nicht korrekt aktualisiert wird. Dadurch wird die Warteschlange möglicherweise auf einen einzelnen beschränkt.

Sie können es sehen, in hier zu arbeiten: http://jsbin.com/wivehonifi/1/edit?js,console,output

Fragen:

  1. Gibt es einen besseren Ansatz?
  2. Wie kann ich die Variable inProgressCount loswerden?
  3. Warum wird das done $ -Abonnement nur einmal aufgerufen, wenn mehrere Elemente angefordert werden?

Update:
Antwort auf Frage 3 #: switchMap ist die gleiche wie flatMapLatest, ist also, warum ich nur die letzte bekommen. Der Code wurde in FlatMap anstelle von SwitchMap aktualisiert.

Antwort

3

Sie müssen eigentlich überhaupt keinen Gegendruck verwenden. Es gibt einen Operator namens flatMapWithMaxConcurrent, der das für Sie erledigt. Es ist im Wesentlichen ein Alias ​​für den Aufruf .map().merge(concurrency) und es erlaubt nur eine maximale Anzahl von Streams gleichzeitig zu fliegen.

ich Ihre jsbin hier aktualisiert: http://jsbin.com/weheyuceke/1/edit?js,output

Aber ich kommentierte die wichtige Bit unter:

const concurrency = 4; 

var done$ = events$ 
    //Only allows a maximum number of items to be subscribed to at a time 
    .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) => 
     //This overload of `fromPromise` defers the execution of the lambda 
     //until subscription      
     Rx.Observable.fromPromise(() => { 
     //Notify the ui that this task is in progress         
     updatePanelAppend(inProgress, timestamp); 
     removeFromPanel(pending, timestamp); 
     //return the task 
     return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp) 
    })); 
+1

In RxJs 5 müssen Sie .flatMapWithMaxConcurrent mit .mergeMap ersetzen (Mapper (resultSelectorFunction | null), Parallelität) [mergeMap] (http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap) – user3717718