2015-10-27 4 views
14

Ich verwende RxAndroid für Stream-Operationen. In meinem realen Anwendungsfall hole ich eine Liste vom Server (unter Verwendung Retrofit). Ich benutze Scheduler, um die Arbeit an einem Hintergrund-Thread zu erledigen und die endgültige Emission auf dem Android-UI-Thread (Haupt-Thread) zu erhalten.Process Observable auf Hintergrund Thread

Das funktioniert gut für den Netzwerkanruf, aber ich erkannte, dass meine Operatoren nach dem Netzwerkaufruf nicht den Hintergrund-Thread verwenden, sondern auf den Haupt-Thread aufgerufen werden.

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .subscribe(integer1 -> {}); 

Wie kann ich sicherstellen, dass alle Operationen auf einem Hintergrund-Thread ausgeführt werden?

Antwort

39

TL; DR: Verschieben observeOn(AndroidSchedulers.mainThread()) unter filter(...).


subscribeOn(...) verwendet wird, auf dem die Observable auf Betrieb beginnt Thread zu bezeichnen. Nachfolgende Aufrufe an subscribeOn werden ignoriert.

Wenn Sie also die folgenden schreiben sind, alles würde auf Schedulers.newThread() ausgeführt werden:

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .subscribe(integer1 -> { doSomething(integer1); }); 

Nun, natürlich, das ist nicht das, was Sie wollen: Sie auf dem Haupt-Thread doSomething wollen .
Das ist, wo observeOn an Ort und Stelle kommt. Alle Aktionen nachobserveOn werden auf diesem Scheduler ausgeführt. Daher wird in Ihrem Beispiel die filter auf dem Haupt-Thread ausgeführt.

Stattdessen bewegen observeOn bis kurz vor subscribe:

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(integer1 -> { doSomething(integer1) }); 

Nun filter auf dem 'neuen Thread' passieren wird, und doSomething auf dem Haupt-Thread.


noch weiter zu gehen, Sie observeOn mehrfach verwenden können:

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(Schedulers.computation()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(integer1 -> { doSomething(integer1) }); 

In diesem Fall auf einen neuen Thread, die Filterung auf einer Berechnung Gewinden und doSomething auf die das Abrufen auftreten Haupt-Bedroung.

Checkout ReactiveX - SubscribeOn operator für die offizielle Dokumentation.

+0

Danke für die detaillierte Antwort! Gibt es eine Möglichkeit, die Scheduler am Anfang der Methoden-Aufrufkette zu definieren (wie im OP)? – WonderCsabo

+0

Die Observables folgen dem [Decorator Pattern] (https://en.wikipedia.org/wiki/Decorator_pattern), das ist also per Design nicht möglich. "observeOn" (und die anderen Methoden) geben nicht dieselbe "Observable" -Instanz wie das Builder-Muster zurück, sondern geben stattdessen ein _new_ "Observable" zurück, das das "alte" "Observable" umhüllt. – nhaarman

+0

Ich verstehe das. Das ist unglücklich für den Fall, denn das bedeutet, ich muss den 'observeOn() 'Aufruf zu jedem Stream, den ich schreibe, hinzufügen? :( – WonderCsabo