Ich habe einen Strom mit Buchstaben (A-Z) und Zahlen (1-9). Ich möchte Briefe, die innerhalb eines Timeouts eingehen (dies kann sich ändern), zusammenfügen und immer sofort Nummern ausgeben. Können Sie mir vorschlagen, welche Funktionen am besten geeignet sind?Puffer ausgewählte Nachrichten mit variabler Zeitüberschreitung
Beispielarbeitscode (nicht sicher, ob dies richtig ist und/oder eine gute Lösung):
private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms());
private IObservable<string> lettersJoined(IObservable<char> ob)
{
return Observable.Create<string>(observer =>
{
var letters = new List<char>();
var lettersFlush = new SerialDisposable();
return ob.Subscribe(c =>
{
if (char.IsUpper(c))
{
if ((await sTimeouts.FirstAsync()).Ticks > 0)
{
letters.Add(c);
lettersFlush.Disposable =
VariableTimeout(sTimeouts)
.Subscribe(x => {
observer.OnNext(String.Concat(letters));
letters.Clear();
});
}
else
observer.OnNext(letters.ToString());
}
else if (char.IsDigit(c))
observer.OnNext(c.ToString());
}
}
}
private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts)
{
return Observable.Create<long>(obs =>
{
var sd = new SerialDisposable();
var first = DateTime.Now;
return timeouts
.Subscribe(timeout =>
{
if (timeout.Ticks == 0 || first + timeout < DateTime.Now)
{
sd.Disposable = null;
obs.OnNext(timeout.Ticks);
obs.OnCompleted();
}
else
{
timeout -= DateTime.Now - first;
sd.Disposable =
Observable
.Timer(timeout)
.Subscribe(t => {
obs.OnNext(t);
obs.OnCompleted();
});
}
});
});
}
private void ChangeTimeout(int timeout)
{
sTimeouts.OnNext(timeout.ms())
}
// I use the following extension method
public static class TickExtensions
{
public static TimeSpan ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms);
}
}
das Timeout zu ändern, kann ich einfach die privaten Timeout Variable ändern, aber wahrscheinlich ein Thema für sich wäre OK wenn nötig/besser.
UPDATE
var scheduler = new TestScheduler();
var timeout = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(0000.Ms(), 2000),
ReactiveTest.OnNext(4300.Ms(), 1000));
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(1600.Ms(), '2'),
ReactiveTest.OnNext(1900.Ms(), 'A'),
ReactiveTest.OnNext(2100.Ms(), 'B'),
ReactiveTest.OnNext(4500.Ms(), 'C'),
ReactiveTest.OnNext(5100.Ms(), 'A'),
ReactiveTest.OnNext(5500.Ms(), '5'),
ReactiveTest.OnNext(6000.Ms(), 'B'),
ReactiveTest.OnNext(7200.Ms(), '1'),
ReactiveTest.OnNext(7500.Ms(), 'B'),
ReactiveTest.OnNext(7700.Ms(), 'A'),
ReactiveTest.OnNext(8400.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0100.Ms(), "1"),
ReactiveTest.OnNext(1600.Ms(), "2"),
ReactiveTest.OnNext(4100.Ms(), "AB"),
ReactiveTest.OnNext(5500.Ms(), "5"),
ReactiveTest.OnNext(7000.Ms(), "CAB"),
ReactiveTest.OnNext(7200.Ms(), "1"),
ReactiveTest.OnNext(9400.Ms(), "BAA"));
// if ReactiveTest.OnNext(3800.Ms(), 1000)
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB")
UPDATE # 2
Refined Lösung
Faire Kritik. Vielen Dank. Ich bin in der dunklen, alten Kunst des Hand-Waving ™ gut geschult und schlechte Gewohnheiten sterben schwer. – Shlomo
Danke Lee, sehr nützliche Kommentare in Ihrer Antwort. Ich werde es so schnell wie möglich analysieren, in der Zwischenzeit habe ich meine Frage verbessert. – zpul