2016-07-21 6 views
0

Ich bin auf der Suche nach einer Observable Implementierung, die ein oder mehrere Observables enthält und führt sie zusammen. Aber hier ist der Kicker: Ich möchte hinzufügen mehr Observables zu jeder Zeit zusammengeführt werden, und ich denke, es könnte ebenso gut entfernen sie auch.RxJava - Zusammengeführtes Observable, das zu jeder Zeit mehr Observables akzeptiert?

Damit es wirklich effektiv ist, müssen alle Abonnenten Benachrichtigungen von neuen Observablen erhalten, die nach der Anmeldung hinzugefügt werden. Wenn nicht alle verschmolzenen Observables kalt sind und onComplete() anrufen, dann ist es in Ordnung, die Abonnements abzubestellen, auch wenn mehr Observables hinzugefügt werden. Dies ist mehr für das Zusammenführen mehrerer unbegrenzter heißer Observables und das Hinzufügen von mehr zu jeder Zeit.

MergableObservable<MyEvent> allSources = new MergableObservable<>(); 

//later in application 
Observable<MyEvent> eventSource1 = ... 
allSources.add(eventSource1); 

//and later again 
Observable<MyEvent> eventSource2 = ... 
allSources.add(eventSource2); 

//and so on 
Observable<MyEvent> eventSource3 = ... 
allSources.add(eventSource3); 

Ich weiß, dass es Merging-Operatoren gibt, aber ich brauche eine veränderbare Struktur. Fehle ich etwas, das bereits existiert? Ich würde es vorziehen, keine Themen zu verwenden, es sei denn, es ist absolut angemessen für diese Situation.

Antwort

2

Sie nicht Themen vermeiden können, weil Sie manuell neu wollen Observable Quellen schieben und sie nicht „natürlich“ erzeugen.

Subject<Observable<T>, Observable<T>> o = PublishSubject.<Observable<T>>().toSerialized(); 

ConcurrentHashSet<Observable<T>> live = ... 

o.flatMap(v -> v.takeWhile(x -> live.containsKey(v))).subscribe(...); 

Observable<T> inner = ... 
live.add(inner); 
o.onNext(inner); 

//... 

live.remove(inner); 
+0

Schön. Das ist ziemlich schlau. – tmn

0

Vielleicht sind Themen der vernünftigste Weg, damit umzugehen. Ich bin offen für andere Antworten, wenn jemand einen besseren Weg kennt. Es bringt mich um, es gibt keine eingebaute Methode, dies zu tun. Ich stelle mir vor, es wäre ein gemeinsames Bedürfnis.

Kotlin Implementierung

class MergingObservable<T> { 

    private val subject: SerializedSubject<T, T> = PublishSubject<T>().toSerialized() 

    fun toObservable(): Observable<T> = subject 

    operator fun plusAssign(observable: Observable<T>) { 
     add(observable) 
    } 

    fun add(observable: Observable<T>): Subscription = observable.subscribe(subject) 
}