2016-08-02 40 views
0

Ich habe eine "Arbeitswarteschlange", die ich beobachten/abonnieren möchte. Dies ist ein Array von zu verarbeitenden Befehlsobjekten. Neue Arbeitsaufgaben kommen typischerweise in Bursts an und sie müssen seriell verarbeitet werden (in der Reihenfolge, in der sie empfangen werden, eins nach dem anderen, bis sie vollständig verarbeitet sind).Konvertieren von RxJS v4-Code in v5, Verarbeiten einer Warteschlange mit einem "Pull"

Ich benutze RxJS 5.0.0-beta.6. (Version von anderen Bibliotheken)

Hier ist ein Arbeitsbeispiel, das das gewünschte Verhalten veranschaulicht, aber RxJS v4 verwendet.

Der Haupt Code in Frage lautet ...

var events$ = Rx.Observable.fromEvent(produceEvent, 'click') 
    .timestamp() 
    .tap(({timestamp}) => updatePanelAppend(pending, timestamp)); 

var inProgress$ = events$; 

var done$ = inProgress$ 
    .flatMapWithMaxConcurrent(1, ({timestamp}) => 
          Rx.Observable.fromPromise(() => { 
           updatePanelAppend(inProgress, timestamp); 
           removeFromPanel(pending, timestamp); 
           return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp) 
          })); 

done$.subscribeOnNext((timestamp) => { 
    updatePanelAppend(done, timestamp); 
    removeFromPanel(inProgress, timestamp); 
}); 

http://jsbin.com/meyife/edit?js,output

der aktuelle Beta-Zustand des API gegeben und unvollständig/Wechsel Dokumentation, ich kann nicht herausfinden, wie dies zu tun in RxJS 5.

aktualisieren: Dieses migration guide für von v4 zu v5 bewegenden zeigt viele Funktion, die entfernt wurden, aber direkt nicht, wie die Dinge, die neue Art und Weise zu tun. Beispiele für entfernte Operationen: .tap, .controlled, .flatMapWithMaxConcurrent (umbenannt).

+0

Frage, wo v4 Lösung herkam: http://stackoverflow.com/questions/38601451/how-to-process-rxjs-stream-n-items-at- a-time-and-only-ein-Item-is-done-AutoFill –

Antwort

1
  • flatMap/mergeMap - nimmt nun einen gemeinsamen Zugriff Parameter

  • tap ->do

  • subscribeOnNext nicht mehr existiert, so subscribe nur mit einem einzigen Parameter verwenden.

  • fromPromise Überladung existiert nicht auf RxJS 5, verwenden Sie stattdessen defer.

Siehe aktualisiert jsbin here

+0

Ausgezeichnet! Sehr hilfreich! Hinweis: Nachdem die RxJS-Version auf 5.0.0-beta.6 geändert wurde, mussten einige Änderungen vorgenommen werden. Der Operator .timestamp() existiert in dieser Version nicht. Also zog ich gerade den timeStamp aus dem Mausklick-Event, da dies für den Code nicht zentral war. –