2016-07-26 16 views
0

Ich habe einen Strom mit Buchstaben (A-Z) und Zahlen (1-9). Ich möchte Briefe, die innerhalb eines Timeouts eingehen (dies kann sich ändern), zusammenfügen und immer sofort Nummern ausgeben. Können Sie mir vorschlagen, welche Funktionen am besten geeignet sind?Puffer ausgewählte Nachrichten mit variabler Zeitüberschreitung

Beispielarbeitscode (nicht sicher, ob dies richtig ist und/oder eine gute Lösung):

private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms()); 

private IObservable<string> lettersJoined(IObservable<char> ob) 
{ 
    return Observable.Create<string>(observer => 
    { 
     var letters = new List<char>(); 
     var lettersFlush = new SerialDisposable(); 

     return ob.Subscribe(c => 
     { 
      if (char.IsUpper(c)) 
      { 

       if ((await sTimeouts.FirstAsync()).Ticks > 0) 
       { 
        letters.Add(c); 

        lettersFlush.Disposable = 
         VariableTimeout(sTimeouts) 
         .Subscribe(x => { 
          observer.OnNext(String.Concat(letters)); 
          letters.Clear(); 
         }); 

       } 
       else 
        observer.OnNext(letters.ToString()); 


      } 
      else if (char.IsDigit(c)) 
       observer.OnNext(c.ToString()); 
     } 

    } 
} 


private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts) 
{ 
    return Observable.Create<long>(obs => 
    { 
     var sd = new SerialDisposable(); 
     var first = DateTime.Now; 

     return timeouts 
      .Subscribe(timeout => 
      { 
       if (timeout.Ticks == 0 || first + timeout < DateTime.Now) 
       { 
        sd.Disposable = null; 
        obs.OnNext(timeout.Ticks); 
        obs.OnCompleted(); 
       } 
       else 
       { 
        timeout -= DateTime.Now - first; 

        sd.Disposable = 
         Observable 
         .Timer(timeout) 
         .Subscribe(t => { 
          obs.OnNext(t); 
          obs.OnCompleted(); 
         }); 
       } 
      }); 

    }); 
} 

private void ChangeTimeout(int timeout) 
{ 
    sTimeouts.OnNext(timeout.ms()) 
} 


// I use the following extension method 
public static class TickExtensions 
{ 
    public static TimeSpan ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms); 
    } 
} 

das Timeout zu ändern, kann ich einfach die privaten Timeout Variable ändern, aber wahrscheinlich ein Thema für sich wäre OK wenn nötig/besser.

UPDATE

var scheduler = new TestScheduler(); 

var timeout = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(0000.Ms(), 2000), 
    ReactiveTest.OnNext(4300.Ms(), 1000)); 

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(1600.Ms(), '2'), 
    ReactiveTest.OnNext(1900.Ms(), 'A'), 
    ReactiveTest.OnNext(2100.Ms(), 'B'), 
    ReactiveTest.OnNext(4500.Ms(), 'C'), 
    ReactiveTest.OnNext(5100.Ms(), 'A'), 
    ReactiveTest.OnNext(5500.Ms(), '5'), 
    ReactiveTest.OnNext(6000.Ms(), 'B'), 
    ReactiveTest.OnNext(7200.Ms(), '1'), 
    ReactiveTest.OnNext(7500.Ms(), 'B'), 
    ReactiveTest.OnNext(7700.Ms(), 'A'), 
    ReactiveTest.OnNext(8400.Ms(), 'A')); 

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"), 
    ReactiveTest.OnNext(1600.Ms(), "2"), 
    ReactiveTest.OnNext(4100.Ms(), "AB"), 
    ReactiveTest.OnNext(5500.Ms(), "5"), 
    ReactiveTest.OnNext(7000.Ms(), "CAB"), 
    ReactiveTest.OnNext(7200.Ms(), "1"), 
    ReactiveTest.OnNext(9400.Ms(), "BAA")); 


// if ReactiveTest.OnNext(3800.Ms(), 1000) 
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB") 

UPDATE # 2

Refined Lösung

Antwort

2

Mehrere Dinge, die hier helfen kann.

Erste Marmor-Diagramme sind schön für die Visualisierung des Problems, aber wenn Sie beweisen, ob etwas funktioniert oder nicht, lassen Sie uns vorschreibend und Unit-Test mit ITestableObservable<T> Instanzen.

Zweitens bin ich mir nicht sicher, was Ihre Lösung sein sollte. Wenn ich mir Ihre Marmordiagramme ansehe, sehe ich Diskrepanzen. Hier habe ich eine Zeitleiste zur Visualisierung hinzugefügt.

    111111111122222222223 
Time: 123456789
Input: 1---2--A-B----C--A-B-1--B-A--A 
Output: 1---2----AB-------CAB-1-----BAA 

Hier sehe ich die „AB“ Ausgang am Gerät veröffentlicht 10. Dann sehe ich die „CAB“ -Ausgabe am Gerät veröffentlicht 19. Weiter sehe ich die "BAA" Ausgabe veröffentlicht in der Einheit 29. Aber Sie schlagen vor, diese sollten bei konstanten Timeouts auseinander auftreten. Also dann denke ich, es ist vielleicht die Kluft zwischen den Werten, die wichtig ist, aber das scheint auch nicht zu summieren. Das führt mich nur zu meinem obigen Punkt zurück. Bitte geben Sie einen Komponententest an, der bestanden oder nicht bestanden werden kann.

Drittens könnten Sie es in Bezug auf Ihre Implementierung etwas besser machen, indem Sie den Typ SerialDisposable für den Typ lettersFlush verwenden.

mir den Unit-Test einrichten Damit ich den folgenden Codeblock erstellen

var scheduler = new TestScheduler(); 
var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'), 
    ReactiveTest.OnNext(0500.Ms(), '2'), 
    ReactiveTest.OnNext(0800.Ms(), 'A'), 
    ReactiveTest.OnNext(1000.Ms(), 'B'), 
    ReactiveTest.OnNext(1500.Ms(), 'C'), 
    ReactiveTest.OnNext(1800.Ms(), 'A'), 
    ReactiveTest.OnNext(2000.Ms(), 'B'), 
    ReactiveTest.OnNext(2200.Ms(), '1'), 
    ReactiveTest.OnNext(2500.Ms(), 'B'), 
    ReactiveTest.OnNext(2700.Ms(), 'A'), 
    ReactiveTest.OnNext(3000.Ms(), 'A')); 

var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"), 
    ReactiveTest.OnNext(0500.Ms(), "2"), 
    ReactiveTest.OnNext(1000.Ms(), "AB"), 
    ReactiveTest.OnNext(2000.Ms(), "CAB"), 
    ReactiveTest.OnNext(2200.Ms(), "1"), 
    ReactiveTest.OnNext(3000.Ms(), "BAA")); 

ich einige Freiheiten genommen habe, um einige Werte zu ändern, was ich denke, Sie durch Ihren Marmor Diagramme gemeint.

Wenn ich dann die sehr gute Antwort von @Shlomo nutze, sehe ich weitere Probleme mit der Verwendung von unscharfen Marmordiagrammen. Da die Puffergrenze auftreten müsste, nachdem der letzte einzubeziehende Wert auftritt, müssen diese Fenster auf einmal geschlossen werden.

void Main() 
{ 
    var scheduler = new TestScheduler(); 
    var input = scheduler.CreateColdObservable<char>(
     ReactiveTest.OnNext(0100.Ms(), '1'), 
     ReactiveTest.OnNext(0500.Ms(), '2'), 
     ReactiveTest.OnNext(0800.Ms(), 'A'), 
     ReactiveTest.OnNext(1000.Ms(), 'B'), 
     ReactiveTest.OnNext(1500.Ms(), 'C'), 
     ReactiveTest.OnNext(1800.Ms(), 'A'), 
     ReactiveTest.OnNext(2000.Ms(), 'B'), 
     ReactiveTest.OnNext(2200.Ms(), '1'), 
     ReactiveTest.OnNext(2500.Ms(), 'B'), 
     ReactiveTest.OnNext(2700.Ms(), 'A'), 
     ReactiveTest.OnNext(3000.Ms(), 'A')); 

    var expected = scheduler.CreateColdObservable<string>(
     ReactiveTest.OnNext(0100.Ms(), "1"), 
     ReactiveTest.OnNext(0500.Ms(), "2"), 
     ReactiveTest.OnNext(1000.Ms()+1, "AB"), 
     ReactiveTest.OnNext(2000.Ms()+1, "CAB"), 
     ReactiveTest.OnNext(2200.Ms(), "1"), 
     ReactiveTest.OnNext(3000.Ms()+1, "BAA")); 

    /* 
        111111111122222222223 
    Time: 123456789
    Input: 1---2--A-B----C--A-B-1--B-A--A 
    Output: 1---2----AB-------CAB-1-----BAA 
    */ 

    var bufferBoundaries = //Observable.Timer(TimeSpan.FromSeconds(1), scheduler); 
      //Move to a hot test sequence to force the windows to close just after the values are produced 
      scheduler.CreateHotObservable<Unit>(
     ReactiveTest.OnNext(1000.Ms()+1, Unit.Default), 
     ReactiveTest.OnNext(2000.Ms()+1, Unit.Default), 
     ReactiveTest.OnNext(3000.Ms()+1, Unit.Default), 
     ReactiveTest.OnNext(4000.Ms()+1, Unit.Default)); 

    var publishedFinal = input 
     .Publish(i => i 
      .Where(c => char.IsLetter(c)) 
      .Buffer(bufferBoundaries) 
      .Where(l => l.Any()) 
      .Select(lc => new string(lc.ToArray())) 
      .Merge(i 
       .Where(c => char.IsNumber(c)) 
       .Select(c => c.ToString()) 
      ) 
     ); 

    var observer = scheduler.CreateObserver<string>(); 

    publishedFinal.Subscribe(observer); 
    scheduler.Start(); 

    //This test passes with the "+1" values hacked in. 
    ReactiveAssert.AreElementsEqual(
     expected.Messages, 
     observer.Messages); 

} 

// Define other methods and classes here 
public static class TickExtensions 
{ 
    public static long Ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms).Ticks; 
    } 
} 

Ich nehme mein Punkt ist, dass Rx deterministisch ist, deshalb können wir Tests erstellen, die deterministisch sind. Während also Ihre Frage sehr gut ist und ich glaube, dass @Shlomo eine solide endgültige Antwort liefert, können wir besser als nur unscharfe Marmordiagramme und unter Verwendung von Random in unseren Beispielen/Tests. Genau hier zu sein, sollte dazu beitragen, dumme Rennbedingungen in der Produktion zu verhindern, und dem Leser helfen, diese Lösungen besser zu verstehen.

+0

Faire Kritik. Vielen Dank. Ich bin in der dunklen, alten Kunst des Hand-Waving ™ gut geschult und schlechte Gewohnheiten sterben schwer. – Shlomo

+0

Danke Lee, sehr nützliche Kommentare in Ihrer Antwort. Ich werde es so schnell wie möglich analysieren, in der Zwischenzeit habe ich meine Frage verbessert. – zpul

2

Angenommen sampleInput als Probeneingabe während Pufferung Timeout Änderung korrekt Stütz:

var charStream = "12ABCAB1BAA".ToObservable(); 
var random = new Random(); 
var randomMilliTimings = Enumerable.Range(0, 12) 
    .Select(i => random.Next(2000)) 
    .ToList(); 

var sampleInput = charStream 
    .Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts))) 
    .Select(t => Observable.Return(t.Item1).Delay(t.Item2)) 
    .Concat(); 

Zuerst stattdessen eine veränderbare Variable zu verändern, wäre es am besten, um stattdessen etwas Strom zu erzeugen, um Ihre Pufferfenster darstellen:

Input: 1---2--A-B----C--A-B-1--B-A--A 
Window: ---------*--------*---------*-- 
Output: 1---2----AB-------CAB-1-----BAA 

ich einen Strom von Inkrementieren TimeSpan s erzeugt und nannte es bufferBoundaries wie so zu demonstrieren, :

var bufferBoundaries = Observable.Range(1, 20) 
    .Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t))) 
    .Concat(); 

Dies würde wie folgt aussehen:

Seconds: 0--1--2--3--4--5--6--7--8--9--10 
BB  : ---1-----2--------3-----------4- 

... weiter wollen Sie aufteilen, dass sampleInput in einzelne Ströme für Buchstaben und Zahlen, und sie entsprechend behandeln:

var letters = sampleInput 
    .Where(c => char.IsLetter(c)) 
    .Buffer(bufferBoundaries) 
    .Where(l => l.Any()) 
    .Select(lc => new string(lc.ToArray())); 

var numbers = sampleInput 
    .Where(c => char.IsNumber(c)) 
    .Select(c => c.ToString()); 

Als nächstes fusionieren die beiden Ströme zusammen:

var finalOutput = letters.Merge(numbers); 

Schließlich ist es im Allgemeinen keine gute Idee, zweimal die gleiche Eingabe (in unserem Fall sampleInput) zu abonnieren, wenn Sie es helfen können. Also in unserem Fall sollten wir letters ersetzen, numbers und finalOutput mit den folgenden:

var publishedFinal = sampleInput 
    .Publish(_si => _si 
     .Where(c => char.IsLetter(c)) 
     .Buffer(bufferBoundaries) 
     .Where(l => l.Any()) 
     .Select(lc => new string(lc.ToArray())) 
     .Merge(_si 
      .Where(c => char.IsNumber(c)) 
      .Select(c => c.ToString()) 
     ) 
    ); 
+0

Danke für die interessante Antwort, ich muss es gründlich analysieren, um zu verstehen, ob es wirklich auf meinen realen Fall (komplexer) anwendbar ist. In der Zwischenzeit habe ich die Frage verbessert. – zpul