Wir haben Micro-Services-Architektur, wo wir Inter-Service-Anrufe über ein Netzwerk tätigen. Wir verwenden RxJava im Top-Level-Service, was zur Erzeugung einer großen Anzahl von parallelen Anfragen an den unteren Service führt. Aus diesem Grund bekomme ich "No Route to Host Fehler" oder "Verbindungsfehler". Zu diesem Zweck möchte ich die Emission von RxJava Observable verlangsamen, so dass die frühere Verbindung geschlossen wird, bevor Sie eine neue erstellen. Unten ist der Beispielcode:So verzögern Sie die beobachtbare Emission in RxJava
package com.demo.rxjava.rxjaxa.creation;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class Delay {
public static void main(String[] args) throws InterruptedException {
Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
.flatMap(integer -> {
return function1(integer);
}).observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
}
public Observable<String> function1(String id) {
// This is where we make network call
Observable<Response> response = Rx.newClient(RxObservableInvoker.class)
.target("http://example.com/resource")
.request()
.queryParam("id", id)
.rx()
.get();
response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{
return response.extractResponse();
});
}
}
Dies wird nur den Beginn der Emission zu verzögern, ich möchte jede Emission von beobachtbaren durch bestimmtes Intervall zu verzögern. –
Überprüfen Sie meine neue Antwort – paul
Es funktioniert, aber auf Produktionscode gibt es fehlende Rückstau Fehler. –