4

Ich habe eine 25Gb-Klartext-Datei mit ~ 10 Millionen Zeilen, mehrere hundert Wörter pro Zeile. Jede Zeile muss einzeln verarbeitet werden, und ich versuche, Stücke auf ein Dutzend Arbeiter zu verteilen, die parallel verarbeitet werden. Derzeit wird in einer Million Zeilen auf einmal geladen (dies nimmt aus irgendeinem Grund ~ 10 GB in RAM auf, obwohl es nur ~ 3 GB auf der Festplatte dekomprimiert ist), teilt es gleichmäßig auf 12 Arten auf und ordnet es dann 12 Arbeitern zu, die multiprocessing.Pool verwenden.Python nicht RAM bei der Verarbeitung von großen Dateien parallel

Problem ist, wenn jeder meiner 12 Arbeiter die Verarbeitung ihrer zugewiesenen Daten beendet, ihr RAM wird nicht freigegeben und erhöht nur ein weiteres ~ 10 GB bei der nächsten Millionen Zeilen Iteration.

Ich habe versucht, die vorherigen Daten zu löschen, die vorherigen Daten auf eine leere Zuordnung zurückzusetzen, iterable Variablennamen mit eval(), gc.collect() nach dem Löschen zu erstellen und die IO vollständig zu trennen eigene Funktion, alles ohne Glück und das gleiche Problem. Das Ausführen von Debug zeigt, dass der Python-Interpreter nur die erwarteten Daten erkennt und dass die Daten der vorherigen Iteration nicht zugänglich sind. Warum wird der RAM also nicht freigegeben?

Der folgende Code ist meine letzte Iteration des Versuches, alle Umgebungen zu trennen, nicht die effizienteste, aber "BigFileOnDisk" ist auf einer SSD, so dass das erneute Lesen der Datei jedes Mal vernachlässigbar ist verglichen mit der tatsächlichen Verarbeitung der Daten. Zuvor hatte die "Lesen" -Funktion innerhalb der Zuweisungsfunktion, löschte alle Daten nach den Arbeitern abgeschlossen, mit den gleichen Ergebnissen.

def allocation(): 
    fileCompleted = False 
    currentLine = 0 
    while not fileCompleted: 
     lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine) 
     list_of_values(function_object=worker, inputs=lineData, workers=12) 


def read(numLines, startLine=0): 
    currentLine = 0 
    lines = [] 
    with open(BigFileOnDisk, 'r') as fid: 
     for line in fid: 
      if currentLine >= startLine: 
       lines.append(line) 
      if currentLine - startLine >= numLines: 
       return lines, counter, False 
      currentLine += 1 
     # or if we've hit the end of the file 
     return lines, counter, True 


def worker(lines): 
    outputPath = *root* + str(datetime.datetime.now().time()) 
    processedData = {} 

    for line in lines: 
     # process data 

    del lines 
    with open(outputPath, 'a') as fid: 
     for item in processedData: 
      fid.write(str(item) + ', ' + str(processedData[item]) + '\n') 


def list_of_values(function_object, inputs, workers = 10): 
    inputs_split = [] 
    subsection_start = 0 
    for n in range(workers): 
     start = int(subsection_start) 
     end = int(subsection_start + len(inputs)/workers) 
     subsection_start = end 

     inputs_split.append(inputs[start:end]) 

    p = Pool(workers) 
    p.map(function_object, inputs_split) 
+0

Wir müssen Ihren Code sehen !! – refi64

+0

@ kirbyfan64sos Vorzugsweise ein [mcve] davon. – jpmc26

+0

Code wurde geschrieben – MKennedy

Antwort

4

Sie treten keinem Unterprozess bei. Nach list_of_values getan Prozesse von noch am Leben (irgendwie, eher wie Zombie, aber mit lebenden Elternprozess). Sie halten immer noch alle ihre Werte. Sie können ihre Daten nicht in main anzeigen, da sie in anderen Prozessen (aus dem gleichen Grund gc.collect nicht funktioniert).

Um Arbeitsspeicher freizugeben, müssen Sie manuell Pool beitreten oder with verwenden.

+0

ah das tat es .. danke! – MKennedy