2016-07-27 16 views
2

Ich habe ein Problem mit RxJava onErrorResumeNext Operator. Ich möchte den Speicherort abrufen, dann Daten vom Server abrufen (mit Retrofit) hängt vom Speicherort ab, aber wenn kein Speicherort (Fehler: Sequenz enthält keine Elemente), möchte ich Daten vom Server mit einem anderen Observable erhalten (was nicht abhängt Lage). Ich habe versucht, onErrorResumeNext-Operator zu verwenden, aber bekomme "java.io.InterruptedIOException: thread interrupted".RxJava onErrorResumeNext Aufrufe java.io.InterrupedIOException

-Code vor onErrorResumeNext Zugabe - funktioniert gut

LocationService.getUpdatedOrLastKnownLocation(getContext())) 
      .flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10; 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(subscriber); 

Code mit onErrorResumeNext - throws Exception

LocationService.getUpdatedOrLastKnownLocation(getContext())) 
      .flatMap(location -> RestService.getPostsAround(location,0,10)) //offset = 0, limit = 10; 
      .onErrorResumeNext(RestService.getPostsByMapProjection(googleMap.getProjection().getVisibleRegion())) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(subscriber); 

Stacktrace:

07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err: java.io.InterruptedIOException: thread interrupted 
07-27 22:33:58.384 18632-18632/com.blacksea.plamobi W/System.err:  at okio.Timeout.throwIfReached(Timeout.java:145) 
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err:  at okio.Okio$1.write(Okio.java:77) 
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err:  at okio.AsyncTimeout$1.write(AsyncTimeout.java:155) 
07-27 22:33:58.385 18632-18632/com.blacksea.plamobi W/System.err:  at okio.RealBufferedSink.flush(RealBufferedSink.java:221) 
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.internal.http.Http1xStream.finishRequest(Http1xStream.java:159) 
07-27 22:33:58.386 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.internal.http.HttpEngine.readNetworkResponse(HttpEngine.java:721) 
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.internal.http.HttpEngine.access$200(HttpEngine.java:81) 
07-27 22:33:58.387 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.internal.http.HttpEngine$NetworkInterceptorChain.proceed(HttpEngine.java:708) 
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.internal.http.HttpEngine.readResponse(HttpEngine.java:563) 
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.RealCall.getResponse(RealCall.java:241) 
07-27 22:33:58.389 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:198) 
07-27 22:33:58.391 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:160) 
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:  at okhttp3.RealCall.execute(RealCall.java:57) 
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:  at retrofit2.OkHttpCall.execute(OkHttpCall.java:174) 
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:  at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$RequestArbiter.request(RxJavaCallAdapterFactory.java:171) 
07-27 22:33:58.392 18632-18632/com.blacksea.plamobi W/System.err:  at rx.Subscriber.setProducer(Subscriber.java:211) 
07-27 22:33:58.393 18632-18632/com.blacksea.plamobi W/System.err:  at rx.Subscriber.setProducer(Subscriber.java:205) 
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err:  at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:152) 
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err:  at retrofit2.adapter.rxjava.RxJavaCallAdapterFactory$CallOnSubscribe.call(RxJavaCallAdapterFactory.java:138) 
07-27 22:33:58.394 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:50) 
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
07-27 22:33:58.395 18632-18632/com.blacksea.plamobi W/System.err:  at rx.Observable.unsafeSubscribe(Observable.java:8460) 
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:65) 
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OnSubscribeFlattenIterable.call(OnSubscribeFlattenIterable.java:37) 
07-27 22:33:58.396 18632-18632/com.blacksea.plamobi W/System.err:  at rx.Observable.unsafeSubscribe(Observable.java:8460) 
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:141) 
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266) 
07-27 22:33:58.398 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810) 
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571) 
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560) 
07-27 22:33:58.400 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:276) 
07-27 22:33:58.401 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMap$MapSubscriber.onError(OperatorMap.java:85) 
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:266) 
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:810) 
07-27 22:33:58.403 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:571) 
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:560) 
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:844) 
07-27 22:33:58.404 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorSubscribeOn$1$1.onError(OperatorSubscribeOn.java:59) 
07-27 22:33:58.405 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:264) 
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:207) 
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:  at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423) 
07-27 22:33:58.406 18632-18632/com.blacksea.plamobi W/System.err:  at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
07-27 22:33:58.407 18632-18632/com.blacksea.plamobi W/System.err:  at java.lang.Thread.run(Thread.java:818) 
+0

Können Sie veröffentlichen, wie dies Observable erstellt? 'RestService.getPostsByMapProjection (googleMap.getProjection(). GetVisibleRegion()' tut es observeOn() neuer Thread? – Prakash

+0

Nein, tut es nicht. Aber wenn ich es hinzufüge, ändert sich nichts. Observable erstellt von Retrofit –

Antwort

2

Wenn retrofit2 Wird verwendet, seine Observablen ändern nicht den Thread, auf dem sie operieren, also läuft die gesamte Kette bis subscribeOn() auf einem einzelnen Thread von io scheduler ab. Scheinbar Fehler von RestService.getPostsAround setzt unterbrochene Flag für diesen Thread, und okio Drosseln auf das in RestService.getPostsByMapProjection. Sie können versuchen, subscribeOn(Schedulers.io()) nach RestService.getPostsByMapProjection in onErrorResumeNext() hinzuzufügen.

+0

perfekt, '.subscribeOn (Schedulers.io())' gelöstes Problem. Vielen Dank. –

2

Nach meiner Erfahrung ist die java.io.InterruptedIOException ein roter Hering , wie es normalerweise bedeutet, dass einige andere Thread abgemeldet, und alle laufenden Threads (z. B. Netzwerkanfragen) werden unterbrochen.

In Ihrem Fall wird RestService.getPostsByMapProjection jedes Mal aufgerufen, ob es einen Fehler gibt oder nicht, der möglicherweise nicht das gewünschte Verhalten ist; Betrachten Sie es in einem Observable.defer()

+0

Ich habe den Code geändert in: '.onErrorResumeNext (Observable.defer (() -> RestService.getPostsByMapProjection (googleMap.getProjection(). getVisibleRegion())))', aber nichts ändert sich –

0

Umwickeln Sie max Parallelität auf Ihrem flatMap können nur sicher sein, dass du bist keine gleichzeitige Ausgabe mit

@Beta 
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) { 
    if (getClass() == ScalarSynchronousObservable.class) { 
     return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); 
    } 
    return merge(map(func), maxConcurrent); 
} 

In Ihrem Code

LocationService.getUpdatedOrLastKnownLocation(getContext())) 
     .flatMap(location -> RestService.getPostsAround(location,0,10),1) //offset = 0, limit = 10; 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(subscriber); 
+0

Nichts ändert sich –