2016-07-15 11 views
3

Ich muss eine Sellerie-Aufgabe für jede GRPC-Anforderung aufrufen und das Ergebnis zurückgeben. In der Standard-GRPC-Implementierung wird jede Anforderung in einem separaten Thread von einem Threadpool verarbeitet.Wie implementiert man einen asynchronen Grpc Python Server?

In meinem Fall soll der Server ~ 400 Anfragen im Batch-Modus pro Sekunde verarbeiten. Daher muss eine Anforderung möglicherweise 1 Sekunde auf das Ergebnis aufgrund der Stapelverarbeitung warten, was bedeutet, dass die Größe des Threadpools größer als 400 sein muss, um Blockierungen zu vermeiden.

Kann dies asynchron durchgeführt werden? Vielen Dank.

class EventReporting(ss_pb2.BetaEventReportingServicer, ss_pb2.BetaDeviceMgtServicer): 
    def ReportEvent(self, request, context): 
    res = tasks.add.delay(1,2) 
    result = res.get() ->here i have to block 
    return ss_pb2.GeneralReply(message='Hello, %s!' % result.message) 

Antwort

3

Es kann asynchron erfolgen, wenn auf Ihrem Anruf res.get asynchron durchgeführt werden kann (wenn es mit dem async Schlüsselwort definiert ist).

While grpc.server says it requires a futures.ThreadPoolExecutor, it will actually work with any futures.Executor that calls the behaviors submitted to it on some thread other than the one on which they were passed. Wenn Sie an grpc.server ein futures.Executor von Ihnen implementiert übergeben, die nur einen Thread verwendet, um vierhundert (oder mehr) gleichzeitige Aufrufe an EventReporting.ReportEvent auszuführen, sollte Ihr Server die Art der Blockierung vermeiden, die Sie beschreiben.

1

Meiner Meinung nach ist gute einfache Implementierung async Grpc Server, der gleiche wie http auf aiohttp basiert.

import asyncio 
from concurrent import futures 
import functools 
import inspect 
import threading 

from grpc import _server 

def _loop_mgr(loop: asyncio.AbstractEventLoop): 

    asyncio.set_event_loop(loop) 
    loop.run_forever() 

    # If we reach here, the loop was stopped. 
    # We should gather any remaining tasks and finish them. 
    pending = asyncio.Task.all_tasks(loop=loop) 
    if pending: 
     loop.run_until_complete(asyncio.gather(*pending)) 


class AsyncioExecutor(futures.Executor): 

    def __init__(self, *, loop=None): 

     super().__init__() 
     self._shutdown = False 
     self._loop = loop or asyncio.get_event_loop() 
     self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,), 
             daemon=True) 
     self._thread.start() 

    def submit(self, fn, *args, **kwargs): 

     if self._shutdown: 
      raise RuntimeError('Cannot schedule new futures after shutdown') 

     if not self._loop.is_running(): 
      raise RuntimeError("Loop must be started before any function can " 
           "be submitted") 

     if inspect.iscoroutinefunction(fn): 
      coro = fn(*args, **kwargs) 
      return asyncio.run_coroutine_threadsafe(coro, self._loop) 

     else: 
      func = functools.partial(fn, *args, **kwargs) 
      return self._loop.run_in_executor(None, func) 

    def shutdown(self, wait=True): 
     self._loop.stop() 
     self._shutdown = True 
     if wait: 
      self._thread.join() 


# --------------------------------------------------------------------------- # 


async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): 
    context = _server._Context(rpc_event, state, request_deserializer) 
    try: 
     return await behavior(argument, context), True 
    except Exception as e: # pylint: disable=broad-except 
     with state.condition: 
      if e not in state.rpc_errors: 
       details = 'Exception calling application: {}'.format(e) 
       _server.logging.exception(details) 
       _server._abort(state, rpc_event.operation_call, 
         _server.cygrpc.StatusCode.unknown, _server._common.encode(details)) 
     return None, False 

async def _take_response_from_response_iterator(rpc_event, state, response_iterator): 
    try: 
     return await response_iterator.__anext__(), True 
    except StopAsyncIteration: 
     return None, True 
    except Exception as e: # pylint: disable=broad-except 
     with state.condition: 
      if e not in state.rpc_errors: 
       details = 'Exception iterating responses: {}'.format(e) 
       _server.logging.exception(details) 
       _server._abort(state, rpc_event.operation_call, 
         _server.cygrpc.StatusCode.unknown, _server._common.encode(details)) 
     return None, False 

async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk, 
            request_deserializer, response_serializer): 
    argument = argument_thunk() 
    if argument is not None: 
     response, proceed = await _call_behavior(rpc_event, state, behavior, 
               argument, request_deserializer) 
     if proceed: 
      serialized_response = _server._serialize_response(
       rpc_event, state, response, response_serializer) 
      if serialized_response is not None: 
       _server._status(rpc_event, state, serialized_response) 

async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk, 
            request_deserializer, response_serializer): 
    argument = argument_thunk() 
    if argument is not None: 
     # Notice this calls the normal `_call_behavior` not the awaitable version. 
     response_iterator, proceed = _server._call_behavior(
      rpc_event, state, behavior, argument, request_deserializer) 
     if proceed: 
      while True: 
       response, proceed = await _take_response_from_response_iterator(
        rpc_event, state, response_iterator) 
       if proceed: 
        if response is None: 
         _server._status(rpc_event, state, None) 
         break 
        else: 
         serialized_response = _server._serialize_response(
          rpc_event, state, response, response_serializer) 
         print(response) 
         if serialized_response is not None: 
          print("Serialized Correctly") 
          proceed = _server._send_response(rpc_event, state, 
                serialized_response) 
          if not proceed: 
           break 
         else: 
          break 
       else: 
        break 

_server._unary_response_in_pool = _unary_response_in_pool 
_server._stream_response_in_pool = _stream_response_in_pool 


if __name__ == '__main__': 
    server = grpc.server(AsyncioExecutor()) 
    # Add Servicer and Start Server Here 

Link zur Vorlage:
https://gist.github.com/seglberg/0b4487b57b4fd425c56ad72aba9971be

+0

Ich reparierte Kommentar. Bitte entfernen Sie die negative Bewertung, weil ich denke, dass dies eine gute Umsetzung ist – Vetos