2015-11-06 12 views
5

Ich bin ein Neuling in Rxjava. Ich habe den folgenden Code:Rxjava: Abonnieren Sie den spezifischen Thread

System.out.println("1: " + Thread.currentThread().getId()); 
    Observable.create(new rx.Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subcriber) { 
      System.out.println("2: " + Thread.currentThread().getId()); 
      // query database 
      String result = .... 
      subcriber.onNext(result); 
     } 

    }).subscribeOn(Schedulers.newThread()).subscribe(countResult -> { 
     System.out.println("3: " + Thread.currentThread().getId()); 
    }); 

Zum Beispiel wird der Ausgang sein:

1: 50
2: 100
3: 100

Ich möchte Abonnenten auf der Flucht Thread mit ID 50. Wie kann ich das tun?

+0

Können Sie uns auch sagen, warum Sie das tun möchten? Uns fehlt ein bisschen Kontext. –

Antwort

-1

Mit RxJava 1.0.15 können Sie toBlocking() vor subscribe anwenden und alles wird auf dem Thread ausgeführt, der die gesamte Sequenz von Operatoren erstellt hat.

+0

Ich glaube nicht, dass der OP seinen Code synchron machen will. Stattdessen möchte er zu einem bestimmten Thread wechseln, bevor er jeden Teilnehmer anruft. – FriendlyMikhail

+0

Die op erwähnt Thread 50, der der Thread ist, der die asynchrone Berechnung startet. Es gibt keinen anderen Weg als das Blockieren von Abonnements, um zu demselben Thread zurückzukehren. – akarnokd

+0

Ich dachte darüber nach, 'Schedulers.from (executorService)' '' ExecutorService '' zu geben, etwa so: '' 'ExecutorService executorService = Executors.newSingleThreadExecutor (r -> Thread.currentThread());' ''. Stürzt leider mit 'IllegalThreadStateException' ab. – AndroidEx

-1

So SubscribeOn bezeichnet, welcher Thread der Observable beginnt, Elemente auf, zu emittieren, beobachtendOn "schaltet" zu einem Thread für den Rest der beobachtbaren Kette. Setzen Sie ein observationOn (schedulers.CurrentThread()) direkt vor dem subscribe und Sie werden in dem Thread sein, in dem dieses Observable erstellt wird und nicht in dem Thread, in dem es ausgeführt wird. Hier ist eine Ressource, die rxjava Threading wirklich gut erklärt. http://www.grahamlea.com/2014/07/rxjava-threading-examples/

+1

Es gibt keinen CurrentThread-Scheduler in RxJava und es gibt keine Möglichkeit, die Ausführung desselben Threads in verschiedenen Teilen der Pipeline mit mehreren ObserveOn zurückzusetzen, es sei denn, der Scheduler ist single-threaded. – akarnokd

+0

try Schedulers.immediate(), die "erstellt und gibt einen {@ Link Scheduler}, der Arbeit sofort auf dem aktuellen Thread ausführt." – FriendlyMikhail

+0

Das bleibt immer noch nicht beim Erstellungs-Thread und ist effektiv ein No-Op. – akarnokd

-1

Ich glaube @akarnokd hat Recht. Sie laufen entweder ohne die .subscribeOn(Schedulers.newThread()), so dass es synchron passiert oder verwenden Sie toBlocking() kurz vor dem Abonnement. Alternativ, wenn Sie einfach alles auf dem gleichen Thread passieren soll, aber es nicht den aktuellen Thread sein, dann können Sie dies tun:

Observable 
    .defer(() -> Observable.create(...)) 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(subscriber); 

defer stellt sicher, dass der Anruf an create auf dem Abonnement Thread geschieht .

1

Ich denke, dass es zwei Fälle gibt. Entweder benötigen Sie sie für den UI-Thread oder für die Synchronisierung. Wie ich weiß, können Sie eine Funktion für einen bestimmten Thread nicht aufrufen, da die Methode beim Aufruf der Methode an den Kontext des Threads gebunden ist. Daher ist es unmöglich, eine Methode von einem Thread zu einem anderen Thread aufzurufen. Ihr Problem ist, dass die Methode im Subskribenten von Schedulers.newThread() aufgerufen wird. Ich fand auch diese github issue über Schedulers.currentThread(). Was Sie brauchen, ist, den Anrufer Thread zu benachrichtigen, wenn der Beobachter aufgerufen wird.

Sie können auch akka verwenden, es ist viel einfacher, gleichzeitig Code damit zu schreiben.

Entschuldigung für meine schlechte Grammatik.

+0

Seltsam, sie erwähnen AndroidSchedulers.handlerThread, aber es gibt keine solche Methode in meinem rxjava .. – Fen1kz

0

Vom docs:

standardmäßig eine beobachtbare und die Kette von Operatoren, die Sie wenden sie ihre Arbeit tun, und wird seine Beobachter melden, auf dem gleichen Thread , auf denen ihre abonnieren Methode wird aufgerufen. Der SubscribeOn-Operator ändert dieses Verhalten, indem er einen anderen Scheduler angibt, auf dem Observable ausgeführt werden soll. Der Operator ObserveOn gibt einen anderen Scheduler an, den das Observable zum Senden von Benachrichtigungen an seine Beobachter verwendet.

So können Sie nur subscribe anstelle von subscribeOn Ihre Sammlung auf dem gleichen Thread zu beobachten, es erstellt wurde, das so etwas wie:

Observable.create(new rx.Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subcriber) { 
      System.out.println("2: " + Thread.currentThread().getId()); 
      // query database 
      String result = .... 
      subcriber.onNext(result); 
     } 

    }).subscribe(countResult -> { 
     System.out.println("3: " + Thread.currentThread().getId()); 
    }); 

UPDATE: Wenn Ihre Anwendung ein Android-Anwendung, Sie können abonnieren Sie auf einem Hintergrund-Thread als Sie tun und übergibt die Ergebnisse an den Haupt-Thread mit Handler Nachrichten.

Wenn Ihre Anwendung eine Java-Anwendung ist, kann ich den Mechanismus wait() und notify() vorschlagen oder in komplexeren Szenarien Frameworks wie EventBus oder akka verwenden.

+0

Der Punkt ist, Thread zu wechseln und zurück zu ihm. – Fen1kz