2016-07-29 6 views
2

Ich lerne RxJava auf Android. Ich möchte es mit dem Netzwerk verwenden, also kann ich keine Sachen im Haupt-Thread machen.Wie Observable.from (Zukunft) zu verwenden?

Ich habe den folgenden Code:

final String RXTAG = "Rx"; 
Log.d(RXTAG, "Starting Rx experiment"); 

final FutureTask<String> future = new FutureTask<>(new Callable<String>() { 
    @Override 
    public String call() throws InterruptedException { 
     Log.d(RXTAG, "Callable called on thread " + Thread.currentThread().getName()); 
     Utils.assertNotUIThread(); 
     Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Simulates network latency 
     return "hello"; 
    } 
}); 

Observable.from(future, Schedulers.io()).timeout(5, TimeUnit.SECONDS).subscribe(
     new Action1<String>() { 
      @Override 
      public void call(final String s) { 
       Log.d(RXTAG, "Next " + s); 
      } 
     }, 
     new Action1<Throwable>() { 
      @Override 
      public void call(final Throwable throwable) { 
       Log.w(RXTAG, throwable); 
      } 
     }, 
     new Action0() { 
      @Override 
      public void call() { 
       Log.d(RXTAG, "Completed"); 
      } 
     } 
); 

Aber es endet mit TimeoutException nach 5 secunds und die aufrufbare genannt Protokoll wird nie gezeigt. Was ist falsch und wie funktioniert es?

+0

vielleicht sollten Sie Vertx logger 'io.vertxt.core.logging.Logger;' für Log-Nachricht zu zeigen. –

+0

Es ist kein Logger-Problem. Das erste Protokoll wird angezeigt, und auch das Protokoll mit Fehler. Und wenn ich 'Observable.just()' verwende, wird auch das nächste Protokoll angezeigt. – Pitel

+0

Entschuldigung, ich bin falsch mit vertx –

Antwort

0

Sie müssen die Future selbst ausführen. Aber leider führt es dazu, dass Callable auf Haupt-Thread ausgeführt wird, oder Sie müssen Executor verwenden.

Stattdessen komme ich mit folgenden Lösung:

// Extract Callable from FutureTask 

Single.fromCallable(callable).timeout(5, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(
    new Action1<String>() { 
     @Override 
     public void call(final String s) { 
      Log.d(RXTAG, "Success " + s); 
     } 
    }, 
    new Action1<Throwable>() { 
     @Override 
     public void call(final Throwable throwable) { 
      Log.w(RXTAG, throwable); 
     } 
    } 
); 

Und jetzt funktioniert es wie erwartet:

07-29 13:41:26.516 D/Rx: Starting Rx experiment 
07-29 13:41:26.547 D/Rx: Callable called on thread RxIoScheduler-2 
07-29 13:41:27.550 D/Rx: Success hello 
+1

An Anfänger, schlage ich vor, sie machen sich mit den verfügbaren Methoden in 'Observable' und' Single' vertraut - auch wenn Sie nur ihre Namen lesen. Die meisten Operatoren werden so benannt, dass sie der typischen Sprache entsprechen, die zum Ausdrücken einer erforderlichen Funktionalität verwendet wird. – akarnokd

1

Versuch unter Code hinzufügen, bevor abonnieren.

.doOnSubscribe(disposable -> future.run()) 
.subscribeOn(Schedulers.io()) 

Rxjava2 Code:

Observable.fromFuture(future, Schedulers.io()) 
.doOnSubscribe(disposable -> future.run()) 
.subscribeOn(Schedulers.io()) 
.subscribe(someConsumer()). 

gut läuft.