Ich habe eine 25Gb-Klartext-Datei mit ~ 10 Millionen Zeilen, mehrere hundert Wörter pro Zeile. Jede Zeile muss einzeln verarbeitet werden, und ich versuche, Stücke auf ein Dutzend Arbeiter zu verteilen, die parallel verarbeitet werden. Derzeit wird in einer Million Zeilen auf einmal geladen (dies nimmt aus irgendeinem Grund ~ 10 GB in RAM auf, obwohl es nur ~ 3 GB auf der Festplatte dekomprimiert ist), teilt es gleichmäßig auf 12 Arten auf und ordnet es dann 12 Arbeitern zu, die multiprocessing.Pool verwenden.Python nicht RAM bei der Verarbeitung von großen Dateien parallel
Problem ist, wenn jeder meiner 12 Arbeiter die Verarbeitung ihrer zugewiesenen Daten beendet, ihr RAM wird nicht freigegeben und erhöht nur ein weiteres ~ 10 GB bei der nächsten Millionen Zeilen Iteration.
Ich habe versucht, die vorherigen Daten zu löschen, die vorherigen Daten auf eine leere Zuordnung zurückzusetzen, iterable Variablennamen mit eval(), gc.collect() nach dem Löschen zu erstellen und die IO vollständig zu trennen eigene Funktion, alles ohne Glück und das gleiche Problem. Das Ausführen von Debug zeigt, dass der Python-Interpreter nur die erwarteten Daten erkennt und dass die Daten der vorherigen Iteration nicht zugänglich sind. Warum wird der RAM also nicht freigegeben?
Der folgende Code ist meine letzte Iteration des Versuches, alle Umgebungen zu trennen, nicht die effizienteste, aber "BigFileOnDisk" ist auf einer SSD, so dass das erneute Lesen der Datei jedes Mal vernachlässigbar ist verglichen mit der tatsächlichen Verarbeitung der Daten. Zuvor hatte die "Lesen" -Funktion innerhalb der Zuweisungsfunktion, löschte alle Daten nach den Arbeitern abgeschlossen, mit den gleichen Ergebnissen.
def allocation():
fileCompleted = False
currentLine = 0
while not fileCompleted:
lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
list_of_values(function_object=worker, inputs=lineData, workers=12)
def read(numLines, startLine=0):
currentLine = 0
lines = []
with open(BigFileOnDisk, 'r') as fid:
for line in fid:
if currentLine >= startLine:
lines.append(line)
if currentLine - startLine >= numLines:
return lines, counter, False
currentLine += 1
# or if we've hit the end of the file
return lines, counter, True
def worker(lines):
outputPath = *root* + str(datetime.datetime.now().time())
processedData = {}
for line in lines:
# process data
del lines
with open(outputPath, 'a') as fid:
for item in processedData:
fid.write(str(item) + ', ' + str(processedData[item]) + '\n')
def list_of_values(function_object, inputs, workers = 10):
inputs_split = []
subsection_start = 0
for n in range(workers):
start = int(subsection_start)
end = int(subsection_start + len(inputs)/workers)
subsection_start = end
inputs_split.append(inputs[start:end])
p = Pool(workers)
p.map(function_object, inputs_split)
Wir müssen Ihren Code sehen !! – refi64
@ kirbyfan64sos Vorzugsweise ein [mcve] davon. – jpmc26
Code wurde geschrieben – MKennedy