2016-07-10 17 views
0

Meine Anwendung muss in der Regel zwei Dinge tun:Limit-Aktionen mit rxJava und retryWhen Betreiber

  • nur eine Netzwerkanfrage zur gleichen Zeit Accept
  • Retry wenn Anfrage

fehlgeschlagen Das ist, wie ich Implementieren Sie es:

public class RequestsLocker { 

    private volatile boolean isLocked; 

    public <T> Observable.Transformer<T, T> applyLocker() { 
     if(!isLocked()) { 
      return observable -> observable 
        .doOnSubscribe(() -> { 
         lockChannel(); 
        }) 
        .doOnUnsubscribe(() -> { 
         freeChannel(); 
        }); 
     } else { 
      return observable -> Observable.error(new ChannelBusyException("Channel is busy now.")); 
     } 
    } 

    private void lockChannel() { 
     isLocked = true; 
    } 

    private void freeChannel() { 
     isLocked = false; 
    } 

    public boolean isLocked() { 
     return isLocked; 
    } 

} 

Sieht gut aus.

Nun meine retryWhen Umsetzung:

public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) { 
    return observable.flatMap(error -> { 
     // For IOExceptions, we retry 
     if (error instanceof IOException) { 
      return Observable.timer(2, TimeUnit.SECONDS); 
     } 

     // For anything else, don't retry 
     return Observable.error(error); 
    }); 
} 

Es ist, wie ich es verwenden:

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) { 
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes)) 
      .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE)); 
} 

...

public void finishCarService(QueueCarItem carItem, PaymentType paymentType, 
          String notes, Subscriber<List<QueueCarItem>> subscriber) { 
    queueApiMediator.finishService(carItem.getId(), paymentType, notes) 
      .subscribeOn(ioScheduler) 
      .observeOn(uiScheduler) 
      .doOnError(this::handleError) 
      .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
      .subscribe(subscriber); 
} 

Das Hauptproblem, dass doOnUnsubscribe() bei jedem Fehler genannt und Dann ist das Schließfach für jede neue Anfrage offen, bis der Timer abläuft und neu happiert ens wieder. Das ist das Problem. Während der Timer tickt, kann der Benutzer eine andere Anfrage stellen.

Wie kann ich es beheben?

+1

Könnten Sie einen Code veröffentlichen, der zeigt, wie Sie 'applyLocker' transformator und' retryWhenAnyIoExceptionWithDelay' tatsächlich verwenden? – JohnWowUs

+0

@ JohnWowUs bereit. – Alexandr

Antwort

1

Das Problem ist, dass Sie Ihren Transformator an die Quelle zu beobachten heißt, bevor Sie retrywhen sich bewerben. Wenn es einen Fehler gibt, werden Sie sich immer abmelden und dann erneut die Quelle abonnieren, die zu Ihrem doOnUnsubscribe aufgerufen wird.

Ich schlage vor, Sie versuchen,

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) { 
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));    
} 


public void finishCarService(QueueCarItem carItem, PaymentType paymentType, 
          String notes, Subscriber<List<QueueCarItem>> subscriber) { 
    queueApiMediator.finishService(carItem.getId(), paymentType, notes) 
      .subscribeOn(ioScheduler) 
      .observeOn(uiScheduler) 
      .doOnError(this::handleError) 
      .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
      .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE)); 
      .subscribe(subscriber); 
} 

PS: Der Umkleidewandlers ein bisschen anders aussieht heißt es kein Argument im Code nimmt Sie verknüpft.

+0

Ja, ich verstehe.Das ist das Problem: Ich habe mich entschieden, CleanAritecture von android10 zu testen. Also, 'RequetsLocker' und' finishService' Methode ist auf der 'Datenschicht'. Wenn 'finishCarService' Methode für die Domäne. Jetzt denke ich daran, den Operator retryWhen an die Datenebene zu verschieben. Aber es bricht die Philosophie von CA, da "Datenschicht" entscheiden muss, "was zu tun ist", nicht "wie". Ein anderer Weg - "RequestLocker" zu "Domain-Layer". In diesem Fall muss ich 'Domain Layer' über die Quelle der Daten benachrichtigen, da 'RequestLocker' nur auf die Netzwerkanforderung und nicht auf 'getFromCache' angewendet werden muss. Das ist auch eine schlechte Übung. Meh.. – Alexandr

1

Mit retryWhen, abmelden zu vermeiden onError Sie onErrorResumeNext verwenden müssen, die pflegen Sie abmelden.

Schauen dieses Beispiels

/** 
* Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated 
*/ 
@Test 
public void observableOnErrorResumeNext() { 
    Subscription subscription = Observable.just(null) 
              .map(Object::toString) 
              .doOnError(failure -> System.out.println("Error:" + failure.getCause())) 
              .retryWhen(errors -> errors.doOnNext(o -> count++) 
                    .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null)), 
                Schedulers.newThread()) 
              .onErrorResumeNext(t -> { 
               System.out.println("Error after all retries:" + t.getCause()); 
               return Observable.just("I save the world for extinction!"); 
              }) 
              .subscribe(s -> System.out.println(s)); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); 
} 

auch über Gleichzeitigkeit, wenn Sie eine Operation in einem flatMap Operator tun, können Sie die Max gleichzeitig angeben können.

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) { 
    if (getClass() == ScalarSynchronousObservable.class) { 
     return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); 
    } 
    return merge(map(func), maxConcurrent); 
} 

können Sie weitere Beispiele hier sehen https://github.com/politrons/reactive

+0

danke. Ich kann deinen Code gerade jetzt nicht verstehen, aber ich werde ihn in ein paar Tagen testen. – Alexandr

0

Meine aktuelle Lösung ist nicht RequestLocker auf IOException wie in diesem Fall Anfrage nach einer Verzögerung wiederholt werden zu entsperren.

public <T> Observable.Transformer<T, T> applyLocker() { 
    if(!isLocked()) { 
     return observable -> observable.doOnSubscribe(() -> { 
      lockChannel(); 
     }).doOnNext(obj -> { 
      freeChannel(); 
     }).doOnError(throwable -> { 
      if(throwable instanceof IOException) { 
       return; // as any request will be repeated in case of IOException 
      } 
      freeChannel(channel); 
     }); 
    } else { 
     return observable -> Observable.error(new ChannelBusyException("Channel is busy now")); 
    } 
}