Ich habe ein Phantom-Problem mit einem meiner Komponententests. Ich verwende ein ThreadPool
von multiprocessing
Paket zum Einwickeln stdout
und stderr
Funktionen aus meiner Klasse mit paramiko
. Während der Erstellung habe ich einige reale Tests mit dem unten stehenden Code durchgeführt und es funktioniert gut. Aber während des Schreibens des Einheitentests für diesen Code habe ich das Problem bekommen, dass diese Verwendung von ThreadPool
nicht funktioniert.ThreadPool von Python Multiprocessing hängt
Dieser Teil hängt für etwa 95 Prozent der Zeit und irgendwie wird manchmal richtig ausgeführt.
while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()): time.sleep(WAIT_FOR_DATA)
Ich habe die Werte während des Debuggens überprüft und ich habe, dass manchmal herausgefunden, gibt es die eine oder andere Bedingung fertig gesetzt, aber der andere nicht. Aber beide Funktionen sind bereits beendet, so dass die Ergebnisse nur nach dem Zustand fragen, der in Zukunft nie geändert wird.
Der Code für reproduce (mit Funktionalität für dieses Problem erforderlich):
import time
from multiprocessing.pool import ThreadPool
class ExecResult(object):
def __init__(self, command=None, exit_status_func=None,
receive_stdout_func=None, receive_stderr_func=None,
connection=None):
self.connection = connection
self.stdout = None
self.stderr = None
self.ecode = None
self.ts_stop = None
self._exit_status_f = exit_status_func
self.result_available = False
self.__fetch_streams(receive_stdout_func, receive_stderr_func)
def wait_for_data(self):
WAIT_FOR_DATA = 0.1
if not self.result_available:
# Here it hangs out for 95 percent
while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()):
time.sleep(WAIT_FOR_DATA)
self.result_available = True
self.ts_stop = time.time()
self.stdout = self.__stdout_async_r.get(timeout=2)
self.stderr = self.__stderr_async_r.get(timeout=2)
self.ecode = self._exit_status_f()
def __fetch_streams(self, stdout_func, stderr_func):
stdout_t = ThreadPool(processes=1)
stderr_t = ThreadPool(processes=1)
self.__stdout_async_r = stdout_t.apply_async(func=stdout_func)
self.__stderr_async_r = stderr_t.apply_async(func=stderr_func)
stdout_t.close()
stderr_t.close()
def stderr():
return "stderr"
def stdout():
return "stdout"
def exit():
return "0"
# actual reproduction
res = ExecResult(None, exit, stdout, stderr, None)
res.wait_for_data() #if are data available get them or wait
print res.stdout
print res.stderr
print res.ecode