2016-07-29 9 views
0

Ich lerne RxJava und testet ein Szenario, in dem ich Daten aus einer DB lese und dann in eine Warteschlange posten. Ich habe gerade eine Probe über den ganzen Prozess gemacht, aber ich finde nicht, dass das Observable so funktioniert, wie ich es wollte. asynchron.Observable ist nicht asynchron

Dies ist mein Code:

package rxJava; 

import java.util.ArrayList; 
import java.util.List; 

import rx.Observable; 
import rx.Observer; 
import rx.functions.Action1; 

public class TestClass { 

    public static void main(String[] args) { 

     TestClass test = new TestClass(); 
     System.out.println("---START---"); 

     test.getFromDB().subscribe(new Observer<String>() { 

      @Override 
      public void onCompleted() { 
       System.out.println("Publish complete."); 
      } 

      @Override 
      public void onError(Throwable t) { 
       System.out.println(t.getMessage()); 
      } 

      @Override 
      public void onNext(String s) { 
       test.publishToQueue(s).subscribe(new Observer<Boolean>() { 

        @Override 
        public void onNext(Boolean b) { 
         if (b) { 
          System.out.println("Successfully published."); 
         } 
        } 

        @Override 
        public void onCompleted() { 
        } 

        @Override 
        public void onError(Throwable arg0) { 
        } 
       }); 
      }; 
     }); 
     System.out.println("---END---"); 
    } 

    public Observable<String> getFromDB() { 

     List<String> list = new ArrayList<String>(); 
     for (int i = 0; i < 30; i++) { 
      list.add(Integer.toString(i)); 
     } 
     return Observable.from(list).doOnNext(new Action1<String>() { 
      @Override 
      public void call(String temp) { 
       if (temp.contains("2")) { 
        try { 
         Thread.sleep(200); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
       } 
      } 
     }); 

    } 

    public Observable<Boolean> publishToQueue(String s) { 

     return Observable.defer(() -> { 
      try { 
       if (s.contains("7")) { 
        Thread.sleep(700); 
       } 
       System.out.println("Published:: " + s); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      return Observable.just(true); 
     }); 
    } 
} 

nehme ich eine Liste von der DB erhalten asynchron und wollen es in die Warteschlange stellen ,. Ich habe ein Observable verwendet, das von getFromDB zurückgegeben wird und habe es abonniert, das die Daten nachahmt, die ich von DB erhalte. Jedes Mal, wenn ich die Daten von der DB bekomme, möchte ich sie mit publishToQueue in eine Warteschlange schieben, die auch eine Observable zurückgibt. Ich wollte die Warteschlange auch asynchron aufrufen lassen. Jetzt auf positive Quittung aus der Warteschlange wie die , die ich zurück (Observable<Boolean>), möchte ich etwas drucken.

Also im Grunde will ich nur beide Prozesse asynchron sein. Für jede Daten von DB, schiebe ich es asynchron in die Warteschlange.

Ich habe Thread.sleep() in beiden Methoden, DB-Aufruf und Warteschlange hinzugefügt, um eine Verzögerung nachzuahmen und die asynchronen Operationen zu testen. Ich denke, das ist es, was das Problem verursacht. Aber ich habe auch versucht Obseravable.delay(), aber das produziert nicht einmal eine Ausgabe.

Bitte helfen Sie mir zu verstehen, wie das funktioniert und wie ich es so machen kann, wie ich es will.

Antwort

1

ist standardmäßig RxJava synchron. Das bedeutet, dass alles standardmäßig in demselben Thread (und dem aktuellen Thread) ausgeführt wird. Sie können zu observeOn/subscribeOn Methoden Aufgaben in einem anderen Thread dank durchführen oder einige Operatoren, die Aufgaben in einem anderen Job auszuführen (weil es einen anderen Scheduler verwenden, wie delay, interval, ...)

In Ihrem Beispiel haben Sie um zu bestimmen, in welchem ​​Scheduler das Abonnement ausgeführt wird. (Hier in dem Thread Observable.from emittieren Liste)

test.getFromDb() 
    .subscribeOn(Schedulers.io()) 
    .subscribe(); 

Dann können Sie den flatMap Operator verwenden und Ihre publishToQueue Methode aufrufen. Diese Methode wird im vorherigen Scheduler ausgeführt, aber Sie können es mithilfe der Methode observeOn dazu zwingen, einen anderen Scheduler zu verwenden. Alles nach der observeOn-Methode wird in einem anderen Thread ausgeführt.

test.fromDb() 
    .subscribeOn(Schedulers.io()) 
    .observeOn(Schedulers.computation()) 
    .flatMap(l -> test.publishToqueue(l)) 
    .subscribe();