2016-07-05 11 views
3

mit dem folgenden Beispiel (Kotlin Code)Observable Operator Hinrichtungen

val subject = PublishSubject.create<Int>() 

val stream = subject.map { 
    println("mapping") 
    it * 2 
} 

stream.forEach { println("A: $it") } 
stream.forEach { println("B: $it") } 

subject.onNext(1) 
subject.onCompleted() 

Ausgang wird wieder zu verwenden sein

mapping 
A: 2 
mapping 
B: 2 

, was ich will erreichen, dass Quelle beobachtbaren abgebildet einmal werden und alle Teilnehmer Holen Sie sich das Ergebnis, aber nicht die Mapping-Operation für jede einzelne von ihnen ausführen ...

so

mapping 
A: 2 
B: 2 

In meinem Fall habe ich sehr teuer Berechnung geht, wo Latenz und Leistung kritisch, ich habe eine heiße beobachtbaren als Quelle und viele Abonnenten ...

Wie Wiederverwendung wir Betreiber Hinrichtungen? und generell verschiedene Mapping-Operationen?

Antwort

2

Sie können cache verwenden das Ergebnis der Quelle beobachtbaren für alle künftigen Teilnehmer cachen:

val stream = subject.map { 
    println("mapping") 
    it * 2 
}.cache() 

Wenn Sie eine subtilere Kontrolle darüber, wie die Dinge wollen zwischengespeichert werden, die replay wert ist in suchen.

Wenn Sie nicht jedes Element der Quelle cachen wollen beobachtbar, sondern nur erneut veröffentlichen neue Objekte können Sie verwenden publish mit autoConnect:

val stream = subject.map { 
    println("mapping") 
    it * 2 
}.publish() 
.autoConnect() 

, die die folgende Sequenz von Ereignissen gegeben:

stream.forEach { println("A: $it") } 
stream.forEach { println("B: $it") } 

subject.onNext(1) 

stream.forEach { println("C: $it") } 
subject.onNext(2) 
subject.onCompleted() 

Würde drucken:

mapping 
A: 2 
B: 2 
mapping 
A: 4 
B: 4 
C: 4 
+0

oh mein Gott das ist wirklich schlecht, ich habe nur Quellen für Betreiber überprüft, jeder einzelne Teilnehmer führt alles in der Kette ... cache tut nicht das, was ich suche, ich möchte nicht alle emissionen irgendwo speichern, ich will nur berechnen und propagate an alle anstatt für jeden berechnen :( – vach

+0

ich weiß, dass dies um einen Gefallen bitten, aber bitte Kopiere und modifiziere die OperatorMap (basic mapping operator), so dass sie den Transformer nur einmal aufruft ... Ich muss nur sehen, was der richtige Weg ist ... wenn – vach

+0

Debugging in den Workflow ist, scheint es, als ob ich falsch liege zu verstehen, wie dies funktioniert, Originalquelle beobachtbar hat zwei Beobachter, so dass die Abteilung kommt direkt von der Wurzel, wo ich erwartet, dass es einen Teilnehmer haben und das seinerseits hat andere 2 ... – vach

1

Ich habe die Lösung gefunden. Um die Ausführung der Pipe wiederzuverwenden, müssen wir sicherstellen, dass es nur einen Abonnenten gibt und dieser Abonnent alle Emissionen vom Ende der Pipe zu den Einträgen aller Abonnenten weiterleitet ... Das klingt sehr nach Subject!

Wenn wir nur 100-mal abonniert werden wir 100 Rohre von der Quelle beginnen zu beobachten, während in diesem Fall haben wir ein Rohr, die Zweige zu 100 winzigen Rohren im Ende davon ...

fun <T> Observable<T>.hub(): Observable<T> { 
    val hub = PublishSubject.create<T>() 
    this.subscribe(hub) 
    return hub 
} 

jetzt können wir das

val subject = PublishSubject.create<Int>() 

val stream = subject.map { 
    println("mapping") 
    it * 2 
} 

val hub = stream.hub() 

hub.subscribe { println("A: $it") } 
hub.subscribe { println("B: $it") } 

subject.onNext(1) 

subject.onCompleted() 

die diese

mapping 
A: 2 
B: 2 

geben Problem gelöst!