8

Ich habe die folgende Methode geschrieben, um eine riesige CSV-Datei im Stapel zu verarbeiten. Die Idee besteht darin, einen Teil der Zeilen aus der Datei in den Speicher einzulesen und diese Zeilen dann in Stapel fester Größe aufzuteilen. Sobald wir die Partitionen erhalten haben, senden Sie diese Partitionen an einen Server (sync oder async), was eine Weile dauern kann.Wie werden asynchrone Vorgänge in einem TPL Dataflow für beste Leistung ausgeführt?

Dieses Stück Code scheint nicht sehr effizient aus zwei Gründen bcoz.

  1. Der Hauptthread, der aus der CSV-Datei liest, wird blockiert, bis alle Partitionen verarbeitet sind.

  2. Die AsParallel-Blöcke, bis alle Aufgaben abgeschlossen sind. Wenn also im Thread-Pool mehr Threads für die Arbeit verfügbar sind, verwende ich sie nicht, da die Anzahl der Tasks durch keine der Partitionen gebunden ist.

Die Chargengröße festgelegt ist, so kann nicht geändert werden, aber chunkSize für Performance abstimmbaren. Ich kann eine ausreichend große chunkSize wählen, so dass keine erstellten Chargen >> keine der im System verfügbaren Threads sind, aber es bedeutet immer noch, dass die Parallel.ForEach-Methode blockiert, bis alle Aufgaben beendet sind.

Wie kann ich den Code so ändern, dass alle verfügbaren Threads im System verwendet werden, um die Arbeit zu erledigen, ohne im Leerlauf zu sitzen. Ich denke, ich könnte eine BlockingCollection verwenden, um die Chargen zu speichern, aber nicht sicher, welche Kapazitätsgröße es geben soll, da keine Charge in jedem Chunk dynamisch ist.

Haben Sie Ideen, wie Sie mithilfe von TPL die Thread-Auslastung maximieren können, sodass die meisten verfügbaren Threads auf dem System immer funktionieren?

UPDATE: Dies ist, was ich bisher mit TPL Datenfluss. Ist das richtig?

private static void UploadData (string filePath, int chunkSize, int batchSize) 
    { 
     BatchBlock<string> buffer1 = new BatchBlock<string>(chunkSize); 
     BufferBlock<IEnumerable<string>> buffer2 = new BufferBlock<IEnumerable<string>>(); 

     var action1 = new ActionBlock<string[]>(t => 
     { 
      Console.WriteLine("Got a chunk of lines " + t.Count()); 

      // Partition each chunk into smaller chunks grouped on column 1 
      var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g); 

      // Further beakdown the chunks into batch size groups 
      var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index/batchSize, e => e.i)); 

      // Get batches from groups 
      var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z)); 

      foreach(var batch in batches) 
      { 
       buffer2.Post(batch); 
      } 

     }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); 

     buffer1.LinkTo(action1, new DataflowLinkOptions { PropagateCompletion = true }); 

     var action2 = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b => 
     { 
      await ExecuteBatch(b); 
      return b; 

     }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); 

     buffer2.LinkTo(action2, new DataflowLinkOptions { PropagateCompletion = true }); 

     var action3 = new ActionBlock<IEnumerable<string>>(b => 
     { 
      Console.WriteLine("Finised executing a batch"); 
     }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); 

     action2.LinkTo(action3, new DataflowLinkOptions { PropagateCompletion = true }); 

     Task produceTask = Task.Factory.StartNew(() => 
      { 
       foreach (var line in File.ReadLines(filePath)) 
       { 
        buffer1.Post(line); 
       } 

       //Once marked complete your entire data flow will signal a stop for 
       // all new items 
       Console.WriteLine("Finished producing"); 
       buffer1.Complete(); 
      }); 

     Task.WaitAll(produceTask); 
     Console.WriteLine("Produced complete"); 

     action1.Completion.Wait();//Will add all the items to buffer2 
     Console.WriteLine("Action1 complete"); 

     buffer2.Complete();//will not get any new items 
     action2.Completion.Wait();//Process the batch of 5 and then complete 

     Task.Wait (action3.Completion); 

     Console.WriteLine("Process complete"); 
     Console.ReadLine(); 
    } 
+0

Wie Sie gesagt haben, sollten Sie TPL Datenfluss verwendet werden. Haben Sie eine konkrete Frage dazu? – i3arnon

+0

"Die AsParallel-Blöcke, bis alle Aufgaben beendet sind" Dies geschieht nicht, weil Ihr Lambda sofort zurückkehrt, weil es asynchron ist. Dies ist ein Fehler. – usr

+0

Ok. Also, wie das zu beheben? – user330612

Antwort

1

Sie waren in der Nähe, in TPL Daten von einem Block zum anderen fließt, und Sie sollten zu diesem Paradigma zu halten versuchen. So sollte zum Beispiel action1 ein TransformManyBlock sein, weil ein ActionnBlock ein ITargetBlock ist (d. H. Ein Terminierungsblock).

Wenn Sie die Propagierungsvervollständigung für eine Verbindung angeben, wird das vollständige Ereignis automatisch durch den Block geleitet, sodass Sie nur eine Wartezeit() für den letzten Block ausführen müssen.

Denken Sie an ist wie eine Dominos-Kette, die Sie auf dem ersten Block vollständig aufrufen und es wird durch die Kette bis zum letzten Block propagieren.

Sie sollten auch überlegen, was und warum Sie Multithreading sind; Ihr Beispiel ist stark I/O-gebunden, und ich glaube nicht, dass das Binden eines Threads, um auf I/O-Vervollständigung zu warten, die richtige Lösung ist.

Denken Sie schließlich daran, was blockiert oder nicht. In Ihrem Beispiel ist buffer1.Post (...) nicht ein blockierender Aufruf, Sie haben keinen Grund, das in einer Aufgabe zu haben.

Ich habe den folgenden Beispielcode geschrieben, die TPL Dataflow verwendet:

static void Main(string[] args) 
    { 
     var filePath = "C:\\test.csv"; 
     var chunkSize = 1024; 
     var batchSize = 128; 

     var linkCompletion = new DataflowLinkOptions 
     { 
      PropagateCompletion = true 
     }; 

     var uploadData = new ActionBlock<IEnumerable<string>>(
      async (data) => 
      { 
       WebClient client = new WebClient(); 
       var payload = data.SelectMany(x => x).ToArray(); 
       byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload); 
       //await client.UploadDataTaskAsync("myserver.com", bytes); 
       await Task.Delay(2000); 
      }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ }); 

     var lineBuffer = new BatchBlock<string>(chunkSize); 

     var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
      (data) => 
      { 
       // Partition each chunk into smaller chunks grouped on column 1 
       var partitions = data.GroupBy(c => c.Split(',')[0]); 

       // Further beakdown the chunks into batch size groups 
       var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index/batchSize, e => e.i)); 

       // Get batches from groups 
       var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z)); 

       // Don't forget to enumerate before returning 
       return batches.ToList(); 
      }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); 
     lineBuffer.LinkTo(splitData, linkCompletion); 
     splitData.LinkTo(uploadData, linkCompletion); 

     foreach (var line in File.ReadLines(filePath)) 
     { 
      lineBuffer.Post(line); 
     } 
     lineBuffer.Complete(); 

     // Wait for uploads to finish 
     uploadData.Completion.Wait(); 
    }