8

Ich habe dieses Problem jetzt seit einer Woche angepackt und es ist ziemlich frustrierend geworden, denn jedes Mal, wenn ich ein einfacheres, aber ähnliches Beispiel verwende, wird es Multiprocessing sein. Die Art und Weise, wie es Shared Memory behandelt, verwirrte mich, weil es so begrenzt ist, dass es ziemlich schnell unbrauchbar werden kann.Bessere Möglichkeit, Speicher für Multiprocessing in Python zu teilen?

Also die grundlegende Beschreibung meines Problems ist, dass ich einen Prozess erstellen muss, der in einigen Parametern übergeben wird, um ein Bild zu öffnen und ca. 20K Patches der Größe 60x40 zu erstellen. Diese Patches werden jeweils in einer Liste 2 gespeichert und müssen an den Hauptthread zurückgegeben werden, um dann von zwei anderen gleichzeitigen Prozessen, die auf der GPU ausgeführt werden, erneut verarbeitet zu werden.

Der Prozess und der Workflow und alles, was meistens erledigt wird, was ich jetzt brauche, ist der Teil, der am einfachsten sein sollte, erweist sich als der schwierigste. Ich konnte nicht speichern und die Liste mit 20K Patches zurück zum Hauptthread bekommen.

Das erste Problem war, weil ich diese Patches als PIL-Bilder gespeichert habe. Ich habe dann herausgefunden, dass alle Daten, die zu einem Queue-Objekt hinzugefügt werden, gebeizt werden müssen. Das zweite Problem war, dass ich dann die Patches in ein Array von je 60x40 konvertiert und in einer Liste gespeichert habe. Und das geht jetzt immer noch nicht? Anscheinend haben Warteschlangen eine begrenzte Menge an Daten, die sie sonst speichern können, wenn Sie queue_obj.get() aufrufen, hängt das Programm.

Ich habe viele andere Dinge ausprobiert, und jede neue Sache, die ich versuche, funktioniert nicht, also würde ich gerne wissen, ob jemand andere Empfehlungen einer Bibliothek hat, die ich verwenden kann, um Objekte ohne all den Fuzz zu teilen?

Hier ist eine Beispiel-Implementierung der Art von was ich betrachte. Denken Sie daran, das funktioniert vollkommen in Ordnung, aber die vollständige Implementierung nicht. Und ich habe den Code, um Informationsnachrichten zu drucken, um zu sehen, dass die gespeicherten Daten die exakt gleiche Form und alles haben, aber aus irgendeinem Grund funktioniert es nicht. In der vollständigen Implementierung wird der unabhängige Prozess erfolgreich abgeschlossen, jedoch bei q.get() eingefroren.

from PIL import Image 
from multiprocessing import Queue, Process 
import StringIO 
import numpy 

img = Image.open("/path/to/image.jpg") 
q = Queue() 
q2 = Queue() 
# 
# 
# MAX Individual Queue limit for 60x40 images in BW is 31,466. 
# Multiple individual Queues can be filled to the max limit of 31,466. 
# A single Queue can only take up to 31,466, even if split up in different puts. 
def rz(patch, qn1, qn2): 
    totalPatchCount = 20000 
    channels = 1 
    patch = patch.resize((60,40), Image.ANTIALIAS) 
    patch = patch.convert('L') 
    # ImgArray = numpy.asarray(im, dtype=numpy.float32) 
    list_im_arr = [] 
    # ----Create a 4D Array 
    # returnImageArray = numpy.zeros(shape=(totalPatchCount, channels, 40, 60)) 
    imgArray = numpy.asarray(patch, dtype=numpy.float32) 
    imgArray = imgArray[numpy.newaxis, ...] 
    # ----End 4D array 
    # list_im_arr2 = [] 
    for i in xrange(totalPatchCount): 
     # returnImageArray[i] = imgArray 
     list_im_arr.append(imgArray) 
    qn1.put(list_im_arr) 
    qn1.cancel_join_thread() 
    # qn2.cancel_join_thread() 
    print "PROGRAM Done" 

# rz(img,q,q2) 
# l = q.get() 

# 
p = Process(target=rz,args=(img, q, q2,)) 
p.start() 
p.join() 
# 
# # l = [] 
# # for i in xrange(1000): l.append(q.get()) 
# 
imdata = q.get() 
+0

Möchten Sie ein Bild in viele Patches (Kacheln) aufteilen und sie als Liste von Arrays speichern und mehrere Threads verwenden, um diesen Prozess zu beschleunigen? – user3667217

+0

Jedes Bild wird in viele Patches aufgeteilt und als eine einzige Liste von 20K-Patches pro Bild gespeichert. Dieser Teil ist alles in einem Prozess, ich brauche die Daten nicht zu teilen, ich muss nur diese spezifische Liste zurück zum Haupt-Thread erstellt bekommen. Also würden die mehreren Prozesse mehrere Listen von jeweils 20K Patches erstellen und sie an das Hauptprogramm zurücksenden, um nun zwei dieser Listen gleichzeitig auf meinen zwei GPUs zu verarbeiten. – alfredox

Antwort

6

Warteschlange ist für die Kommunikation zwischen Prozessen. In Ihrem Fall haben Sie nicht wirklich diese Art von Kommunikation. Sie können den Prozess einfach zurückgeben lassen und die Methode .get() verwenden, um sie zu sammeln. (Denken Sie daran if __name__ == "main": hinzufügen, finden Sie programming guideline)

from PIL import Image 
from multiprocessing import Pool, Lock 
import numpy 

img = Image.open("/path/to/image.jpg") 

def rz(): 
    totalPatchCount = 20000 
    imgArray = numpy.asarray(patch, dtype=numpy.float32) 
    list_im_arr = [imgArray] * totalPatchCount # A more elegant way than a for loop 
    return list_im_arr 

if __name__ == '__main__': 
    # patch = img.... Your code to get generate patch here 
    patch = patch.resize((60,40), Image.ANTIALIAS) 
    patch = patch.convert('L') 

    pool = Pool(2) 
    imdata = [pool.apply_async(rz).get() for x in range(2)] 
    pool.close() 
    pool.join() 

nun nach erste Antwort dieser post, Multiprozessing nur Objekte übergeben, die picklable ist. Das Beizen ist im Multiprocessing wahrscheinlich unvermeidlich, da Prozesse keinen Speicher teilen. Sie leben einfach nicht im selben Universum. (Sie erben Speicher, wenn sie zum ersten Mal erzeugt werden, aber sie können ihr eigenes Universum nicht erreichen). Das PIL-Bildobjekt selbst ist nicht pickbar. Sie können es einfügbar machen, indem Sie nur die darin gespeicherten Bilddaten extrahieren, wie dies post vorgeschlagen wird.

Da Ihr Problem hauptsächlich I/O-gebunden ist, können Sie auch Multi-Threading versuchen. Es könnte für Ihren Zweck noch schneller sein. Threads teilen alles, so dass kein Beizen erforderlich ist. Wenn Sie Python 3 verwenden, ist ThreadPoolExecutor ein wunderbares Werkzeug. Für Python 2 können Sie ThreadPool verwenden. Um eine höhere Effizienz zu erreichen, müssen Sie die Vorgehensweise neu anordnen, den Prozess aufteilen und verschiedene Threads ausführen lassen.

from PIL import Image 
from multiprocessing.pool import ThreadPool 
from multiprocessing import Lock 
import numpy 

img = Image.open("/path/to/image.jpg") 
lock = Lock(): 
totalPatchCount = 20000 

def rz(x): 
    patch = ... 
    return patch 

pool = ThreadPool(8) 
imdata = [pool.map(rz, range(totalPatchCount)) for i in range(2)] 
pool.close() 
pool.join() 
+0

Sie übergeben nur ein Argument in pool.apply_asyn (rz, args = (x,)), aber rz braucht zwei, ist das korrekt? Ich habe es auch versucht und einen Fehler bekommen, dass die Daten nicht gebeizt werden können. Selbst wenn Sie Daten über eine pool.get() -Methode erhalten, muss sie trotzdem gebeizt werden. – alfredox

+0

Nein, das ist nicht korrekt. Ich habe schnell ein paar Fehler gemacht. Ich habe meinen Code aktualisiert. Nichts sollte in meinem Code eingelegt werden. Können Sie die Zeile posten, die Ihnen den Fehler gibt?Diese – user3667217

+0

ist das, was ich bekomme: '' Attributetraceback (jüngste Aufforderung zuletzt) ​​ in () 18 imdata = [] ---> 20 mit Pool (Prozesse = 2) als Pool: 21 für x in Bereich (2): 22 res = pool.apply_asyn (rz, args = (Patch, x)) Attribute: __exit__' – alfredox

1

Sie sagen: „Anscheinend Warteschlangen haben eine begrenzte Menge an Daten, können sie sonst sparen, wenn Sie queue_obj.get() das Programm hängt nennen... "

Sie sind richtig und falsch gibt Es gibt eine begrenzte Menge an Informationen, die Queue ohne abgelassen halten Das Problem ist, dass, wenn Sie tun:

qn1.put(list_im_arr) 
qn1.cancel_join_thread() 

es plant die Kommunikation mit dem zugrundeliegenden Die qn1.cancel_join_thread() sagt dann "but it's cool if we exit without the scheduled put completing", und natürlich, ein paar Mikrosekunden später, wird die Worker-Funktion beendet und die Process beendet (ohne auf den Thread zu warten, der die Pipe füllt, um es tatsächlich zu tun; im besten Fall möglicherweise die ersten Bytes des Objekts gesendet haben, aber alles, was nicht in PIPE_BUF passt, wird fast sicher fallen gelassen; Du brauchst ein paar großartige Race Conditions, um überhaupt etwas zu bekommen, ganz zu schweigen von einem großen Objekt. So später, wenn Sie das tun:

imdata = q.get() 

nichts tatsächlich durch die (jetzt verlassen) Process gesendet. Wenn Sie q.get() aufrufen, wartet es auf Daten, die nie wirklich übertragen wurden.

Die andere Antwort ist richtig, dass im Falle der Berechnung und Übermittlung eines einzelnen Werts Queue s Overkill sind. Aber wenn Sie sie verwenden, müssen Sie sie richtig verwenden. Das Update wäre:

  1. den Anruf zu qn1.cancel_join_thread() entfernen, so dass die Process nicht beendet, bis die Daten über das Rohr übertragen wurde.
  2. Ihre Anrufe neu anordnen Deadlock

Rearranging zu vermeiden, ist nur dies:

p = Process(target=rz,args=(img, q, q2,)) 
p.start() 

imdata = q.get() 
p.join() 

p.join() nach q.get() zu bewegen; Wenn Sie zuerst join versuchen, wartet Ihr Hauptprozess darauf, dass das Kind beendet wird, und das Kind wartet darauf, dass die Warteschlange verbraucht wird, bevor es beendet wird (dies funktioniert möglicherweise, wenn die Leitung von Queue durch einen thread im Hauptprozess, aber es ist am besten, sich nicht auf Implementierungsdetails wie diese zu verlassen, dieses Formular ist korrekt, unabhängig von den Implementierungsdetails, solange put s und get s übereinstimmen.

+0

Ja ich habe es sowohl mit als auch ohne die qn1.cancel_join_thread() versucht, aber ich würde immer p anrufen.Join(), um auf den Abschluss des Prozesses vor dem Beenden zu warten. Vielen Dank für die tolle Erklärung, ich versuche immer noch, den vorgeschlagenen Code zum Laufen zu bringen. Ich werde dich wissen lassen, wie es läuft. – alfredox