2016-04-14 14 views
0

Versuch, ein System zu modellieren, das Benachrichtigungen von mehreren Herausgebern mit RX sendet.Mehrere benutzerdefinierte Observables in RX zusammenfügen

Ich habe zwei benutzerdefinierte Schnittstellen ITopicObservable und ITopicObserver, um die Tatsache zu modellieren, dass die implementierenden Klassen andere Eigenschaften und Methoden als die IObservable- und IObserver-Schnittstellen haben.

Das Problem, das ich habe, ist, dass mein Denken ist, dass ich in der Lage sein sollte, eine Anzahl von Observablen zusammenzufügen, sie zusammenzufassen und einen Beobachter zu abonnieren, um Updates von allen zusammengeführten Observablen bereitzustellen. Der Code mit dem Kommentar "issue" löst jedoch eine ungültige Cast-Ausnahme aus. Der Anwendungsfall ist eine Anzahl von unabhängigen Sensoren, die zum Beispiel jeweils eine Temperatur in einer Box überwachen, die alle ihre Berichte zu einem Temperaturbericht zusammenfasst, der dann von einem Temperaturzustandsmonitor abonniert wird.

Was fehlt mir hier? Oder gibt es eine bessere Möglichkeit, das Szenario mit RX zu implementieren?

-Code unten

using System; 
using System.Reactive.Linq; 
using System.Collections.Generic; 

namespace test 
{ 
class MainClass 
{ 
    public static void Main (string[] args) 
    { 
     Console.WriteLine ("Hello World!"); 
     var to = new TopicObserver(); 
     var s = new TopicObservable ("test"); 

     var agg = new AggregatedTopicObservable(); 
     agg.Add (s); 

     agg.Subscribe (to); 
    } 
} 

public interface ITopicObservable<TType>:IObservable<TType> 
{ 
    string Name{get;} 
} 

public class TopicObservable:ITopicObservable<int> 
{ 
    public TopicObservable(string name) 
    { 
     Name = name; 
    } 
    #region IObservable implementation 
    public IDisposable Subscribe (IObserver<int> observer) 
    { 
     return null; 
    } 
    #endregion 
    #region ITopicObservable implementation 
    public string Name { get;private set;} 

    #endregion 
} 

public class AggregatedTopicObservable:ITopicObservable<int> 
{ 
    List<TopicObservable> _topics; 
    private ITopicObservable<int> _observable; 
    private IDisposable _disposable; 

    public AggregatedTopicObservable() 
    { 
     _topics = new List<TopicObservable>(); 
    } 

    public void Add(ITopicObservable<int> observable) 
    { 
     _topics.Add ((TopicObservable)observable); 
    } 

    #region IObservable implementation 
    public IDisposable Subscribe (IObserver<int> observer) 
    { 
     _observable = (ITopicObservable<int>)_topics.Merge(); 

     _disposable = _observable.Subscribe(observer); 

     return _disposable; 
    } 
    #endregion 
    #region ITopicObservable implementation 
    public string Name { get;private set;} 
    #endregion 

} 



public interface ITopicObserver<TType>:IObserver<TType> 
{ 
    string Name{get;} 
} 

public class TopicObserver:ITopicObserver<int> 
{ 
    #region IObserver implementation 
    public void OnNext (int value) 
    { 
     Console.WriteLine ("next {0}", value); 
    } 
    public void OnError (Exception error) 
    { 
     Console.WriteLine ("error {0}", error.Message); 
    } 
    public void OnCompleted() 
    { 
     Console.WriteLine ("finished"); 
    } 
    #endregion 
    #region ITopicObserver implementation 
    public string Name { get;private set;} 
    #endregion 

} 
} 

Antwort

1

die Unterschrift des .Merge(...) Operator Schicht, die Sie verwenden ist:

IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources) 

Der tatsächliche Typ dieses .Merge() zurückgekehrt ist:

System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32] 

... so sollte es ziemlich klar sein, dass der Aufruf (ITopicObservable<int>)_topics.Merge(); fehlschlagen würde.

Lees Empfehlung, eines von IObservable<> oder IObserver<> nicht zu implementieren, ist das richtige. Es führt zu Fehlern wie dem obigen.

Wenn Sie etwas zu tun hätte, würde ich es auf diese Weise tun:

public interface ITopic 
{ 
    string Name { get; } 
} 

public interface ITopicObservable<TType> : ITopic, IObservable<TType> 
{ } 

public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType> 
{ } 

public interface ITopicObserver<TType> : ITopic, IObserver<TType> 
{ } 

public class Topic 
{ 
    public string Name { get; private set; } 

    public Topic(string name) 
    { 
     this.Name = name; 
    } 
} 

public class TopicSubject : Topic, ITopicSubject<int> 
{ 
    private Subject<int> _subject = new Subject<int>(); 

    public TopicSubject(string name) 
     : base(name) 
    { } 

    public IDisposable Subscribe(IObserver<int> observer) 
    { 
     return _subject.Subscribe(observer); 
    } 

    public void OnNext(int value) 
    { 
     _subject.OnNext(value); 
    } 

    public void OnError(Exception error) 
    { 
     _subject.OnError(error); 
    } 

    public void OnCompleted() 
    { 
     _subject.OnCompleted(); 
    } 
} 

public class AggregatedTopicObservable : Topic, ITopicObservable<int> 
{ 
    List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>(); 

    public AggregatedTopicObservable(string name) 
     : base(name) 
    { } 

    public void Add(ITopicObservable<int> observable) 
    { 
     _topics.Add(observable); 
    } 

    public IDisposable Subscribe(IObserver<int> observer) 
    { 
     return _topics.Merge().Subscribe(observer); 
    } 
} 

public class TopicObserver : Topic, ITopicObserver<int> 
{ 
    private IObserver<int> _observer; 

    public TopicObserver(string name) 
     : base(name) 
    { 
     _observer = 
      Observer 
       .Create<int>(
        value => Console.WriteLine("next {0}", value), 
        error => Console.WriteLine("error {0}", error.Message), 
        () => Console.WriteLine("finished")); 
    } 

    public void OnNext(int value) 
    { 
     _observer.OnNext(value); 
    } 
    public void OnError(Exception error) 
    { 
     _observer.OnError(error); 
    } 
    public void OnCompleted() 
    { 
     _observer.OnCompleted(); 
    } 
} 

und führen Sie es mit:

var to = new TopicObserver("watching"); 
var ts1 = new TopicSubject("topic 1"); 
var ts2 = new TopicSubject("topic 2"); 

var agg = new AggregatedTopicObservable("agg"); 

agg.Add(ts1); 
agg.Add(ts2); 

agg.Subscribe(to); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

Welche gibt:

 
next 42 
next 1 
finished 

Aber Abgesehen davon, dass ich alles einen Namen geben kann (was ich nicht weiß, wie es hilft), könntest du das immer tun:

var to = 
    Observer 
     .Create<int>(
      value => Console.WriteLine("next {0}", value), 
      error => Console.WriteLine("error {0}", error.Message), 
      () => Console.WriteLine("finished")); 

var ts1 = new Subject<int>(); 
var ts2 = new Subject<int>(); 

var agg = new [] { ts1, ts2 }.Merge(); 

agg.Subscribe(to); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

Gleiche Ausgabe ohne Schnittstellen und Klassen.

Es gibt sogar einen interessanteren Weg. Versuchen Sie folgendes:

var to = 
    Observer 
     .Create<int>(
      value => Console.WriteLine("next {0}", value), 
      error => Console.WriteLine("error {0}", error.Message), 
      () => Console.WriteLine("finished")); 

var agg = new Subject<IObservable<int>>(); 

agg.Merge().Subscribe(to); 

var ts1 = new Subject<int>(); 
var ts2 = new Subject<int>(); 

agg.OnNext(ts1); 
agg.OnNext(ts2); 

ts1.OnNext(42); 
ts1.OnCompleted(); 

ts2.OnNext(1); 
ts2.OnCompleted(); 

var ts3 = new Subject<int>(); 

agg.OnNext(ts3); 

ts3.OnNext(99); 
ts3.OnCompleted(); 

Dies erzeugt:

 
next 42 
next 1 
next 99 

Es ermöglicht Sie, neue Quelle Observablen nach der Zusammenführung hinzuzufügen!

+0

Vielen Dank dafür! Um die Frage nach der Verwendung der Schnittstelle zu beantworten, haben wir zum Beispiel eine Anzahl von Temperaturfühlern, die verschiedene Teile eines Systems überwachen. Jede dieser Sonden hat eine eindeutige ID/einen eindeutigen Namen. Sie protokollieren ihre Temperaturen unabhängig voneinander in der Datei. Es würde einen aggregierten Temperaturmonitor geben, der effektiv das gesamte System überwacht, wo ein AggregatedTopic ins Spiel kommt. Es würde auch eine andere Entität (Observer) geben, die basierend auf den AggregatedTopic-Informationen entscheiden würde, Dinge ein- oder auszuschalten. All dies mit Protokollierung. Gibt es einen saubereren Weg, dies zu erreichen? – Bernard

+0

@Bernard - Ja, es gibt einen viel einfacheren Weg, es zu tun. Sie sollten die Schnittstellen nicht erweitern. Stattdessen sollten Sie ein benutzerdefiniertes Objekt erstellen, das die ID, den Namen und den produzierten Wert enthält. Dann erstellen Sie Observables, die dieses benutzerdefinierte Objekt zurückgeben - das Sie sehr einfach zusammenführen können. Sie sollten das wirklich als separate Frage stellen und ich würde Ihnen gerne eine Antwort geben. Wenn Sie das tun können, geben Sie bitte an, wie Sie wissen, wann die Temperaturfühler einen Wert erzeugen (zeigt auch den Code an). – Enigmativity

+0

Ich habe eine neue Frage http://stackoverflow.com/questions/36723106/building-a-sensor-monitoring-system-using-rx wie angefordert erstellt. Vielen Dank für die Hilfe! – Bernard

2

Mein erster Gedanke ist, dass Sie nicht IObservable<T> implementieren sollten, können Sie es durch Aussetzen es als Eigenschaft oder das Ergebnis eines Verfahrens zusammensetzen sollte.

Der zweite Gedanke ist, dass es in Rx Operatoren gibt, die beim Zusammenführen/Zusammenfassen mehrerer Sequenzen ausgezeichnet sind. Sie sollten diese bevorzugen.

Drittens, die mit dem ersten ähnlich ist, Sie in der Regel nicht IObserver<T> implementieren tun, müssen Sie nur auf die beobachtbaren Sequenz abonnieren und Teilnehmer bieten für jeden Anruf zurück (OnNext, OnError und OnComplete)

So Ihr Code im Grunde auf

reduziert
Console.WriteLine("Hello World!"); 
var topic1 = TopicListener("test1"); 
var topic2 = TopicListener("test2"); 

topic1.Merge(topic2) 
    .Subscribe(
    val => { Console.WriteLine("One of the topics published this value {0}", val);}, 
    ex => { Console.WriteLine("One of the topics errored. Now the whole sequence is dead {0}", ex);}, 
    () => {Console.WriteLine("All topics have completed.");}); 

Wo TopicListener(string) nur eine Methode ist, die IObservable<T> zurückgibt. Die Implementierung der TopicListener(string) Methode würde höchstwahrscheinlich Observable.Create verwenden.

Es kann hilfreich sein, Beispiele für das Mapping von Rx über ein themenbasiertes Messaging-System zu sehen. Es ist ein Beispiel dafür, wie Sie Rx über TibRv Themen hier https://github.com/LeeCampbell/RxCookbook/blob/master/IO/Comms/TibRvSample.linq

+0

Danke dafür! Ich schätze die klare Antwort, aber ich denke @Enigmatismus war ein wenig beschreibender. – Bernard

+0

Ja, seine Antwort ist sehr detailliert. Sei aber dieser 'Themen' überdrüssig! –

+0

Campell, was sind die Schattenseiten? – Bernard