2016-06-22 8 views
0

Update: Unterschrift und Type DefinitionsRxNet Paginierung

GetChecklistStringParents

IObservable<ChecklistStringParents_Response> GetChecklistStringParents(int company_id, string auth_token, string q = null, int page = 1) 

Top-Level-Reaktion die eine Seite

public class ChecklistStringParents_Response 
{ 
    //the content of the page a list of string parents 
    public List<Resp_ChecklistStringParent> BODY { get; set; } 
    public Resp_Meta META { get; set; } 
    public List<object> ERRORS { get; set; } 
} 

Response-Klasse für einen einzelnen String pare nt

public class Resp_ChecklistStringParent 
{ 
    public int checklist_string_type { get; set; } 
    public string client_date_created { get; set; } 
    public string uploaded_date { get; set; } 
    public string last_modified { get; set; } 
    public string value { get; set; } 
    public int id { get; set; } 
} 

Ich versuche, reaktiv (RxNet) eine REST-API zugreifen ich gebaut habe, aber ich bin sehr neu auf dem Paradigma und bin ein bisschen fest.

Die folgende Funktion fordert Daten von einem Endpunkt Seite für Seite an.

Observable.DoWhile und Observable.Defer werden verwendet, so dass für jede Seite eine neue Observable erstellt wird, und wir erstellen sie so lange, bis wir eine leere Liste als Hauptteil der Seitenantwort erhalten.

Ich abonniere die Observable von Observable.DoWhile zurückgegeben, um die Seite zu aktualisieren und die Ergebnisanzahl zu aktualisieren. Das fühlt sich nicht richtig an, aber ich habe keine Alternative gesehen.

Meine Frage ist, ist dies der beste Weg, Paginate Ergebnisse in RxNet? Ich würde auch gerne einen Strom von Ergebnissen erhalten, d. H. Der Inhalt jeder Seite würde zu einer einzigen beobachtbaren Information abgeflacht, die ich von dieser Funktion zurückgeben könnte, aber ich habe keine Ahnung, wie ich das erreichen kann.

private void FetchStringParents(int checklist_id) 
    { 
     /*Setup the filter to the endpoint such that string parents returned are those associated with the passed checklist*/ 

     Filter base_filter = new Filter("checklist_id", "eq", checklist_id.ToString()); 
     NestedFilter nested_filter = new NestedFilter("checklists", "any", base_filter); 
     Filters filt = new Filters(); 
     filt.filters.Add(nested_filter); 
     string sp_query_json = JsonConvert.SerializeObject(filt); 
     int result_count = 0; 
     int page = 1; 
     var paged_observable = Observable.DoWhile(
      //At least once create an observable on the api endpoint 
      Observable.Defer(() => NetworkService_ChecklistStringParentService.GetInstance.GetChecklistStringParents(6, 
      this.cached_auth.Auth_Token, sp_query_json, page)), 
     /*boolean function which determines if we should hit the api again (if we got back a non empty result the last time)*/ 
     () => result_count > 0) 
     .Subscribe(
      st => 
      { 
       //on successful receipt of a page 
       Debug.WriteLine("Success"); 
       page++;//update page so that the next observable created is created on page++ 
       result_count = st.BODY.Count;//update result_count (we will stop listening on a count of 0) 
      }, 
       _e => 
       { 
        Debug.WriteLine("Fail"); 
       }); 
    } 

Update: Lösung

private IObservable<Resp_ChecklistStringParent> StreamParents(int checklist_id) 
    { 
     Filter base_filter = new Filter("checklist_id", "eq", checklist_id.ToString()); 
     NestedFilter nested_filter = new NestedFilter("checklists", "any", base_filter); 
     Filters filt = new Filters(); 
     filt.filters.Add(nested_filter); 
     string sp_query_json = JsonConvert.SerializeObject(filt); 
     return Observable.Create<List<Resp_ChecklistStringParent>>(async (obs, ct) => 
     { 
      int pageIdx = 1; 
      //for testing page size is set to 1 on server 
      int pageSize = 1; 
      while (!ct.IsCancellationRequested) 
      { 
       //Pass in cancellation token here? 
       var page = await NetworkService_ChecklistStringParentService.GetInstance.GetChecklistStringParents(6, 
       this.cached_auth.Auth_Token, sp_query_json, pageIdx++); 
       obs.OnNext(page.BODY); 
       if (page.BODY.Count < pageSize) 
       { 
        obs.OnCompleted(); 
        break; 
       } 
      } 
     }) 
     .SelectMany(page => page); 
    } 
+0

Ihr Code verwendet ein Mischmasch von Staat und Rx, die eher merkwürdig erscheint. Es sollte einen sehr sauberen Rx Weg geben, dies zu tun, aber es scheint wirklich so, als müssten wir die Signatur von 'GetChecklistStringParents' kennen, um helfen zu können. – Enigmativity

+0

Und alle zugehörigen Typdefinitionen. :-) – Enigmativity

+0

@Enigmativity Hallo, vielen Dank für einen Blick. Ich habe diese Details zu der Frage hinzugefügt. –

Antwort

2

Hier ist ein Beispiel, das zeigt, wie Sie Datenstrom könnte sowohl mit IEnumerable<T> und IObservable<T>. Das IEnumerable<T> Beispiel dient zum Kontext und Vergleich.

Ich verwende nicht Ihren Datentyp, da ich denke, dass es keine Relevanz für die Frage hat (in der Hoffnung, dass das in Ordnung ist).

LINQPad Skript

void Main() 
{ 
    var enumerableItems = GetItemsSync(); 
    foreach (var element in enumerableItems) 
    { 
     Console.WriteLine(element); 
    } 
    Console.WriteLine("Received all synchronous items"); 
    StreamItems().Subscribe(
     element => Console.WriteLine(element), 
     () => Console.WriteLine("Received all asynchronous items")); 
} 

// Define other methods and classes here 

public IObservable<string> StreamItems() 
{ 
    return Observable.Create<string[]>(async (obs, ct) => 
    { 
     var pageIdx = 0; 
     var pageSize = 3; 
     while (!ct.IsCancellationRequested) 
     { 
      //Pass in cancellation token here? 
      var page = await GetPageAsync(pageIdx++, pageSize); 
      obs.OnNext(page); 
      if (page.Length < pageSize) 
      { 
       obs.OnCompleted(); 
       break; 
      } 
     } 
    }) 
    .SelectMany(page => page); 
} 

public IEnumerable<string> GetItemsSync() 
{ 
    return GetPagesSync().SelectMany(page => page); 
} 
public IEnumerable<string[]> GetPagesSync() 
{ 
    var i = 0; 
    var pageSize = 3; 
    while (true) 
    { 
     var page = GetPageSync(i++, pageSize); 
     yield return page; 
     if (page.Length < pageSize) 
      yield break; 
    } 
} 
private static string[] _fakeData = new string[]{ 
    "one", 
    "two", 
    "three", 
    "four", 
    "Five", 
    "six", 
    "Se7en", 
    "Eight" 
}; 
public string[] GetPageSync(int pageIndex, int pageSize) 
{ 
    var idx = pageSize * pageIndex; 
    var bufferSize = Math.Min(pageSize, _fakeData.Length-idx); 
    var buffer = new string[bufferSize]; 
    Array.Copy(_fakeData, idx, buffer, 0, bufferSize); 
    return buffer; 
} 
public Task<string[]> GetPageAsync(int pageIndex, int pageSize) 
{ 
    //Just to emulate an Async method (like a web request). 
    return Task.FromResult(GetPageSync(pageIndex, pageSize)); 
} 
+0

Danke Lee, habe das für meine eigenen ruchlosen Zwecke angepasst. Danke auch für http://www.introtorx.com/, ohne die mein Fummeln noch amüsanter wäre. –