2016-04-20 6 views
4

Ich schreibe einen Socket-Server, der jede Nachricht von jedem Client behandelt sollte, der damit verbunden ist.Disposable kombinierte Observables von Ereignissen

Alle Nachrichten sind in einer beobachtbaren Reihenfolge eingereiht, so dass sie abonniert und beobachtet werden können.

Um sicherzustellen, dass alle Client-Socket-Nachrichten auf dem gleichen beobachtbaren gesetzt werden ich folgende Schnipsel verwendet habe:

private Subject<IObservable<Msg>> _msgSubject; 
public IObservable<Msg> Messages { get; private set; } 

public SocketServer() 
{ 
    // ... 
    // Initialization of server socket 
    // ... 
    _msgSubject = new Subject<IObservable<Msg>>(); 
    Messages = _msgSubject.Merge(); 
} 

private void OnNewConnection(ISocket socket) 
{ 
    var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(action => action.Invoke, h => socket.OnMessage += h, h => socket.OnMessage -= h); 

    _msgSubject.OnNext(evtObservable); 
} 
Jetzt

habe ich die (de) Zuweisung des Speichers überprüft und das Problem ist, Selbst wenn der Sockel richtig geschlossen ist, gibt es immer noch Hinweise auf das relative Observable, das dem Subjekt hinzugefügt wird; Außerdem wird die Abmeldung des Ereignisses nie aufgerufen.

Also, hier ist die Frage: Gibt es eine Möglichkeit, die Entfernung der "Steckdose Observable" vom Thema zu erzwingen?

Vielleicht etwas, um die OnComplete des Sockets beobachtbar sollte die Arbeit tun, aber wie?

Antwort

3

Momentan stellen Sie die Terminierung (OnComplete oder OnError) der inneren Observable-Sequenzen sowieso nicht aus. Sie erlauben die Aufhebung der Registrierung des Event-Handlers, wenn der Kunde das Abonnement abschafft, aber dies ist verbrauchergesteuert.

Haben Sie ein Ereignis, das auf der ISocket Schnittstelle ausgelöst wird, die das Schließen des Sockels darstellt? Wenn dies der Fall ist, können Sie einen Observable.FromEvent Wrapper hinzufügen. Verwenden Sie dann diese beobachtbare Sequenz, um die OnMessage Sequenz zu beenden.

var closeObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke, 
    h => socket.OnClose+= h, 
    h => socket.OnClose -= h); 

var msgObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke, 
    h => socket.OnMessage += h, 
    h => socket.OnMessage -= h); 

msgObservable.TakeUntil(closeObservable) 
+0

Tatsächlich tue ich! Und jetzt, wo du es erwähnst, kann ich nicht glauben, dass ich nicht gedacht habe! Vielen Dank! – Atropo