TL; DR: Verschieben observeOn(AndroidSchedulers.mainThread())
unter filter(...)
.
subscribeOn(...)
verwendet wird, auf dem die Observable
auf Betrieb beginnt Thread zu bezeichnen. Nachfolgende Aufrufe an subscribeOn
werden ignoriert.
Wenn Sie also die folgenden schreiben sind, alles würde auf Schedulers.newThread()
ausgeführt werden:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> { doSomething(integer1); });
Nun, natürlich, das ist nicht das, was Sie wollen: Sie auf dem Haupt-Thread doSomething
wollen .
Das ist, wo observeOn
an Ort und Stelle kommt. Alle Aktionen nachobserveOn
werden auf diesem Scheduler ausgeführt. Daher wird in Ihrem Beispiel die filter
auf dem Haupt-Thread ausgeführt.
Stattdessen bewegen observeOn
bis kurz vor subscribe
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
Nun filter
auf dem 'neuen Thread' passieren wird, und doSomething
auf dem Haupt-Thread.
noch weiter zu gehen, Sie observeOn
mehrfach verwenden können:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
In diesem Fall auf einen neuen Thread, die Filterung auf einer Berechnung Gewinden und doSomething
auf die das Abrufen auftreten Haupt-Bedroung.
Checkout ReactiveX - SubscribeOn operator für die offizielle Dokumentation.
Danke für die detaillierte Antwort! Gibt es eine Möglichkeit, die Scheduler am Anfang der Methoden-Aufrufkette zu definieren (wie im OP)? – WonderCsabo
Die Observables folgen dem [Decorator Pattern] (https://en.wikipedia.org/wiki/Decorator_pattern), das ist also per Design nicht möglich. "observeOn" (und die anderen Methoden) geben nicht dieselbe "Observable" -Instanz wie das Builder-Muster zurück, sondern geben stattdessen ein _new_ "Observable" zurück, das das "alte" "Observable" umhüllt. – nhaarman
Ich verstehe das. Das ist unglücklich für den Fall, denn das bedeutet, ich muss den 'observeOn() 'Aufruf zu jedem Stream, den ich schreibe, hinzufügen? :( – WonderCsabo