2012-09-18 8 views
7

Ich verstehe nicht, warum Pipes unsicher sind, wenn es mehrere Absender und Empfänger gibt.Warum sind Python-Multiprocessing-Pipes unsicher?

Wie kann der folgende Code mit Queues in Code umgewandelt werden, wenn dies der Fall ist? Queues werfen Sie nicht EOFError wenn geschlossen, damit meine Prozesse nicht aufhören können. Sollte ich endlos "Poison" -Botschaften senden, um ihnen zu sagen, dass sie aufhören sollen (auf diese Weise bin ich sicher, dass alle meine Prozesse mindestens ein Gift erhalten)?

Ich möchte die Pipe p1 offen halten, bis ich anders beschließe (hier ist es, wenn ich die 10 Nachrichten gesendet habe).


from multiprocessing import Pipe, Process 
from random import randint, random 
from time import sleep 

def job(name, p_in, p_out): 
    print(name + ' starting') 
    nb_msg = 0 
    try: 
     while True: 
      x = p_in.recv() 
      print(name + ' receives ' + x) 
      nb_msg = nb_msg + 1 
      p_out.send(x) 
      sleep(random()) 
    except EOFError: 
     pass 
    print(name + ' ending ... ' + str(nb_msg) + ' message(s)') 

if __name__ == '__main__': 
    p1_in, p1_out = Pipe() 
    p2_in, p2_out = Pipe() 

    proc = [] 

    for i in range(3): 
     p = Process(target=job, args=(str(i), p1_out, p2_in)) 
     p.start() 
     proc.append(p) 

    for x in range(10): 
     p1_in.send(chr(97+x)) 
    p1_in.close() 
    for p in proc: 
     p.join() 
    p1_out.close() 
    p2_in.close() 

    try: 
     while True: 
      print(p2_out.recv()) 
    except EOFError: 
     pass 

    p2_out.close() 

Antwort

13

Im Wesentlichen ist das Problem, dass, um eine Plattform Pipe definierten Pipeobjekt eine dünne Hülle ist. recv empfängt einfach wiederholt einen Puffer von Bytes, bis ein vollständiges Python-Objekt erhalten wird. Wenn zwei Threads oder Prozesse recv auf demselben Rohr verwenden, können die Lesevorgänge verschachtelt werden, was jeden Prozess mit einem halben gebeizten Objekt belässt und somit die Daten korrumpiert. Queue s führen eine korrekte Synchronisation zwischen Prozessen durch, auf Kosten höherer Komplexität.

Da die multiprocessing Dokumentation sagt:

Beachten Sie, dass Daten in einem Rohr beschädigt werden können, wenn zwei Prozesse (oder Threads) versuchen, zu lesen oder zu gleicher Zeit auf das gleiche Ende des Rohres schreiben . Natürlich gibt es keine Gefahr von Verfälschungen durch Prozesse, bei denen unterschiedliche Enden des Rohrs gleichzeitig verwendet werden.

Sie müssen nicht endlos Giftpillen senden; eine pro Arbeiter ist alles was du brauchst. Jeder Arbeiter nimmt genau eine Giftpille auf, bevor er aussteigt. Es besteht also keine Gefahr, dass ein Arbeiter die Nachricht irgendwie vermisst.

Sie sollten auch multiprocessing.Pool verwenden, statt das "Worker-Prozess" -Modell neu zu implementieren - hat viele Methoden, die das Verteilen von Arbeit über mehrere Threads sehr einfach machen.

+0

Was passiert, wenn ich 'multiprocessing.Lock()' benutze, wenn 'recv' und' send' einer Pipe verwendet werden? Wird es sicher (und effizient)? – thuzhf

+0

Wenn Sie das tun, erhalten Sie am Ende eine 'Queue' -' multiprocessing.Queue' ist eine 'Pipe' mit einem Paar von Schlössern (eines für jede Richtung). Es wäre also sicher und einigermaßen effizient, aber Sie würden auch das Rad neu erfinden - warum nicht einfach 'Queue' verwenden? – nneonneo

7

Ich verstehe nicht, warum Pipes unsicher sind, wenn es mehrere Absender und Empfänger gibt.

Stellen Sie sich vor, dass Sie gleichzeitig Wasser aus der Quelle A und B in ein Rohr einfüllen. Am anderen Ende der Leitung wird es unmöglich sein herauszufinden, welcher Teil des Wassers von A oder B kam, oder? :)

Ein Pipe transportiert einen Datenstrom auf Byte-Ebene. Ohne ein Kommunikationsprotokoll darüber weiß es nicht, was eine Nachricht ist und kann daher die Nachrichtenintegrität nicht sicherstellen. Daher ist es nicht nur unsicher, Pipes mit mehreren Sendern zu verwenden. Dies ist ein wesentlicher Konstruktionsfehler und wird höchstwahrscheinlich zu Kommunikationsproblemen führen.

Warteschlangen sind jedoch auf einer höheren Ebene implementiert. Sie sind für die Kommunikation Nachrichten (oder auch abstrakte Objekte) konzipiert. Warteschlangen dienen dazu, eine Nachricht/ein Objekt eigenständig zu halten. Mehrere Quellen können Objekte in eine Warteschlange stellen, und mehrere Konsumenten können diese Objekte ziehen, während sie 100% sicher sind, dass alles, was als Einheit in die Warteschlange gelangt, auch als Einheit daraus hervorgeht.

bearbeitet nach einer Weile:

ich, dass in dem Byte-Stream hinzufügen sollte, alle Bytes in der gleichen Reihenfolge abgerufen werden, wie gesendet (garantiert). Das Problem mit mehreren Sendern besteht darin, dass die Sendereihenfolge (die Reihenfolge der Eingabe) möglicherweise bereits unklar oder zufällig ist, d. H. Mehrere Streams könnten sich auf unvorhersehbare Weise mischen.

Eine allgemeine Warteschlangenimplementierung garantiert, dass einzelne Nachrichten intakt bleiben, auch wenn mehrere Absender vorhanden sind. Nachrichten werden in der Reihenfolge, wie sie gesendet wird, ebenfalls abgerufen. Bei mehreren konkurrierenden Sendern und ohne weitere Synchronisationsmechanismen gibt es jedoch wiederum keine Garantie über die Reihenfolge der Eingangsnachrichten.