2015-11-20 10 views
7

ist es sowieso zu sagen java rx den aktuellen thread in der observeOn funktion zu verwenden? Ich schreibe Code für den Android-Syncadapter und ich möchte, dass die Ergebnisse im Sync-Adapter-Thread und nicht im Haupt-Thread beobachtet werden.Wie beobachte ich den aufrufenden Thread in Java Rx?

Ein Beispiel Netzanruf mit Retrofit + RX Java sieht ungefähr wie folgt aus:

MyRetrofitApi.getInstance().getObjects() 
.subscribeOn(Schedulers.io()) 
.observeOn(<current_thread>) 
.subscribe(new Subscriber<Object>() { 
    //do stuff on the sync adapter thread 

} 

ich versucht, mit

... 
.observeOn(AndroidSchedulers.handlerThread(new Handler(Looper.myLooper()))) 
... 

verwendet, die die gleiche Art und Weise android rx den Scheduler für den Haupt-Thread erstellt ist funktioniert aber nicht mehr, sobald ich Looper.myLooper() für Looper.getMainLooper() ersetze.

Ich könnte die Schedulers.newThread() aber als komplexe Synchronisierung Code mit einer Menge von Serveranrufe würde ich ständig einen neuen Thread erstellen, nur um neue Netzwerkanrufe zu feuern, die wiederum neue Threads erstellen, um mehr Netzwerkanrufe zu starten . Gibt es eine Möglichkeit, dies zu tun? Oder ist mein Ansatz völlig falsch?

+0

Dies etwas spekulativ ist, so dass ich dies nicht als ein Posting antwort: Ab Version 2.0-beta2 setzt Retrofit die Netzwerkanfrage nicht mehr auf einen anderen Thread - siehe hier: https://github.com/square/retrofit/commit/38ce2bee70342ac1ab08115d74802d3a54d85511 Also, wenn Sie eine aktuelle Version von Nachrüsten sollte man einfach "subscribeOn" und "observeOn" überspringen können und einfach auf dem Sync Adapter Thread bleiben. Oder habe ich Ihre Frage missverstanden und Sie möchten neue Threads erstellen, aber sie sollten alle zu dem Thread zurückkehren, von dem Sie angefangen haben? –

Antwort

1

Oh, ich habe gerade diese im Wiki unter: https://github.com/ReactiveX/RxAndroid#observing-on-arbitrary-threads

new Thread(new Runnable() { 
    @Override 
    public void run() { 
     final Handler handler = new Handler(); // bound to this thread 
     Observable.just("one", "two", "three", "four", "five") 
       .subscribeOn(Schedulers.newThread()) 
       .observeOn(HandlerScheduler.from(handler)) 
       .subscribe(/* an Observer */) 

     // perform work, ... 
    } 
}, "custom-thread-1").start(); 

ich denke, das sollte für Ihren Fall arbeiten, auch - mit Ausnahme der Schaffung eines neuen Thema, natürlich ... Also einfach:

final Handler handler = new Handler(); // bound to this thread 
MyRetrofitApi.getInstance().getObjects() 
    .subscribeOn(Schedulers.io()) 
    .observeOn(HandlerScheduler.from(handler)) 
    .subscribe(new Subscriber<Object>() { 
     //do stuff on the sync adapter thread 

    } 
+0

Danke für den schnellen Repy. Ich habe Rx Android aktualisiert (ich hatte eine ältere Version benutzt) und habe das Beispiel ausprobiert. Leider hat es nicht funktioniert, aber ich habe einige Tests erstellt und herausgefunden, dass onNext() aufgerufen wird, aber der Abonnent erhält es nicht. Erst nach dem Hinzufügen von Looper.loop() am Ende scheint es zu funktionieren (was eine infite Schleife ist, die die Nachricht verarbeitet). Sogar das genaue Beispiel sagt mir, dass ich Looper.prepare() benutzen muss, bevor ich den Handler erstelle. Vielleicht mache ich etwas grundlegend falsch oder es könnte ein Problem mit Android RX selbst sein? – tiqz

+0

Ich fürchte, ich kann dir nicht sagen, was hier schief läuft ... lass uns sehen, was andere zu sagen haben ... –

+0

@tiqz 'fand heraus, dass onNext() aufgerufen wird, aber der Abonnent erhält es nicht 'onNext is die Methode eines Abonnenten, also was du schreibst, ist ein bisschen komisch für mich. In der Theorie sollte die Antwort von David gut sein. Ich würde Ihnen empfehlen, detailliertere Informationen zu veröffentlichen, damit wir Ihnen helfen können. – Diolor

3

Versuchen Sie es mit Schedulers.immediate()

MyRetrofitApi.getInstance().getObjects() 
.subscribeOn(Schedulers.io()) 
.observeOn(Schedulers.immediate()) 
.subscribe(new Subscriber<Object>() { 
    //do stuff on the sync adapter thread 

} 

Die Beschreibung sagt: Creates and returns a Scheduler that executes work immediately on the current thread.

HINWEIS:
Ich denke, es ist okay, auf dem Gewinde des SyncAdapter der ganze Arbeit zu halten, weil es bereits einen anderen Thread mit

+0

Schedulers.immediate() funktioniert nicht für mich. Ich muss auf GlThread beobachten aber es auf Thread von subscribeOn beobachten. Was könnte der Grund sein? –

+0

@DmitriyPuchkov Was ist die Reihenfolge Ihrer Funktionen 'subscribeOn' und' observeOn'? –

+0

Die Reihenfolge ist die gleiche wie Sie schreiben. Ich löse mein Problem, indem ich einen benutzerdefinierten Executor erzeuge, der Runnables nach GLThread schickt. –