2016-06-16 10 views
16

Im documentation for RetryWhen Beispiel gibt es so geht:Fangfehler, wenn retryWhen: s Wiederholungen abläuft

Observable.create((Subscriber<? super String> s) -> { 
    System.out.println("subscribing"); 
    s.onError(new RuntimeException("always fails")); 
}).retryWhen(attempts -> { 
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { 
     System.out.println("delay retry by " + i + " second(s)"); 
     return Observable.timer(i, TimeUnit.SECONDS); 
    }); 
}).toBlocking().forEach(System.out::println); 

Aber wie propagieren ich den Fehler, wenn die Wiederholungen abläuft?

Hinzufügen .doOnError(System.out::println)nach die retryWhen Klausel nicht den Fehler abfangen. Wird es überhaupt emittiert?

Hinzufügen eines .doOnError(System.out::println)vor retryWenn always fails für alle Wiederholungen angezeigt wird.

Antwort

1

Eine Option verwendet Observable.materialize(), um Observable.range() Elemente in Benachrichtigungen zu konvertieren. Dann, wenn onCompleted() ausgegeben wird, kann ein Fehler propagieren stromabwärts (in Probe unter Pair verwendet Observable.range() Meldungen und Ausnahme von Observable einzuwickeln)

@Test 
    public void retryWhen() throws Exception { 

    Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> { 
     return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new) 
      .flatMap(notifAndEx -> { 
      System.out.println("delay retry by " + notifAndEx + " second(s)"); 
      return notifAndEx.getRight().isOnCompleted() 
        ? Observable.<Integer>error(notifAndEx.getLeft()) 
        : Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS); 
     }); 
    }).toBlocking().forEach(System.out::println); 
} 

    private static class Pair<L,R> { 
     private final L left; 
     private final R right; 

     public Pair(L left, R right) { 
      this.left = left; 
      this.right = right; 
     } 

     public L getLeft() { 
      return left; 
     } 

     public R getRight() { 
      return right; 
     } 
    } 
0

Sie können das Verhalten erhalten Sie die RetryWhen Builder in rxjava-extras verwenden möchten, welche auf ist Maven Zentral. Verwenden Sie die neueste Version.

Observable.create((Subscriber<? super String> s) -> { 
    System.out.println("subscribing"); 
    s.onError(new RuntimeException("always fails")); 
}) 
.retryWhen(RetryWhen 
    .delays(Observable.range(1, 3) 
       .map(n -> (long) n), 
      TimeUnit.SECONDS).build()) 
.doOnError(e -> e.printStackTrace()) 
.toBlocking().forEach(System.out::println); 
+0

Um, warum die down-vote? Dies ist eine Einheit getestet Bibliothek. –

7

Die Javadoc für retryWhen besagt, dass:

If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription.

Einfach gesagt, wenn Sie die Ausnahme propagieren wollen, werden Sie die ursprüngliche Ausnahme erneut auslösen müssen, sobald Sie genug retrying hatte.

Ein einfacher Weg ist, Ihre Observable.range auf 1 größer als die Anzahl der Versuche, die Sie wiederholen möchten, zu setzen.

Dann in Ihrer zip-Funktion die aktuelle Anzahl der Wiederholungen testen. Wenn es NUMBER_OF_RETRIES + 1 entspricht, geben Sie Observable.error(throwable) zurück oder werfen Sie die Ausnahme erneut.

EG

Observable.create((Subscriber<? super String> s) -> { 
      System.out.println("subscribing"); 
      s.onError(new RuntimeException("always fails")); 
     }).retryWhen(attempts -> { 
      return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> { 
       if (attempt == NUMBER_OF_RETRIES + 1) { 
        throw Throwables.propagate(throwable); 
       } 
       else { 
        return attempt; 
       } 
      }).flatMap(i -> { 
       System.out.println("delaying retry by " + i + " second(s)"); 
       return Observable.timer(i, TimeUnit.SECONDS); 
      }); 
     }).toBlocking().forEach(System.out::println); 

Als Neben doOnError wirkt sich nicht auf die beobachtbaren in irgendeiner Weise - es einfach ermöglicht Ihnen mit einem Haken, eine Aktion auszuführen, wenn ein Fehler auftritt. Ein häufiges Beispiel ist die Protokollierung.

-1

Sie benötigen onErrorResumeNext nach den retryWhen

In Ihrem Beispiel

Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> { 
     return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> { 
      if (i == NUMBER_OF_RETRIES + 1) { 
       throw Throwables.propagate(n); 
      } 
      else { 
       return i; 
      } 
     }).flatMap(i -> { 
      System.out.println("delay retry by " + i + " second(s)"); 
      return Observable.timer(i, TimeUnit.SECONDS); 
     }); 
    }) 
    .onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage()); 
               return Observable.error(t); 
              }) 
    .toBlocking().forEach(System.out::println); 

Sie ein praktisches Beispiel sehen können diese Klasse An der Unterseite verwenden, um zu verstehen, wie funktioniert. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

-1

Sie entweder Scan-Funktion verwenden können, die ein Paar mit den kumulierten Index zurückgibt und entscheiden, ob oder nicht auf dem Fehler passieren:

.retryWhen(attempts -> 
    return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value)) 
      .flatMap(pair -> { 
       if(pair.first > MAX_RETRY_COUNT) { 
        throw new RuntimeException(pair.second); 
       } 
       return Observable.timer(pair.first, TimeUnit.SECONDS); 
      }); 

Oder Sie mit zipWith Operator halten können, aber die Zahl in range erhöhen Beobachtbar und gibt ein Paar statt des Indexes allein zurück.Auf diese Weise werden Sie nicht die Informationen über vorherige throwable verlieren.

attempts 
    .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable)) 
    .flatMap(pair -> { 
     if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second); 
     System.out.println("delay retry by " + pair.first + " second(s)"); 
     return Observable.timer(pair.first, TimeUnit.SECONDS); 
    }); 
9

Der Doc für retryWhen sagt, dass es onError Mitteilung an seine Abonnenten und endet gibt. So können Sie so etwas tun:

final int ATTEMPTS = 3; 

    Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> attempts 
      .zipWith(Observable.range(1, ATTEMPTS), (n, i) -> 
        i < ATTEMPTS ? 
          Observable.timer(i, SECONDS) : 
          Observable.error(n)) 
      .flatMap(x -> x)) 
      .toBlocking() 
      .forEach(System.out::println);