Ich denke, Ihr Problem hat hauptsächlich damit zu tun, dass Ihre parallelisierte Funktion eine Methode des Objekts ist. Es ist schwer, ohne weitere Informationen sicher zu sein, aber dieses kleine Spielzeug-Programm betrachten:
import multiprocessing as mp
import numpy as np
import gc
class Object(object):
def __init__(self, _):
self.data = np.empty((100, 100, 100), dtype=np.float64)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def new_obj(self, i):
return Object(i)
def __del__(self):
print("Dead")
if __name__ == '__main__':
c = Container()
for j in range(5):
c.foo()
Jetzt Container
nur einmal aufgerufen wird, so dass man von einem "Dead"
wird ausgedruckt ein "Born"
, gefolgt, um zu sehen erwarten würde; aber da der Code, der von den Prozessen ausgeführt wird, eine Methode des Containers ist, bedeutet dies, dass der ganze Container an anderer Stelle ausgeführt werden muss! Das Ausführen dieses, werden Sie einen Strom von verwirbelt "Born"
sehen und "Dead"
als Container der Karte auf jeder Ausführung neu erstellt wird:
Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
...
<MANY MORE LINES HERE>
...
Born
Dead
sich zu überzeugen, dass der gesamte Behälter um jedes Mal kopiert und gesendet wird , versuchen, einige nicht-serialisable Wert zu setzen:
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.fn = lambda x: x**2
self.objects.extend(result.get())
pool.terminate()
gc.collect()
Welche sofort eine AttributeError
erheben, da sie den Behälter nicht serialise kann.
Lasst uns Fazit: wenn dem Pool 1000 Anfragen senden, Container
werden an die Prozesse serialisiert, gesendet werden und deserialised dort ein 1000-mal. Sicher, sie werden irgendwann fallengelassen (vorausgesetzt, es gibt nicht zu viele seltsame Querverweise), aber das wird definitiv einen großen Druck auf den RAM ausüben, da das Objekt serialisiert ist, genannt, aktualisiert, reserialisiert ... für jeden Element in Ihren zugeordneten Eingängen.
Wie können Sie das lösen? Nun, im Idealfall nicht teilen Zustand:
def new_obj(_):
return Object(_)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
Damit ist in einem Bruchteil der Zeit, und nur produziert die kleinste Blimp auf dem RAM (wie ein einzelne Container
je gebaut wird).Wenn Sie einen Teil der internen Zustand müssen dort übergeben werden, entpacken Sie es und senden Sie genau das:
def new_obj(tup):
very_important_state, parameters = tup
return Object(very_important_state=very_important_state,
parameters=parameters)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
important_state = len(self.objects)
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj,
((important_state, i) for i in range(50)))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
Dies das gleiche Verhalten hat wie zuvor. Wenn Sie absolut nicht vermeiden können, teilen einige veränderlichen Zustand zwischen den Prozessen, Kasse the multiprocessing tools für das zu tun, ohne alles überall kopieren müssen jedes Mal.
Bitte sehen Sie meine Bearbeitung. Wenn ich dich richtig verstanden habe, muss ich in jedem Prozess eine externe Methode außerhalb meines Objekts aufrufen? – Jonas
Die parallelisierte Funktion 'self.new_obj', die eine Methode des Objekts ist, _erfordert_ den gesamten Elternknoten, der bei jedem Aufruf serialisiert und umgeleitet werden soll; wenn Sie diese Methode so extrahieren können, dass die _function_ 'new_obj (...)' einen (einfachen, verwaisten, 'zustandslosen') neuen Knoten zurückgibt und 'foo' dafür verantwortlich ist, sie zu verlinken (refs eltern <-> Kind hinzufügen usw.) .. aber im _calling process_) wird dieses ganze Problem wahrscheinlich verschwinden: Kindprozesse erfordern nur einen minimalen Zustand, um herumgesandt zu werden. – val