2016-07-05 7 views
2

Ich habe endlosen Stream (der nicht onComplete überhaupt aufgerufen wird). Und ich cachen letzten Wert darin:Wie CachedObservable heruntergefahren werden

Observable<T> endlessStream = createStream().cache(); 

Subscription s1 = endlessStream.subscribe(...) 
Subscription s2 = endlessStream.subscribe(...) 

Durch einen Zustand endlessStream nicht gültig und ich ersetzen (mit switchMap, aber es ist nicht wichtig).

s1.unsubscribe() 
s2.unsubscribe() 

Aber CachedObservable wird immer speichern Verbindung zum Quellenstrom (wieder von createStream()). Dies führt zu einem Speicherleck. So trennen Sie CachedObservable von der Quelle beobachtbar?

Weitere Informationen:

CachedObservable enthält Feld state, die SerialSubscription zu Quelle Observablen enthalten (connection). Wenn ich nächste Hack nennen, alles wird OK:

private void disconnectCachedObservable(CachedObservable<T> observable) { 
    try { 
     Field fieldState = CachedObservable.class.getDeclaredField("state"); 
     fieldState.setAccessible(true); 
     Object state = fieldState.get(observable); 
     Field fieldConnection = state.getClass().getDeclaredField("connection"); 
     fieldConnection.setAccessible(true); 
     SerialSubscription subscription = (SerialSubscription) fieldConnection.get(state); 
     subscription.unsubscribe(); 
    } catch (NoSuchFieldException e) { 
     e.printStackTrace(); 
    } catch (IllegalAccessException e) { 
     e.printStackTrace(); 
    } 
} 

Aber Reflexion ist nicht gut Lösung :(

Antwort

0

gefunden Lösung:

Operator Cache haben ein ähnliches Verhalten wie Operator Replay + autoconnect (1).

replay().autoConnect(1, toStop -> { /* store Subscription to cancel later */ }); 

, die wir an, Sie, indem Sie ein Abonnement der Upstream stoppen.