2016-05-17 11 views
1

Ich versuche, eine beliebige Anzahl von Streams in Rx, zip, wo Elemente entsprechen, aber möglicherweise außerhalb der Reihenfolge verarbeitet werden. Die Elemente jedes Streams haben eine Kennung, mit der sie miteinander übereinstimmen können. Z.B. Elemente wie folgt aussehen:Rx kombiniert viele Streams durch Beitritt zu einer Eigenschaft

public class Element 
{ 
    public string Key {get; set;} 
} 

Normalerweise zip wird durch ihren Index des Auftretens kombinieren Elemente nur:

|-A-----------A 
|--B---------B- 
|-----C------C- 
|-----ABC-----ABC <- zip 

Aber was, wenn wir Elemente nur übereinstimmen soll, die den gleichen Schlüssel teilen? Ich suche nach einer Sequenz, die mehr wie das funktioniert:

(In diesem Beispiel ist der Schlüssel 1 oder 2)

|--2A-------1A---------- 
|----1B----------2B----- 
|------1C-----------2C-- 
|-----------1ABC----2ABC <- zipped by key 1 & 2 respectively 

Ich glaube, dass GroupJoin dieses Szenario passt, aber es dient nur zwei Observable und sie zu verketten war ziemlich schnell außer Kontrolle geraten.

Ich schaute auch auf And/Then/Wann, aber nicht wirklich verstanden, wie es für dieses Szenario zu strukturieren.

Idealerweise möchte ich eine Erweiterungsmethode aufrufen, die ich aufrufen kann, und einen Ergebnisselektor bereitstellen, bei dem die Eingaben des Ergebnisselektors garantiert denselben Schlüssel haben.

Wie würden Sie dieses Problem angehen?

Antwort

0

Hier ist etwas, was ich in LinqPad zusammen getroffen habe. Es entspricht Ihren Anforderungen Ihres Marmordiagramms. Es ist jedoch unordentlicher als ich möchte.

Nuget Abhängigkeiten von Rx-Testing

void Main() 
{ 
    TestScheduler scheduler = new TestScheduler(); 
    /* 
|--2A-------1A---------- 
|----1B----------2B----- 
|------1C-----------2C-- 
|-----------1ABC----2ABC <- zipped by key 1 & 2 respectively 

    */ 
    var sourceA = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(3, "2A"), 
     ReactiveTest.OnNext(12, "1A")); 
    var sourceB = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(5, "1B"), 
     ReactiveTest.OnNext(17, "2B")); 
    var sourceC= scheduler.CreateColdObservable(
     ReactiveTest.OnNext(7, "1C"), 
     ReactiveTest.OnNext(20, "2C")); 

    var observer = scheduler.CreateObserver<string>(); 


    var query = Observable.Merge(sourceA, sourceB, sourceC) 
     .GroupBy(x => GetKey(x)) 
     .SelectMany(grp => grp.Select(x=>GetValue(x)) 
           .Take(3) 
           .Aggregate(new List<string>(), 
             (accumulator, current) => { 
              accumulator.Add(current); 
              return accumulator; 
             }) 
          .Select(acc=>CreateGroupResult(grp.Key, acc))); 

    query.Subscribe(observer); 
    scheduler.Start(); 

    ReactiveAssert.AreElementsEqual(
     new[]{ 
      ReactiveTest.OnNext(12, "1ABC"), 
      ReactiveTest.OnNext(20, "2ABC") 
     }, 
     observer.Messages 
    ); 

} 

// Define other methods and classes here 
private static string CreateGroupResult(string key, IEnumerable<string> values) 
{ 
    var combinedOrderedValues = string.Join(string.Empty, values.OrderBy(v => v)); 
    return string.Format("{0}{1}", key, combinedOrderedValues); 
} 

private static string GetKey(string message) 
{ 
    return message.Substring(0, 1); 
} 

private static string GetValue(string message) 
{ 
    return message.Substring(1); 
}