2016-05-03 11 views
7

Ich versuche, den neuesten Wert eines gegebenen Observable zu bekommen und es sofort emittieren, sobald es aufgerufen wird. In Anbetracht der Code unten als Beispiel:Holen Sie sich den neuesten Wert eines Observable und emittieren Sie es sofort

return Observable.just(myObservable.last()) 
    .flatMap(myObservable1 -> { 
     return myObservable1; 
    }) 
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object 

Dies funktioniert nicht, da durch diese die flatMap tun myObservable1 die wiederum emittiert wird müssen emittieren, um die map zu erreichen. Ich weiß nicht, ob so etwas überhaupt möglich ist. Weiß jemand, wie dieses Ziel erreicht werden kann? Danke

+0

Worüber sprechen Sie? Was meinst du zuletzt? – akarnokd

+0

'myObservable' ist eine heiße Observable, die zB in unregelmäßigen Abständen" 1 "," 2 "," 3 "ausgibt. Was ich tun möchte, ist in der Lage zu sein, "MyObservable" neuesten Wert ("3" in unserem Fall) zu der Zeit, die ich erreichen die Return-Anweisung –

+0

Ist meinObservable heiß oder kalt? –

Antwort

17

last() Methode wird hier keine Hilfe sein, da es darauf wartet, dass das Observable beendet wird, um Ihnen das letzte ausgegebene Element zu geben.

Angenommen, Sie haben nicht die Kontrolle über die emittierende Observable, können Sie einfach eine BehaviorSubject erstellen und abonnieren Sie die Observable, die die Daten ausgibt, die Sie hören möchten und dann abonnieren Sie das erstellte Thema. Da Subject sowohl Observable als auch Subscriber ist, bekommen Sie, was Sie wollen.

Ich denke (habe nicht die Zeit, es jetzt zu überprüfen) müssen Sie möglicherweise manuell von der ursprünglichen Observable als BehaviorSubject abmelden, sobald alle seine Abonnenten abmelden wird nicht automatisch abbestellen.

Etwas wie folgt aus:

BehaviorSubject subject = new BehaviorSubject(); 
hotObservable.subscribe(subject); 
subject.subscribe(thing -> { 
    // Here just after subscribing 
    // you will receive the last emitted item, if there was any. 
    // You can also always supply the first item to the behavior subject 
}); 

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html

0

In RxJava wird subscriber.onXXX genannt asynchronous.It bedeutet, dass, wenn Ihr beobachtbare emit Artikel im neuen Thread, können Sie nie das letzte Element, bevor zurück, außer Sie blockieren den Thread und warten auf das Element. Aber wenn die Observable Element synchron ausgeben und Sie nicht ändern, es ist Thread von SubscribeOn und ObservOn, wie der Code:

Observable.just(1,2,3).subscribe(); 

In diesem Fall können Sie das letzte Element erhalten, indem wie dies zu tun:

Integer getLast(Observable<Integer> o){ 
    final int[] ret = new int[1]; 
    Observable.last().subscribe(i -> ret[0] = i); 
    return ret[0]; 
} 

Es ist eine schlechte Idee, wie this.RxJava tun Sie bevorzugen, indem sie es asynchrone Arbeit zu tun.

+1

wird die meiste Zeit nicht funktionieren – ndori

0

Was Sie hier eigentlich erreichen möchten, ist eine asynchrone Aufgabe zu übernehmen und in eine synchrone umzuwandeln.

Es gibt mehrere Möglichkeiten, es zu erreichen, jeder mit seinen Vor-und Nachteile:

  • Verwendung toBlocking() - es bedeutet, dass dieser Thread blockiert wird, bis der Strom Finish ist, um nur eins zu bekommen verwenden Sie einfach first(), da es abgeschlossen wird, sobald ein Artikel geliefert wird. Ihr gesamte Strom ist Observable<T> getData(); dann ein Verfahren sagen lassen, die unmittelbar auf den letzten Wert erhalten werden wie folgt aussehen:

public T getLastItem(){ return getData().toBlocking().first(); }

benutzen Sie bitte nicht last(), wie es für den Stream warten abzuschließen und nur dann wird das letzte Element emittieren.

Wenn es sich bei Ihrem Stream um eine Netzwerkanforderung handelt und noch kein Element gefunden wurde, wird Ihr Thread blockiert! Verwenden Sie ihn also nur, wenn Sie sicher sind, dass ein Element sofort verfügbar ist (oder wenn Sie wirklich eine Blockierung wünschen) ...)

  • eine weitere Option ist es, einfach das letzte Ergebnis zwischenspeichern, so etwas wie folgt aus:.

    getData() abonnieren (t-> cachedT = t;) // irgendwo im Code und es speichert den zuletzt gelieferten Artikel weiter public T getLastItem() { return cachedT; }

wenn gab es keine Einzelteil durch die Zeit, die Sie anfordern gesendet werden Sie null erhalten oder was auch immer Anfangswert Sie eingestellt haben. Das Problem mit diesem Ansatz ist, dass die Subscribe-Phase nach dem Holen passieren könnte und eine Race-Bedingung machen könnte, wenn sie in 2 verschiedenen Threads verwendet wird.