2016-03-31 8 views
1

Ich habe eine Funktion zum Ersetzen einer einzelnen Entität in einem Repository (z. B. einer Datenbank) durch eine andere Entität. Dies gibt eine Aufgabe zurück, deren Ergebnis anzeigt, ob eine Ersetzung erfolgreich war.Convert IDictionary <T, Aufgabe <bool>> zu IObservable <KeyValuePair <T, bool>>

Jetzt sagen, ich möchte die gleiche Funktion erstellen, aber mehrere Elemente zu ersetzen. Meine Idee für eine Funktion Unterschrift war:

// Multiple Replace 
IObservable<KeyValuePair<Entity, bool>> Replace(IDictionary<Entity, Entity> Map); 

Karte enthält ein Wörterbuch, dessen Schlüssel die Einheiten, die ersetzt werden soll, und seine Werte sind die neuen Einheiten, die die alten ersetzen sollen.

Jedes KeyValuePair dieser Karte Wörterbuch sollte ein KeyValuePair < Entity zurückkehren, bool>, wobei die Taste entspricht Map.Key (= die Entitiy, die ersetzt werden sollen) und Wert ein Bool ist, ob der Ersatz erfolgreich war anzeigt oder nicht.

Ich mag würde dies als Stream zurück, daher wählte ich IObservable < KeyValuePair < Entity, bool>>, ein KeyValuePair < Entity Ausschieben, bool> für jedes Ergebnis, das zur Verfügung steht.

Für jetzt möchte ich nur die Funktion "Mehrere ersetzen" nutzen die "Single Replace" -Funktion zu konvertieren; Aber wie kann ich diese Anrufe machen und das Ergebnis im gewünschten Format zurückgeben?

Task<bool> Replace(Entity e, Entity Other) 
{ 
    // ... do the work ... 
} 

IObservable<KeyValuePair<Entity, bool>> Replace(IDictionary<Entity, Entity> Map) 
{ 
    // How this should work: 
    // for each KeyValuePair in Map, call the Single Replace function 
    // and - once available - return its result (bool) mapped to the Entity in the 
    // resulting Observable stream 

    IDictionary<Entity, Task<bool>> dict = Map.ToDictionary(x => x.Key, x => Replace(x.Key, x.Value)); 

    // .. now what? How can I now convert the dict into an IObservable<KeyValuePair<Entity,bool>> ? 

} 

Würde mich über Hinweise freuen!

Antwort

1

basierend auf Peter-Lösung, eine kleine Änderung, die Aufgabe nutzt.ToObservable:

IObservable<Tuple<Entity, bool>> Replace(IDictionary<Entity, Entity> Map) 
    { 
     Task<Tuple<Entity, bool>>[] tasks = new Task<Tuple<Entity, bool>>(Map.Count); 
     int i = 0; 

     foreach (var kvp in Map) 
     { 
      tasks[i++] = ReplaceAsync(kvp.Key, kvp.Value); 
     } 

     return tasks 
      .Select(x => x.ToObservable()) 
      .Merge(); // or use .Concat for sequential execution 
    } 

Scheint gut zu funktionieren, erkennt jemand mögliche Probleme?

Mithilfe von .Merge wird ein IObservable-Objekt bereitgestellt, das die Tasks parallel startet und nach Abschluss der Tasks alle Ergebnisse liefert.

Alternativ kann ich .Concat(), um sie nacheinander auszuführen, das heißt die erste Aufgabe zum ersten Mal starten, warten, bis es dann zu beenden, die zweite Aufgabe starten, usw.

Schöne daran ist, dass die Fehlerbehandlung kommt für Ich glaube, wenn eine der Aufgaben eine Exception auslöst, wird OnError des IObservable automatisch aufgerufen.

+1

Ich bin nicht so vertraut mit den Reactive-Erweiterungen - in der Tat war mir nicht einmal die 'ToObservable()' und 'Merge()' Methoden bekannt, die das Szenario ziemlich viel zu vereinfachen scheinen. Meine einzige Sorge könnte sein, dass das Obige eine Sequenz von Observablen erzeugt und nicht auf einer Sequenz beobachtbar ist; letzteres könnte _leicht_ effizienter sein. Aber IMHO, Code-Klarheit ist viel wichtiger, so dass Sie ein echtes und signifikantes Leistungsproblem beweisen müssen, bevor Sie sich darüber Gedanken machen. –

+1

Auf jeden Fall verzichtet mein Beispiel auf Feinheiten wie die Fehlerbehandlung, die die Motivation, diese anstelle der eingebauten Operatoren zu nutzen, nur weiter untergräbt. Ich bin froh, dass der Hauptpunkt meiner Antwort Sie dahin gebracht hat, wohin Sie gehen mussten. :) –

1

Ohne eine gute Minimal, Complete, and Verifiable example zu sehen, die deutlich zeigt, was Sie tun, und vor allem, wie Sie das IObservable<T> Objekt aus Ihrer Methode implementieren möchten, ist es unmöglich, spezifische Beratung anzubieten. Ich kann jedoch einige Vorschläge machen, die Sie zumindest in eine nützliche Richtung weisen sollten.

Zuerst werde ich vorschlagen, dass es sinnvoller ist, ein Task<Tuple<Entity, bool>> zu haben (oder Task<KeyValuePair<Entity, bool>>, obwohl ich die Verwendung Wörterbuch bezogenen Typen für nicht-Wörterbuch abhängigen Code & hellip nicht sehr gern bin, also dort, wo Sie wirklich nur die Paarung Dinge, anstatt eine echte Schlüssel/Wert-Paarung zu haben). Auf diese Weise wissen Sie, wenn die Aufgabe abgeschlossen ist, von der Aufgabe selbst alle Informationen, die Sie benötigen.

Dies kann mit einer einfachen Hilfsmethode durchgeführt werden, zum Beispiel:

async Task<Tuple<Entity, bool>> ReplaceAsync(Entity e, Entity other) 
{ 
    return Tuple.Create(e, await Replace(e, other)); 
} 

ich Tuple hier verwendet, aber natürlich können Sie einen benannten Typen zum Zweck erstellen, und so konnte in der Tat hilft Lesbarkeit machen von der Code.

Nachdem das getan hat, können Sie aufzählen nur alle Elemente in Ihrem Eingang Wörterbuch (oder andere, geeignetere Sammlung :)) und Task.WhenAny() dann jedes abgeschlossenes Element iterativ zu entfernen und veröffentlichen verwenden, um, wie sie auftreten:

IObservable<Tuple<Entity, bool>> Replace(IDictionary<Entity, Entity> Map) 
{ 
    Task<Tuple<Entity, bool>>[] tasks = new Task<Tuple<Entity, bool>>(Map.Count); 
    int i = 0; 

    foreach (var kvp in Map) 
    { 
     tasks[i++] = ReplaceAsync(kvp.Key, kvp.Value); 
    } 

    // Create the IObservable object somehow. Make sure it has the 
    // array of tasks in it. E.g. (see below for TaskObservable example) 

    TaskObservable<Tuple<Entity, bool>> observable = 
     new TaskObservable<Tuple<Entity, bool>>(tasks); 

    // Now, start running the observable object. Note: not really all that great 
    // to ignore the returned Task object but without more context in your question 
    // I can't offer anything more specific. You will probably want to store the 
    // Task object *somewhere*, await it, wrap all that in try/catch, etc. to 
    // make sure you are correctly monitoring the progress of the task. 
    var _ = observable.Run(); 

    return observable; 
} 

Die IObservable<T> könnte etwa so aussehen:

class TaskObservable<T> : IObservable<T> 
{ 
    private class ObserverItem : IDisposable 
    { 
     public readonly IObserver<T> Observer; 
     public readonly TaskObservable<T> Owner; 

     public ObserverItem(IObserver<T> observer, TaskObservable<T> owner) 
     { 
      Observer = observer; 
      Owner = owner; 
     } 

     public void Dispose() 
     { 
      if (!Owner._observerItems.Contains(this)) 
      { 
       throw new InvalidOperationException(
        "This observer is no longer subscribed"); 
      } 

      Owner._observerItems.Remove(this); 
     } 
    } 

    private readonly Task<T>[] _tasks; 
    private readonly List<ObserverItem> _observerItems = new List<ObserverItem>(); 

    public TaskObservable(Task<T>[] tasks) 
    { 
     _tasks = tasks; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     ObserverItem item = new ObserverItem(observer, this); 

     _observerItems.Add(item); 
     return item; 
    } 

    private void Publish(T t) 
    { 
     foreach (ObserverItem item in _observerItems) 
     { 
      item.Observer.OnNext(t); 
     } 
    } 

    private void Complete() 
    { 
     foreach (ObserverItem item in _observerItems) 
     { 
      item.Observer.OnCompleted(); 
     } 
    } 

    async Task Run() 
    { 
     for (int i = 0; i < _tasks.Length; i++) 
     { 
      Task<T> completedTask = await Task.WhenAny(tasks); 

      Publish(completedTask.Result); 
     } 

     Complete(); 
    } 
} 
+0

Vielen Dank für Ihre gut durchdachte Antwort Peter, diese sehr hilfreich und interessant zu lesen - ich hat einiges von Ihrer TaskObservable-Klasse gelernt. Und ich denke, die Transformation zu Task > war in der Tat ein wichtiger Punkt - ich denke, auf diese Weise kann ich sogar eine Klasse wie TaskObservable vermeiden und gleichzeitig eine bequeme Fehlerbehandlung durchführen. Schreibe unten, lass mich wissen, was du denkst. – Bogey

1

Ich bin nicht sicher, ob ich etwas verpasst haben, aber ich denke, das ist für Sie arbeiten könnte:

public IObservable<KeyValuePair<T, bool>> Convert<T>(IDictionary<T, Task<bool>> source) 
{ 
    return 
     Observable 
      .Create<KeyValuePair<T, bool>>(o => 
       (
        from s in source.ToObservable() 
        from v in s.Value.ToObservable() 
        select new KeyValuePair<T, bool>(s.Key, v) 
       ).Subscribe(o)); 
}