2016-07-02 10 views
1

Wir haben Micro-Services-Architektur, wo wir Inter-Service-Anrufe über ein Netzwerk tätigen. Wir verwenden RxJava im Top-Level-Service, was zur Erzeugung einer großen Anzahl von parallelen Anfragen an den unteren Service führt. Aus diesem Grund bekomme ich "No Route to Host Fehler" oder "Verbindungsfehler". Zu diesem Zweck möchte ich die Emission von RxJava Observable verlangsamen, so dass die frühere Verbindung geschlossen wird, bevor Sie eine neue erstellen. Unten ist der Beispielcode:So verzögern Sie die beobachtbare Emission in RxJava

package com.demo.rxjava.rxjaxa.creation; 
    import rx.Observable; 
    import rx.Subscriber; 
    import rx.schedulers.Schedulers; 

    public class Delay { 

     public static void main(String[] args) throws InterruptedException { 
      Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io()) 
        .flatMap(integer -> { 
         return function1(integer); 
        }).observeOn(Schedulers.io()) 
        .subscribe(new Subscriber<String>() { 
         @Override 
         public void onNext(String item) { 
          System.out.println("Next: " + item); 
         } 

         @Override 
         public void onError(Throwable error) { 
          System.err.println("Error: " + error.getMessage()); 
         } 

         @Override 
         public void onCompleted() { 
          System.out.println("Sequence complete."); 
         } 
        }); 
     } 

    public Observable<String> function1(String id) { 
       // This is where we make network call 
       Observable<Response> response = Rx.newClient(RxObservableInvoker.class) 
         .target("http://example.com/resource") 
         .request() 
         .queryParam("id", id) 
         .rx() 
         .get(); 
       response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{ 
        return response.extractResponse(); 
       }); 
    } 
} 

Antwort

0

Um einen bestimmten Schritt zu verzögern Sie zip verwenden und kombinieren können, dass jedes Element in Ihrem ersten Observable.from mit einem Intervall von X Zeit gehen emittiert.

/** 
* If we want to delay the every single item emitted in the pipeline we will need a hack, 
* one possible hack is use zip operator and combine every item emitted with an interval so every item emitted has to wait until interval emit the item. 
*/ 
@Test 
public void delay() { 
    long start = System.currentTimeMillis(); 
    Subscription subscription = Observable.zip(Observable.from(Arrays.asList(1, 2, 3)), Observable.interval(200, TimeUnit.MILLISECONDS), (i, t) -> i) 
              .subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start))); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(3000, TimeUnit.MILLISECONDS); 
} 

Dies wird

time:537 
    time:738 
    time:936 

Weitere practicle Beispiele hier drucken https://github.com/politrons/reactive

+0

Dies wird nur den Beginn der Emission zu verzögern, ich möchte jede Emission von beobachtbaren durch bestimmtes Intervall zu verzögern. –

+0

Überprüfen Sie meine neue Antwort – paul

+0

Es funktioniert, aber auf Produktionscode gibt es fehlende Rückstau Fehler. –

0

Anstatt Ihre Anfragen zu verzögern, sollten Sie die Anforderungen an den Boden Service auf einem Scheduler auftreten, die parallele Aktivität begrenzt . Zum Beispiel:

int maxParallel = 4; 
Scheduler scheduler = Schedulers.from(
    Executors.newFixedThreadPool(maxParallel)); 
... 
observable 
    .flatMap(x -> 
     submitToBottomService(x) 
     .subscribeOn(scheduler)) 
    .subscribe(subscriber); 

Übrigens erwähnen Sie das Schließen einer Verbindung. Der Operator Observable.using dient zum Schließen von Ressourcen in einem reaktiven Kontext (er schließt Ressourcen bei Beendigung und Abmeldung). Wenn Sie es noch nicht benutzen, dann werfen Sie einen Blick darauf.

+0

Wir führen Bottom-Service-Anruf auf dem Scheduler, aber wir erstellen Async-Aufruf an den unteren Dienst, so dass es eine Menge Verbindung erstellt, bevor der erste Aufruf Ergebnisse zurückgibt. –

+0

der async-Aufruf sollte mit dem Scheduler erfolgen, möchten Sie mehr Code zur Verfügung stellen? –

+0

Ich kann den ursprünglichen Code nicht teilen, aber aktualisierte Frage mit dem Sudo-Code –