2016-04-19 7 views
0

Ich habe ein Phantom-Problem mit einem meiner Komponententests. Ich verwende ein ThreadPool von multiprocessing Paket zum Einwickeln stdout und stderr Funktionen aus meiner Klasse mit paramiko. Während der Erstellung habe ich einige reale Tests mit dem unten stehenden Code durchgeführt und es funktioniert gut. Aber während des Schreibens des Einheitentests für diesen Code habe ich das Problem bekommen, dass diese Verwendung von ThreadPool nicht funktioniert.ThreadPool von Python Multiprocessing hängt

Dieser Teil hängt für etwa 95 Prozent der Zeit und irgendwie wird manchmal richtig ausgeführt.

while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()): 
    time.sleep(WAIT_FOR_DATA) 

Ich habe die Werte während des Debuggens überprüft und ich habe, dass manchmal herausgefunden, gibt es die eine oder andere Bedingung fertig gesetzt, aber der andere nicht. Aber beide Funktionen sind bereits beendet, so dass die Ergebnisse nur nach dem Zustand fragen, der in Zukunft nie geändert wird.

Der Code für reproduce (mit Funktionalität für dieses Problem erforderlich):

import time 
from multiprocessing.pool import ThreadPool 

class ExecResult(object): 
    def __init__(self, command=None, exit_status_func=None, 
       receive_stdout_func=None, receive_stderr_func=None, 
       connection=None): 
    self.connection = connection 
    self.stdout = None 
    self.stderr = None 
    self.ecode = None 
    self.ts_stop = None 
    self._exit_status_f = exit_status_func 
    self.result_available = False 
    self.__fetch_streams(receive_stdout_func, receive_stderr_func) 

    def wait_for_data(self): 
    WAIT_FOR_DATA = 0.1 

    if not self.result_available: 

     # Here it hangs out for 95 percent 
     while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()): 
     time.sleep(WAIT_FOR_DATA) 

     self.result_available = True 
     self.ts_stop = time.time() 
     self.stdout = self.__stdout_async_r.get(timeout=2) 
     self.stderr = self.__stderr_async_r.get(timeout=2) 
     self.ecode = self._exit_status_f() 


    def __fetch_streams(self, stdout_func, stderr_func): 
    stdout_t = ThreadPool(processes=1) 
    stderr_t = ThreadPool(processes=1) 

    self.__stdout_async_r = stdout_t.apply_async(func=stdout_func) 
    self.__stderr_async_r = stderr_t.apply_async(func=stderr_func) 
    stdout_t.close() 
    stderr_t.close() 

def stderr(): 
    return "stderr" 

def stdout(): 
    return "stdout" 

def exit(): 
    return "0" 

# actual reproduction 
res = ExecResult(None, exit, stdout, stderr, None) 
res.wait_for_data() #if are data available get them or wait 
print res.stdout 
print res.stderr 
print res.ecode 

Antwort

1

Wie es gewöhnlich der Fall ist, fand ich für diese eine Antwort aus nach einiger Zeit verbrachte Fluchen und einen Tee zu tun.

Lösung ist dies nach dem Schließen Methoden hinzuzufügen:

stdout_t.join() 
stderr_t.join() 

Das ist also das reparierte Teil als Ganzes:

def __fetch_streams(self, stdout_func, stderr_func): 
    stdout_t = ThreadPool(processes=1) 
    stderr_t = ThreadPool(processes=1) 

    self.__stdout_async_r = stdout_t.apply_async(func=stdout_func) 
    self.__stderr_async_r = stderr_t.apply_async(func=stderr_func) 
    stdout_t.close() 
    stderr_t.close() 
    stdout_t.join() 
    stderr_t.join()