2015-09-15 7 views
7

Ich möchte Jobs von einem Thread an eine asyncio Ereignisschleife übergeben (genau wie run_in_executor aber umgekehrt).Senden Sie einen Job an eine asyncio Ereignisschleife

Hier ist, was die asyncio Dokumentation sagt über concurrency and multithreading:

von einem anderen Thread einen Rückruf zu planen, die BaseEventLoop.call_soon_threadsafe() Methode verwendet werden soll. Beispiel ein Koroutine von einem anderen Thread zu planen: loop.call_soon_threadsafe(asyncio.async, coro_func())

Das funktioniert gut, aber das Ergebnis der Koroutine verloren.

Stattdessen ist es möglich, eine Funktion zu verwenden, das einen getan Rückruf in die Zukunft von async (oder ensure_future), so dass der Faden das Ergebnis zugreifen kann durch ein concurrent.futures.Future zurückzuhinzufügt.

Gibt es einen bestimmten Grund, warum eine solche Funktion nicht in der Standardbibliothek implementiert ist? Oder habe ich einen einfacheren Weg vermisst, um das zu erreichen?

Antwort

6

Meine Anfrage hat seinen Weg gemacht und eine run_coroutine_threadsafe Funktion wurde implementiert here.

Beispiel:

def target(loop, timeout=None): 
    future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop) 
    return future.result(timeout) 

async def add(a, b): 
    await asyncio.sleep(1) 
    return a + b 

loop = asyncio.get_event_loop() 
future = loop.run_in_executor(None, target, loop) 
assert loop.run_until_complete(future) == 3 

ich ursprünglich eine Unterklasse von concurrent.futures.Executor geschrieben, die noch so umgesetzt werden können:

class LoopExecutor(concurrent.futures.Executor): 
    """An Executor subclass that uses an event loop 
    to execute calls asynchronously.""" 

    def __init__(self, loop=None): 
     """Initialize the executor with a given loop.""" 
     self.loop = loop or asyncio.get_event_loop() 

    def submit(self, fn, *args, **kwargs): 
     """Schedule the callable, fn, to be executed as fn(*args **kwargs). 
     Return a Future object representing the execution of the callable.""" 
     coro = asyncio.coroutine(fn)(*args, **kwargs) 
     return asyncio.run_coroutine_threadsafe(coro, self.loop) 
+0

tun Sie dies in der Frage stellen wollen, so spielt es keine scheint eine Antwort zu sein –

+0

Nun, es ist eine Art von [eine Teilantwort auf meine eigene Frage] (http://StackOverflow.com/Help/self-answer), da es eine bessere Möglichkeit geben könnte, das gleiche zu erreichen. – Vincent

+0

wenn du es so siehst, ok :) –