2016-07-12 14 views
0

Ich bearbeite eine Liste von Artikeln (200k - 300k), jede Artikel Bearbeitungszeit liegt zwischen 2 bis 8 Sekunden. Um Zeit zu gewinnen, kann ich diese Liste parallel bearbeiten. Wie ich in einem Asynchron-Kontext bin, verwende ich so etwas wie diese:C# Parallel Foreach + Async

public async Task<List<Keyword>> DoWord(List<string> keyword) 
{ 
    ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>(); 
    if (keyword.Count > 0) 
    { 
     try 
     { 
      var tasks = keyword.Select(async kw => 
      { 
       return await Work(kw).ConfigureAwait(false); 
      }); 

      keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false)); 
     } 
     catch (AggregateException ae) 
     { 
      foreach (Exception innerEx in ae.InnerExceptions) 
      { 
       log.ErrorFormat("Core threads exception: {0}", innerEx); 
      } 
     } 
    } 
    return keywordResults.ToList(); 
} 

Die Keyword-Liste immer 8 Elemente enthält (von oben kommend) so verarbeite ich meine Liste von 8 mal 8, aber in diesem Fall, ich denke, Wenn 7 Stichwörter in 3 Sekunden verarbeitet werden und der 8. in 10 Sekunden verarbeitet wird, beträgt die Gesamtzeit für die 8 Stichwörter 10 (korrigieren Sie mich, wenn ich falsch liege). Wie kann ich von der Parallel.Foreach dann nähern? Ich meine: starte 8 Schlüsselwörter, wenn 1 von ihnen fertig ist, starte 1 weitere. In diesem Fall habe ich 8 Arbeitsprozesse permanent. Irgendeine Idee ?

+0

Haben Sie darüber nachgedacht, mit [ 'TPL DataFlow'] (https://msdn.microsoft.com/en-us /library/hh228603(v=vs.110).aspx) um eine Pipeline zur Verarbeitung der Elemente einzurichten? –

+0

es klingt wie das ist, was Sie suchen https://msdn.microsoft.com/en-us/library/system.threading.tasks.paralleloptions.maxdegreeofparallelism(v=vs.110).aspx –

+0

@MatthewWatson, Ich habe es gerade Existenz erfahren, ich werde das überprüfen, danke! – Gun

Antwort

2

Hier ist ein Beispielcode, der zeigt, wie Sie mit TPL Dataflow umgehen können.

Hinweis: Um dies zu kompilieren, müssen Sie TPL Dataflow über NuGet zu Ihrem Projekt hinzufügen.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace Demo 
{ 
    class Keyword // Dummy test class. 
    { 
     public string Name; 
    } 

    class Program 
    { 
     static void Main() 
     { 
      // Dummy test data. 
      var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList(); 

      var result = DoWork(keywords).Result; 

      Console.WriteLine("---------------------------------"); 

      foreach (var item in result) 
       Console.WriteLine(item.Name); 
     } 

     public static async Task<List<Keyword>> DoWork(List<string> keywords) 
     { 
      var input = new TransformBlock<string, Keyword> 
      (
       async s => await Work(s), 
       // This is where you specify the max number of threads to use. 
       new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 } 
      ); 

      var result = new List<Keyword>(); 

      var output = new ActionBlock<Keyword> 
      (
       item => result.Add(item), // Output only 1 item at a time, because 'result.Add()' is not threadsafe. 
       new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 } 
      ); 

      input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true }); 

      foreach (string s in keywords) 
       await input.SendAsync(s); 

      input.Complete(); 
      await output.Completion; 

      return result; 
     } 

     public static async Task<Keyword> Work(string s) // Stubbed test method. 
     { 
      Console.WriteLine("Processing " + s); 

      int delay; 
      lock (rng) { delay = rng.Next(10, 1000); } 
      await Task.Delay(delay); // Simulate load. 

      Console.WriteLine("Completed " + s); 
      return await Task.Run(() => new Keyword { Name = s }); 
     } 

     static Random rng = new Random(); 
    } 
} 
1

Ein weiterer einfacher Weg, dies zu tun ist, die verwenden AsyncEnumerator NuGet Package:

using System.Collections.Async; 

public async Task<List<Keyword>> DoWord(List<string> keywords) 
{ 
    var keywordResults = new ConcurrentBag<Keyword>(); 
    await keywords.ParallelForEachAsync(async keyword => 
    { 
     try 
     { 
      var result = await Work(keyword); 
      keywordResults.Add(result); 
     } 
     catch (AggregateException ae) 
     { 
      foreach (Exception innerEx in ae.InnerExceptions) 
      { 
       log.ErrorFormat("Core threads exception: {0}", innerEx); 
      } 
     } 
    }, maxDegreeOfParallelism: 8); 
    return keywordResults.ToList(); 
}