Ich habe die folgende Methode geschrieben, um eine riesige CSV-Datei im Stapel zu verarbeiten. Die Idee besteht darin, einen Teil der Zeilen aus der Datei in den Speicher einzulesen und diese Zeilen dann in Stapel fester Größe aufzuteilen. Sobald wir die Partitionen erhalten haben, senden Sie diese Partitionen an einen Server (sync oder async), was eine Weile dauern kann.Wie werden asynchrone Vorgänge in einem TPL Dataflow für beste Leistung ausgeführt?
Dieses Stück Code scheint nicht sehr effizient aus zwei Gründen bcoz.
Der Hauptthread, der aus der CSV-Datei liest, wird blockiert, bis alle Partitionen verarbeitet sind.
Die AsParallel-Blöcke, bis alle Aufgaben abgeschlossen sind. Wenn also im Thread-Pool mehr Threads für die Arbeit verfügbar sind, verwende ich sie nicht, da die Anzahl der Tasks durch keine der Partitionen gebunden ist.
Die Chargengröße festgelegt ist, so kann nicht geändert werden, aber chunkSize für Performance abstimmbaren. Ich kann eine ausreichend große chunkSize wählen, so dass keine erstellten Chargen >> keine der im System verfügbaren Threads sind, aber es bedeutet immer noch, dass die Parallel.ForEach-Methode blockiert, bis alle Aufgaben beendet sind.
Wie kann ich den Code so ändern, dass alle verfügbaren Threads im System verwendet werden, um die Arbeit zu erledigen, ohne im Leerlauf zu sitzen. Ich denke, ich könnte eine BlockingCollection verwenden, um die Chargen zu speichern, aber nicht sicher, welche Kapazitätsgröße es geben soll, da keine Charge in jedem Chunk dynamisch ist.
Haben Sie Ideen, wie Sie mithilfe von TPL die Thread-Auslastung maximieren können, sodass die meisten verfügbaren Threads auf dem System immer funktionieren?
UPDATE: Dies ist, was ich bisher mit TPL Datenfluss. Ist das richtig?
private static void UploadData (string filePath, int chunkSize, int batchSize)
{
BatchBlock<string> buffer1 = new BatchBlock<string>(chunkSize);
BufferBlock<IEnumerable<string>> buffer2 = new BufferBlock<IEnumerable<string>>();
var action1 = new ActionBlock<string[]>(t =>
{
Console.WriteLine("Got a chunk of lines " + t.Count());
// Partition each chunk into smaller chunks grouped on column 1
var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further beakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index/batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
foreach(var batch in batches)
{
buffer2.Post(batch);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
buffer1.LinkTo(action1, new DataflowLinkOptions { PropagateCompletion = true });
var action2 = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b =>
{
await ExecuteBatch(b);
return b;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
buffer2.LinkTo(action2, new DataflowLinkOptions { PropagateCompletion = true });
var action3 = new ActionBlock<IEnumerable<string>>(b =>
{
Console.WriteLine("Finised executing a batch");
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
action2.LinkTo(action3, new DataflowLinkOptions { PropagateCompletion = true });
Task produceTask = Task.Factory.StartNew(() =>
{
foreach (var line in File.ReadLines(filePath))
{
buffer1.Post(line);
}
//Once marked complete your entire data flow will signal a stop for
// all new items
Console.WriteLine("Finished producing");
buffer1.Complete();
});
Task.WaitAll(produceTask);
Console.WriteLine("Produced complete");
action1.Completion.Wait();//Will add all the items to buffer2
Console.WriteLine("Action1 complete");
buffer2.Complete();//will not get any new items
action2.Completion.Wait();//Process the batch of 5 and then complete
Task.Wait (action3.Completion);
Console.WriteLine("Process complete");
Console.ReadLine();
}
Wie Sie gesagt haben, sollten Sie TPL Datenfluss verwendet werden. Haben Sie eine konkrete Frage dazu? – i3arnon
"Die AsParallel-Blöcke, bis alle Aufgaben beendet sind" Dies geschieht nicht, weil Ihr Lambda sofort zurückkehrt, weil es asynchron ist. Dies ist ein Fehler. – usr
Ok. Also, wie das zu beheben? – user330612