2009-07-15 2 views
34

Ich habe ein Multithread-Programm, wo ich eine Generatorfunktion erstellen und dann an neue Threads übergeben. Ich möchte, dass es in der Natur geteilt/global ist, so dass jeder Thread den nächsten Wert vom Generator erhalten kann.Sind Generatoren Threadssafe?

Ist es sicher, einen Generator wie diesen zu verwenden, oder bekomme ich Probleme/Bedingungen beim Zugriff auf den geteilten Generator aus mehreren Threads?

Wenn nicht, gibt es einen besseren Weg, das Problem anzugehen? Ich brauche etwas, das eine Liste durchläuft und den nächsten Wert für den Thread erzeugt, der es aufruft.

Antwort

49

Es ist nicht Thread-Safe; gleichzeitige Aufrufe können verschachteln und mit den lokalen Variablen umgehen.

Der übliche Ansatz besteht darin, das Master-Slave-Muster zu verwenden (jetzt Bauer-Arbeiter-Muster im PC genannt). Erstellen Sie einen dritten Thread, der Daten generiert, und fügen Sie eine Warteschlange zwischen dem Master und den Slaves hinzu, in denen die Slaves aus der Warteschlange lesen und der Master darauf schreiben wird. Das Standard-Warteschlangenmodul stellt die erforderliche Thread-Sicherheit bereit und veranlasst, den Master zu blockieren, bis die Slaves bereit sind, mehr Daten zu lesen.

+7

Definitiv +1 für Queue.Queue, gute Möglichkeit, Threading-System zu organisieren, wenn anwendbar (das ist die meiste Zeit und definitiv für diese Aufgabe). –

-7

Es hängt davon ab, welche Python-Implementierung Sie verwenden. In CPython macht die GIL alle Operationen an Python-Objekten threadsicher, da nur ein Thread zu einem bestimmten Zeitpunkt Code ausführen kann.

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

"die GIL macht alle Operationen auf Python-Objekten threadsafe" - huh? Alle Operationen sind nicht atomar –

+6

Dies ist gefährlich irreführend. Die GIL bedeutet nur, dass Python-Code den Python-Status in einer Multithread-Umgebung nicht beschädigt: Sie können Threads nicht mitten in einem Bytecode-Op ändern. (Sie können z. B. ein freigegebenes Diktat ändern, ohne es zu beschädigen.) Sie können weiterhin Threads zwischen zwei beliebigen Bytecode-Ops ändern. –

40

Edited Benchmark unten hinzuzufügen.

Sie können einen Generator mit einem Schloss umwickeln. Zum Beispiel

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

Locking 50ms auf meinem System nimmt, nimmt Queue 350ms. Die Warteschlange ist nützlich, wenn Sie wirklich eine Warteschlange haben. Wenn Sie beispielsweise eingehende HTTP-Anforderungen haben und diese für die Verarbeitung durch Arbeitsthreads in die Warteschlange stellen möchten. (Das passt nicht in das Python-Iterator-Modell - wenn ein Iterator keine Elemente mehr hat, ist es fertig.) Wenn Sie wirklich einen Iterator haben, dann ist LockedIterator ein schneller und einfacher Weg, um Thread-sicher zu machen.

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000) 
+1

Weniger effizient als mit einer Queue.Queue, aber schön gemacht. – gooli