2016-07-01 27 views
6

Ich starte mehrere Prozesse, um eine Liste neuer Objekte zu erstellen. htop zeigt mir zwischen 1 und 4 Prozesse (ich erstelle immer 3 neue Objekte).Python3: Multiprozessing verbraucht viel RAM und verlangsamt

def foo(self): 
    with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, self.information) 
     self.new_objs = result.get() 
     pool.terminate() 
    gc.collect() 

ich foo() mehrmals aufrufen, jedes Mal aufgerufen wird, wird der gesamte Prozess läuft langsamer, das Programm nicht einmal am Ende beenden, wie es unten zu viel verlangsamt. Das Programm beginnt, all meinen RAM aufzufressen, während der sequentielle Ansatz keine signifikante RAM-Nutzung hat.

Wenn ich das Programm abbringe, war das meistens die Funktion, die das Programm zuletzt ausgeführt hat.

->File "threading.py", line 293, in wait 
    waiter.acquire() 

bearbeiten einige Informationen über meine Verhältnisse zu geben. Ich erstelle einen Baum aus Knoten. foo() wird von einem übergeordneten Knoten aufgerufen, um seine untergeordneten Knoten zu erstellen. Die von den Prozessen zurückgegebenen result sind diese untergeordneten Knoten. Diese werden in einer Liste am übergeordneten Knoten gespeichert. Ich möchte die Erstellung dieser Kindknoten parallelisieren, anstatt sie sequentiell zu erstellen.

Antwort

2

Ich denke, Ihr Problem hat hauptsächlich damit zu tun, dass Ihre parallelisierte Funktion eine Methode des Objekts ist. Es ist schwer, ohne weitere Informationen sicher zu sein, aber dieses kleine Spielzeug-Programm betrachten:

import multiprocessing as mp 
import numpy as np 
import gc 


class Object(object): 
    def __init__(self, _): 
     self.data = np.empty((100, 100, 100), dtype=np.float64) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(self.new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def new_obj(self, i): 
     return Object(i) 

    def __del__(self): 
     print("Dead") 


if __name__ == '__main__': 
    c = Container() 
    for j in range(5): 
     c.foo() 

Jetzt Container nur einmal aufgerufen wird, so dass man von einem "Dead" wird ausgedruckt ein "Born", gefolgt, um zu sehen erwarten würde; aber da der Code, der von den Prozessen ausgeführt wird, eine Methode des Containers ist, bedeutet dies, dass der ganze Container an anderer Stelle ausgeführt werden muss! Das Ausführen dieses, werden Sie einen Strom von verwirbelt "Born" sehen und "Dead" als Container der Karte auf jeder Ausführung neu erstellt wird:

Born 
Born 
Born 
Born 
Born 
Dead 
Born 
Dead 
Dead 
Born 
Dead 
Born 
... 
<MANY MORE LINES HERE> 
... 
Born 
Dead 

sich zu überzeugen, dass der gesamte Behälter um jedes Mal kopiert und gesendet wird , versuchen, einige nicht-serialisable Wert zu setzen:

def foo(self): 
    with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, range(50)) 
     self.fn = lambda x: x**2 
     self.objects.extend(result.get()) 
     pool.terminate() 
    gc.collect() 

Welche sofort eine AttributeError erheben, da sie den Behälter nicht serialise kann.

Lasst uns Fazit: wenn dem Pool 1000 Anfragen senden, Container werden an die Prozesse serialisiert, gesendet werden und deserialised dort ein 1000-mal. Sicher, sie werden irgendwann fallengelassen (vorausgesetzt, es gibt nicht zu viele seltsame Querverweise), aber das wird definitiv einen großen Druck auf den RAM ausüben, da das Objekt serialisiert ist, genannt, aktualisiert, reserialisiert ... für jeden Element in Ihren zugeordneten Eingängen.

Wie können Sie das lösen? Nun, im Idealfall nicht teilen Zustand:

def new_obj(_): 
    return Object(_) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

Damit ist in einem Bruchteil der Zeit, und nur produziert die kleinste Blimp auf dem RAM (wie ein einzelne Container je gebaut wird).Wenn Sie einen Teil der internen Zustand müssen dort übergeben werden, entpacken Sie es und senden Sie genau das:

def new_obj(tup): 
    very_important_state, parameters = tup 
    return Object(very_important_state=very_important_state, 
        parameters=parameters) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     important_state = len(self.objects) 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, 
            ((important_state, i) for i in range(50))) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

Dies das gleiche Verhalten hat wie zuvor. Wenn Sie absolut nicht vermeiden können, teilen einige veränderlichen Zustand zwischen den Prozessen, Kasse the multiprocessing tools für das zu tun, ohne alles überall kopieren müssen jedes Mal.

+0

Bitte sehen Sie meine Bearbeitung. Wenn ich dich richtig verstanden habe, muss ich in jedem Prozess eine externe Methode außerhalb meines Objekts aufrufen? – Jonas

+0

Die parallelisierte Funktion 'self.new_obj', die eine Methode des Objekts ist, _erfordert_ den gesamten Elternknoten, der bei jedem Aufruf serialisiert und umgeleitet werden soll; wenn Sie diese Methode so extrahieren können, dass die _function_ 'new_obj (...)' einen (einfachen, verwaisten, 'zustandslosen') neuen Knoten zurückgibt und 'foo' dafür verantwortlich ist, sie zu verlinken (refs eltern <-> Kind hinzufügen usw.) .. aber im _calling process_) wird dieses ganze Problem wahrscheinlich verschwinden: Kindprozesse erfordern nur einen minimalen Zustand, um herumgesandt zu werden. – val