Ich lerne RxJava
und testet ein Szenario, in dem ich Daten aus einer DB lese und dann in eine Warteschlange posten. Ich habe gerade eine Probe über den ganzen Prozess gemacht, aber ich finde nicht, dass das Observable
so funktioniert, wie ich es wollte. asynchron.Observable ist nicht asynchron
Dies ist mein Code:
package rxJava;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
public class TestClass {
public static void main(String[] args) {
TestClass test = new TestClass();
System.out.println("---START---");
test.getFromDB().subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Publish complete.");
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onNext(String s) {
test.publishToQueue(s).subscribe(new Observer<Boolean>() {
@Override
public void onNext(Boolean b) {
if (b) {
System.out.println("Successfully published.");
}
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable arg0) {
}
});
};
});
System.out.println("---END---");
}
public Observable<String> getFromDB() {
List<String> list = new ArrayList<String>();
for (int i = 0; i < 30; i++) {
list.add(Integer.toString(i));
}
return Observable.from(list).doOnNext(new Action1<String>() {
@Override
public void call(String temp) {
if (temp.contains("2")) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
public Observable<Boolean> publishToQueue(String s) {
return Observable.defer(() -> {
try {
if (s.contains("7")) {
Thread.sleep(700);
}
System.out.println("Published:: " + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Observable.just(true);
});
}
}
nehme ich eine Liste von der DB erhalten asynchron und wollen es in die Warteschlange stellen ,. Ich habe ein Observable
verwendet, das von getFromDB
zurückgegeben wird und habe es abonniert, das die Daten nachahmt, die ich von DB erhalte. Jedes Mal, wenn ich die Daten von der DB bekomme, möchte ich sie mit publishToQueue
in eine Warteschlange schieben, die auch eine Observable
zurückgibt. Ich wollte die Warteschlange auch asynchron aufrufen lassen. Jetzt auf positive Quittung aus der Warteschlange wie die , die ich zurück (Observable<Boolean>
), möchte ich etwas drucken.
Also im Grunde will ich nur beide Prozesse asynchron sein. Für jede Daten von DB, schiebe ich es asynchron in die Warteschlange.
Ich habe Thread.sleep()
in beiden Methoden, DB-Aufruf und Warteschlange hinzugefügt, um eine Verzögerung nachzuahmen und die asynchronen Operationen zu testen. Ich denke, das ist es, was das Problem verursacht. Aber ich habe auch versucht Obseravable.delay()
, aber das produziert nicht einmal eine Ausgabe.
Bitte helfen Sie mir zu verstehen, wie das funktioniert und wie ich es so machen kann, wie ich es will.
Können Sie bitte erklären? – v1shnu