2014-10-28 10 views
13

Ich habe Code wie folgt aus:Wie LINQ 'let' Anweisungen parallel ausgeführt werden?

var list = new List<int> {1, 2, 3, 4, 5}; 

var result = from x in list.AsParallel() 
      let a = LongRunningCalc1(x) 
      let b = LongRunningCalc2(x) 
      select new {a, b}; 

Lassen Sie uns sagen, dass die LongRunningCalc Methoden je 1 Sekunde. Der obige Code benötigt etwa 2 Sekunden, um ausgeführt zu werden, da die Liste der 5 Elemente parallel bearbeitet wird, werden die beiden Methoden, die aus den Anweisungen let aufgerufen werden, sequenziell aufgerufen.

Diese Methoden können jedoch auch parallel sicher aufgerufen werden. Sie müssen natürlich wieder für die select zusammenführen, aber bis dahin sollte parallel laufen - die select sollte auf sie warten.

Gibt es einen Weg, dies zu erreichen?

Antwort

7

Sie werden nicht in der Lage sein, um die Abfragesyntax oder die let Operation zu verwenden, aber Sie können eine Methode schreiben, um mehrere Operationen für jedes Element parallel durchführen:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
    this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<TResult1, TResult2, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     return resultAggregator(result1.Result, result2.Result); 
    }); 
} 

Diese schreiben Sie erlauben würde:

var query = list.AsParallel() 
    .SelectAll(LongRunningCalc1, 
     LongRunningCalc2, 
     (a, b) => new {a, b}) 

Sie Überlastungen für zusätzliche parallele Operationen als auch hinzufügen:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TResult3, TFinal> 
    (this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<T, TResult3> selector3, 
    Func<TResult1, TResult2, TResult3, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     var result3 = Task.Run(() => selector3(item)); 
     return resultAggregator(
      result1.Result, 
      result2.Result, 
      result3.Result); 
    }); 
} 

Es ist möglich, eine Version zu behandeln eine Reihe von Selektoren nicht bei der Kompilierung bekannt zu schreiben, aber zu tun, dass sie alle einen Wert des gleichen Typs berechnen müssen:

public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    IEnumerable<Func<T, TResult>> selectors) 
{ 
    return query.Select(item => selectors.AsParallel() 
      .Select(selector => selector(item)) 
      .AsEnumerable()); 
} 
public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    params Func<T, TResult>[] selectors) 
{ 
    return SelectAll(query, selectors); 
} 
+0

Dies ist nicht mein Expertenbereich, aber müssen Sie nicht auf die Ergebnisse der Aufgaben warten? – Magnus

+0

@Magnus bin ich schon. – Servy

+0

Yeh Calling '.Result' wartet auf die Ausführung der Task. Ich habe meinen Beispielcode ein wenig vereinfacht, aber das hat mich auf den richtigen Weg gebracht, danke für die Hilfe. – Lyall

0

ich tun würde, dies mit Microsofts Reactive Framework ("Rx-Main" in NuGet).

Hier ist sie:

var result = 
    from x in list.ToObservable() 
    from a in Observable.Start(() => LongRunningCalc1(x)) 
    from b in Observable.Start(() => LongRunningCalc2(x)) 
    select new {a, b}; 

Das Schöne daran ist, dass man auf die Ergebnisse zugreifen können, wie sie hergestellt werden, mit der .Subscribe(...) Methode:

result.Subscribe(x => /* Do something with x.a and/or x.b */); 

Super einfach!