2016-07-26 24 views
3

Ich habe eine Reihe von beobachtbaren Ereignisströmen, die alle Ereignisse mit Zeitstempeln bereitstellen. Ich interessiere mich nicht für die Ereignisse einzeln, ich muss wissen, wann sie alle innerhalb eines bestimmten Zeitrahmens gefeuert haben.Rx, wie Observables zusammengeführt werden und nur feuern, wenn alle in einem bestimmten Zeitrahmen zur Verfügung gestellt werden

Zum Beispiel: Knopf ein angeklickt wurde (do not care) Knopf zwei geklickt wurde (do not care) Knopf eine geklickt wurde und innerhalb von 5 Sekunden Taste zwei geklickt wurde (Ich brauche diese)

Ich habe versucht "und dann wenn", aber ich bekomme alte Ereignisse und kann nicht herausfinden, wie man sie ausfiltert, wenn es nicht innerhalb des Zeitfensters ist.

Danke!

Edit: ich versuchte, ein Marmor-Diagramm zu erstellen, um zu klären, was ich ...

ich eine Reihe von zufälligen Ereignisströmen habe zu erreichen versuchen, im oberen Teil dargestellt. Manche Ereignisse feuern öfter als andere. Ich möchte nur die Gruppe von Ereignissen erfassen, die innerhalb eines bestimmten Zeitfensters ausgelöst wurde. In diesem Beispiel habe ich Fenster von 3 Sekunden verwendet. Die Ereignisse, die ich möchte, sind dunkelschwarz hervorgehoben, alle anderen Ereignisse sollten ignoriert werden. Ich hoffe, dies hilft, das Problem besser zu erklären.

enter image description here

+0

Können Sie ein Marmordiagramm veröffentlichen, das zeigt, was Sie erwarten? – Shlomo

+0

@Shlomo, habe ich die Beschreibung aktualisiert. Danke – Jras

+0

Also sollte das letzte Event in jedem Fenster weitergegeben werden, während die anderen im Fenster ignoriert werden? – Shlomo

Antwort

0

Ist Sequenz wichtig? Wenn dies der Fall ist, wird "flatMapLatest" normalerweise mit "this and then that" behandelt. Sie können Ihre anderen Einschränkungen erreichen, indem Sie sie auf den an flatMapLatest übergebenen Stream anwenden. Betrachten Sie das folgende Beispiel:.

const fromClick = x => Rx.Observable.fromEvent(document.getElementById(x), 'click'); 

const a$ = fromClick('button1'); 
const b$ = fromClick('button2'); 

const sub = a$.flatMapLatest(() => b$.first().timeout(5000, Rx.Observable.never())) 
       .subscribe(() => console.log('condition met')); 

Hier sagen wir „für jeden Klick auf die Taste 1, starten Sie auf Taste hören 2 und kehren entweder den ersten Klick oder nichts (wenn wir das Timeout Hit) Ist hier ein Arbeits Beispiel:. https://jsbin.com/zosipociti/edit?js,console,output

+0

Dank Matt, Sequenz ist nicht wichtig, in der Tat muss ich in der Lage sein zu erkennen, dass die Ereignisse in beliebiger Reihenfolge passieren. Ich habe möglicherweise auch wesentlich mehr Ereignisströme beteiligt. – Jras

0

Sie die Sample operator verwenden möchten ich bin nicht sicher, ob Sie .NET oder JS Beispielcode möchten, da Sie beide getaggt


Edit:.

Hier ist ein .NET-Beispiel. metaSample ist eine beobachtbare von 10 Kindobservablen. Jede der Kind-Observablen hat Zahlen von 1 bis 99 mit zufälligen Zeitlücken zwischen jeder Zahl. Die Zeitlücken liegen zwischen 0 und 200 Millisekunden.

var random = new Random(); 

IObservable<IObservable<int>> metaSample = Observable.Generate(1, i => i < 10, i => i + 1, i => 
    Observable.Generate(1, j => j < 100, j => j + 1, j => j, j => TimeSpan.FromMilliseconds(random.Next(200)))); 

Wir probieren dann jeden der Kindbediener jede Sekunde. Dies gibt uns den letzten Wert, der in diesem zweiten Fenster aufgetreten ist. Wir verschmelzen dann diese zusammen abgetastete Ströme:

IObservable<int> joined = metaSample 
    .Select(o => o.Sample(TimeSpan.FromSeconds(1))) 
    .Merge(); 

Ein Marmor-Diagramm für 5 von ihnen könnte wie folgt aussehen:

child1: --1----2--3-4---5----6 
child2: -1-23---4--5----6-7--8 
child3: --1----2----3-4-5--6-- 
child4: ----1-2--34---567--8-9 
child5: 1----2--3-4-5--------6- 
t  : ------1------2------3- 
------------------------------ 
result: ------13122--45345--5768-- 

So ist die nach 1 Sekunde, die neuesten von jedem Kind packt und gibt sie , nach 2 Sekunden gleich. Beachten Sie nach 3 Sekunden, dass child5 nichts ausgesendet hat. Daher werden nur 4 Nummern ausgegeben. Offensichtlich mit unseren Parametern ist das unmöglich, aber das wird demonstriert, wie Sample ohne Ereignisse funktionieren würde.

+0

Ich fand diese großartige Diskussion über diese Konzepte in Rx. https://channel9.msdn.com/Shows/Going+Deep/Programming-Stream-of-Coincidence-with-Join-and-GroupJoin-for-Rx Ich habe es beobachtet und das Beispiel, das sie diskutieren (wer ist im Zimmer zur gleichen Zeit) ist sehr ähnlich ... Außer ich möchte nur eine Veranstaltung, wenn alle im Raum sind. In dem von mir bereitgestellten Beispiel möchte ich ein Ereignis, wenn alle Observablen im angegebenen Fenster ausgelöst wurden. Vielleicht ist es nicht möglich und ich muss jede Veranstaltung auf die Einbeziehung aller Ereignisse überprüfen. Sie diskutieren dies, es scheint, dass eine Kombination aus Fenster- oder Gruppenverbindung funktionieren könnte. – Jras

+0

Ich denke ich verstehe jetzt. Für das obige Marmordiagramm sind die Ereignisse bei t1 und t2 also gut, aber bei t3 sollte nichts emittiert werden. – Shlomo

+0

ignorieren Sie diesen Kommentar – Jras

0

Dies ist die nächste, die ich habe, um diese Aufgabe zu erfüllen ... Es muss einen saubereren Weg mit Gruppenverbindung geben, aber ich kann es nicht herausfinden!

static void Main(string[] args) 
    { 
     var random = new Random(); 
     var o1 = Observable.Interval(TimeSpan.FromSeconds(2)).Select(t => "A " + DateTime.Now.ToString("HH:mm:ss")); 
     o1.Subscribe(Console.WriteLine); 

     var o2 = Observable.Interval(TimeSpan.FromSeconds(3)).Select(t => "B " + DateTime.Now.ToString("HH:mm:ss")); 
     o2.Subscribe(Console.WriteLine); 

     var o3 = Observable.Interval(TimeSpan.FromSeconds(random.Next(3, 7))).Select(t => "C " + DateTime.Now.ToString("HH:mm:ss")); 
     o3.Subscribe(Console.WriteLine); 

     var o4 = Observable.Interval(TimeSpan.FromSeconds(random.Next(5, 10))).Select(t => "D " + DateTime.Now.ToString("HH:mm:ss")); 
     o4.Subscribe(Console.WriteLine); 

     var joined = o1 
      .CombineLatest(o2, (s1, s2)=> new { e1 = s1, e2 = s2}) 
      .CombineLatest(o3, (s1, s2) => new { e1 = s1.e1, e2 = s1.e2, e3 = s2 }) 
      .CombineLatest(o4, (s1, s2) => new { e1 = s1.e1, e2 = s1.e2, e3 = s1.e3, e4 = s2 }) 
      .Sample(TimeSpan.FromSeconds(3)); 
     joined.Subscribe(e => Console.WriteLine($"{DateTime.Now}: {e.e1} - {e.e2} - {e.e3} - {e.e4}")); 
     Console.ReadLine(); 
    }