2014-09-24 4 views
8

Wird im ThreadPoolExecutor (TPE) der Rückruf immer garantiert im selben Thread wie die übergebene Funktion ausgeführt?Python ThreadPoolExecutor - wird der Callback garantiert im selben Thread wie die übergebene Funktion ausgeführt?

Zum Beispiel habe ich dies mit dem folgenden Code getestet. Ich lief es viele Male und es schien wie func und callback lief immer im selben Thread.

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
     print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

jedoch schien es nicht, wenn ich die time.sleep(random.random()) Aussagen entfernt, das heißt wenigstens einige func Funktionen und callbackstat nicht Lauf im selben Thread.

Für ein Projekt, an dem ich arbeite, muss der Callback immer auf dem gleichen Thread wie die übergebene Funktion laufen, also wollte ich sicher sein, dass dies von TPE garantiert wird. (Und auch die Ergebnisse des Tests ohne den zufälligen Schlaf schienen rätselhaft).

Ich schaute auf die source code for executors und es scheint nicht wie wir den Thread auf den Haupt-Thread wechseln, bevor wir den Rückruf ausführen. Aber ich wollte nur sicher sein.

Antwort

6

Der Rückruf für Future, der an eine ThreadPoolExecutor gesendet wurde, wird in demselben Thread ausgeführt, in dem die Aufgabe ausgeführt wird, aber nur, wenn der Rückruf vor dem Abschluss der Aufgabe zu Future hinzugefügt wird. Wenn Sie den Rückruf nach den Future abgeschlossen ist hinzuzufügen, wird ausgeführt, der Rückruf in was auch immer Thread aufgerufen Sie add_done_callback Du dies, indem man die add_done_callback Quelle sehen.

def add_done_callback(self, fn): 
    """Attaches a callable that will be called when the future finishes. 

    Args: 
     fn: A callable that will be called with this future as its only 
      argument when the future completes or is cancelled. The callable 
      will always be called by a thread in the same process in which 
      it was added. If the future has already completed or been 
      cancelled then the callable will be called immediately. These 
      callables are called in the order that they were added. 
    """ 
    with self._condition: 
     if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: 
      self._done_callbacks.append(fn) 
      return 
    fn(self) 

Wenn der Zustand des Future gibt es storniert oder Fertig, fn wird gerade im aktuellen Thread der Ausführung sofort aufgerufen. Andernfalls wird es zu einer internen Liste von Callbacks hinzugefügt, die ausgeführt werden sollen, wenn die Future abgeschlossen ist.

Zum Beispiel:

>>> def func(*args): 
... time.sleep(5) 
... print("func {}".format(threading.current_thread())) 
>>> def cb(a): print("cb {}".format(threading.current_thread())) 
... 
>>> fut = ex.submit(func) 
>>> func <Thread(Thread-1, started daemon 140084551563008)> 
>>> fut = e.add_done_callback(cb) 
cb <_MainThread(MainThread, started 140084622018368)> 
+0

Und was 'ProcessPoolExecutor'? Callbacks "get_ident" ist auch anders – Winand