2014-07-03 7 views
7

Ich versuche, Reactive Extensions (Rx) für eine Aufgabe zu verwenden, wo es gut zu passen scheint, in einem bestimmten Intervall einen Webdienst abfragt und seinen letzten anzeigt x ErgebnisseWebservice mit Reactive Extensions abfragen und die letzten x Ergebnisse binden

Ich habe einen Webservice, der mir den Status eines Instruments, das ich überwachen möchte, sendet. Ich möchte dieses Instrument mit einer bestimmten Rate abfragen und in einer Liste die letzten 20 Status anzeigen, die abgefragt wurden.

Also meine Liste wäre wie ein "bewegliches Fenster" des Service-Ergebnisses.

Ich entwickle eine WPF-App mit Caliburn.Micro, aber ich denke nicht, dass dies sehr relevant ist.

Was konnte ich bis jetzt bekommen, ist die folgende (nur eine Beispielanwendung, die ich schnell gehackt, ich werde tun dies nicht in der ShellViewModel in der realen app):

public class ShellViewModel : Caliburn.Micro.PropertyChangedBase, IShell 
{ 
    private ObservableCollection<string> times; 
    private string currentTime; 

    public ShellViewModel() 
    { 
     times = new ObservableCollection<string>(); 

     Observable 
      .Interval(TimeSpan.FromSeconds(1)) 
      .SelectMany(x => this.GetCurrentDate().ToObservable()) 
      .ObserveOnDispatcher() 
      .Subscribe(x => 
      { 
       this.CurrentTime = x; 
       this.times.Add(x); 
      }); 
    } 

    public IEnumerable<string> Times 
    { 
     get 
     { 
      return this.times; 
     } 
    } 

    public string CurrentTime 
    { 
     get 
     { 
      return this.currentTime; 
     } 
     set 
     { 
      this.currentTime = value; 
      this.NotifyOfPropertyChange(() => this.CurrentTime); 
     } 
    } 

    private async Task<string> GetCurrentDate() 
    { 
     var client = new RestClient("http://www.timeapi.org"); 
     var request = new RestRequest("/utc/now.json"); 

     var response = await client.ExecuteGetTaskAsync(request); 

     return response.Content; 
    } 
} 

In der Ansicht Ich habe nur ein Label an die CurrentTime Eigenschaft gebunden und eine Liste an die Times Eigenschaft gebunden.

Das Problem, das ich habe, ist:

  • Es bin nicht auf die 20 Elemente in der Liste beschränkt, wie ich auf die ObservableCollection immer Elemente hinzufügen, aber ich kann einen besseren Weg nicht finden
  • das Intervalls auf Databind funktioniert nicht so, wie ich es möchte. Wenn die Abfrage länger als 1 Sekunde dauert, werden zwei Abfragen parallel ausgeführt, die ich nicht passieren möchte. Mein Ziel wäre, dass die Abfrage unbegrenzt wiederholt wird, aber mit einer Geschwindigkeit von nicht mehr als 1 Abfrage pro Sekunde. Wenn eine Abfrage mehr als 1 Sekunde zum Beenden benötigt, sollte sie auf das Ende warten und die neue Abfrage direkt auslösen.

Zweite edit:

Zurück bearbeiten unten wurde mir dumm und sehr verwirrt zu sein, löst es Ereignisse kontinuierlich weil Intervall ist etwas, kontinuierlich die niemals endet. Brandons Lösung ist korrekt und funktioniert wie erwartet.

Edit:

auf Brandons Beispiel Basierend habe ich versucht, den folgenden Code in LinqPad zu tun:

Observable 
    .Merge(Observable.Interval(TimeSpan.FromSeconds(2)), Observable.Interval(TimeSpan.FromSeconds(10))) 
    .Repeat() 
    .Scan(new List<double>(), (list, item) => { list.Add(item); return list; }) 
    .Subscribe(x => Console.Out.WriteLine(x)) 

Und ich kann sehen, dass der Schreibvorgang auf die Konsole alle 2 Sekunden, und nicht jeder kommt 10. Die Wiederholung wartet also nicht darauf, dass beide Observable beendet werden, bevor sie wiederholt werden.

Antwort

6

Versuchen Sie folgendes:

// timer that completes after 1 second 
var intervalTimer = Observable 
    .Empty<string>() 
    .Delay(TimeSpan.FromSeconds(1)); 

// queries one time whenever subscribed 
var query = Observable.FromAsync(GetCurrentDate); 

// query + interval timer which completes 
// only after both the query and the timer 
// have expired 

var intervalQuery = Observable.Merge(query, intervalTimer); 

// Re-issue the query whenever intervalQuery completes 
var queryLoop = intervalQuery.Repeat(); 

// Keep the 20 most recent results 
// Note. Use an immutable list for this 
// https://www.nuget.org/packages/microsoft.bcl.immutable 
// otherwise you will have problems with 
// the list changing while an observer 
// is still observing it. 
var recentResults = queryLoop.Scan(
    ImmutableList.Create<string>(), // starts off empty 
    (acc, item) => 
    { 
     acc = acc.Add(item); 
     if (acc.Count > 20) 
     { 
      acc = acc.RemoveAt(0); 
     } 

     return acc; 
    }); 

// store the results 
recentResults 
    .ObserveOnDispatcher() 
    .Subscribe(items => 
    { 
     this.CurrentTime = items[0]; 
     this.RecentItems = items; 
    }); 
+0

Sehr interessante Antwort, danke. Ich sehe also, dass es keine andere Möglichkeit gibt, die Liste der von der Abfrage zurückgegebenen Elemente abzurufen, als die Abfrage zu durchsuchen und eine Liste zu erstellen. Gibt es einen Grund, warum Sie [TakeLast] (http://msdn.microsoft.com/en-us/library/hh212114 (v = vs.103) .aspx) nicht verwendet haben und den Scan für die Überprüfung durchgeführt haben die 20 Gegenstände? Was auch, wenn ich in der Lage sein möchte, die 1s Verzögerung zwischen den Abfragen dynamisch zu ändern? – Gimly

+0

'TakeLast' nimmt die ** letzten ** N Items aus dem Stream, nicht die ** neuesten ** N Items. Mit anderen Worten, es wird überhaupt nichts produzieren, bis der Stream * abgeschlossen ist, dann wird es die letzten N Items ergeben, die es produziert hat. Nicht sehr nützlich hier. Wenn Sie die Definition von "intervalTimer" mit "Observable.Defer" umhüllen und auch eine Variable lesen oder eine Funktion für die Zeitspanne aufrufen, wird 'Defer' den Timer bei jeder Iteration neu aufbauen, was Ihnen die Möglichkeit gibt, die Verzögerung zu ändern Zeitraum. – Brandon

+0

Ah, deshalb waren meine Tests mit Take und TakeLast nicht erfolgreich. Danke nochmal für deine Hilfe. – Gimly

0

Dies sollte die Intervallnachrichten überspringen, während ein GetCurrentDate in Bearbeitung ist.

Observable 
    .Interval(TimeSpan.FromSeconds(1)) 
    .GroupByUntil(p => 1,p => GetCurrentDate().ToObservable().Do(x => { 
          this.CurrentTime = x; 
          this.times.Add(x); 
         })) 
    .SelectMany(p => p.LastAsync()) 
    .Subscribe();