2016-08-08 49 views
0

Ich habe diese Observable:Wie konvertiere ich dieses RxJava-Observable, um alle 200 ms Ergebnisse zu erhalten?

Observable<String> concatenatedSets = 
      Observable.just("1/5/8", "1/9/11/58/16/", "9/15/56/49/21"); 

    concatenatedSets.flatMap(s -> Observable.from(s.split("/"))) 
      .map(s -> Integer.valueOf(s)) 
      .subscribeOn(Schedulers.computation()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(i -> tvCounter.setText(String.valueOf(i))); 

wie konvertiere ich es oder welche Bediener kann die Kette ich es so, dass onNext mit 200 ms zwischen Anrufe genannt wird?

ich bei dieser Frage SO suchen: Pause between call onNext in RxJava

aber Antworten dort beginnen mit einer Pause zu schaffen interval mit wie so

Observable.interval(100, TimeUnit.MILLISECONDS) 

Allerdings bin ich meine beobachtbaren auf eine andere Weise zu schaffen (mit just), also wie mische ich beide Funktionalitäten (just und interval) oder was mache ich, um eine Pause von 200 ms zwischen den Emissionen zu erreichen?

PS: gerade versucht delay - es aufgeschoben Gesamtausführung einmal um die Zeit, die ich zur Verfügung gestellt. zip könnte die Antwort sein, aber es ist sehr verwirrend, also wie zip ich meine beobachtbare und ein Intervall eins?

+0

haben Sie versuchen, Operator zu verzögern? oder diese http://stackoverflow.com/questions/33291245/rxjava-delay-for-each-item-of-list-emitted –

+0

gerade versuchte 'Verzögerung' - es aufgeschoben Gesamtausführung einmal um die Zeit, die ich zur Verfügung gestellt. 'zip' könnte die Antwort sein, aber es ist sehr verwirrend, also wie zippe ich meine beobachtbare und ein Intervall eins? –

+0

Sehen Sie sich zuerst den benutzerdefinierten Operator an. Vergessen, es zu bemerken :) https://gist.github.com/matir91/aac2c1318a1a1facd811 –

Antwort

2
Observable<Integer> observable = Observable.just("1/5/8", "1/9/11/58/16/", "9/15/56/49/21") 
       .flatMap(s -> Observable.from(s.split("/"))) 
       .map(s -> Integer.valueOf(s)); 

Observable<Integer> zippedObservable = Observable.zip(
       observable, 
       Observable.interval(100, TimeUnit.MILLISECONDS), 
       (number, interval) -> number); 

zippedObservable 
     .subscribeOn(Schedulers.computation()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(i -> tvCounter.setText(String.valueOf(i))); 
+0

Danke! Was macht das Funktionsargument in '.zip'? –

+1

Die Funktion nimmt zwei Objekte aus beiden Observablen und gibt ein Objekt zurück, das von zip Observable emitiert wird. Hier ist das Dokument: http://reactivex.io/RxJava/javadoc/rx/Observable.html#zip(rx.Observable,%20rx.Observable,%20rx.functions.Func2) – nshmura