【问题标题】:How to implement a async grpc python server?如何实现异步 grpc python 服务器?
【发布时间】:2016-11-18 03:39:27
【问题描述】:

我需要为每个 GRPC 请求调用一个 celery 任务,并返回结果。 在默认的 GRPC 实现中,每个请求都在来自线程池的单独线程中处理。

在我的例子中,服务器应该每秒以批处理模式处理约 400 个请求。所以一个请求可能因为批处理需要等待 1 秒才能得到结果,这意味着线程池的大小必须大于 400 才能避​​免阻塞。

这可以异步完成吗? 非常感谢。

class EventReporting(ss_pb2.BetaEventReportingServicer, ss_pb2.BetaDeviceMgtServicer):
  def ReportEvent(self, request, context):
    res = tasks.add.delay(1,2)
    result = res.get() ->here i have to block
    return ss_pb2.GeneralReply(message='Hello, %s!' % result.message)

【问题讨论】:

    标签: python grpc


    【解决方案1】:

    如果您对res.get 的调用可以异步完成(如果使用async 关键字定义),则可以异步完成。

    While grpc.server says it requires a futures.ThreadPoolExecutor, it will actually work with any futures.Executor that calls the behaviors submitted to it on some thread other than the one on which they were passed。如果您将一个由您实现的futures.Executor 传递给grpc.server,该futures.Executor 只使用一个线程对EventReporting.ReportEvent 执行四百(或更多)个并发调用,您的服务器应该避免您描述的那种阻塞。

    【讨论】:

      【解决方案2】:

      在我看来是很好的简单实现异步grpc服务器,就像基于aiohttp的http。

      import asyncio
      from concurrent import futures
      import functools
      import inspect
      import threading
      
      from grpc import _server
      
      def _loop_mgr(loop: asyncio.AbstractEventLoop):
      
          asyncio.set_event_loop(loop)
          loop.run_forever()
      
          # If we reach here, the loop was stopped.
          # We should gather any remaining tasks and finish them.
          pending = asyncio.Task.all_tasks(loop=loop)
          if pending:
              loop.run_until_complete(asyncio.gather(*pending))
      
      
      class AsyncioExecutor(futures.Executor):
      
          def __init__(self, *, loop=None):
      
              super().__init__()
              self._shutdown = False
              self._loop = loop or asyncio.get_event_loop()
              self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,),
                                              daemon=True)
              self._thread.start()
      
          def submit(self, fn, *args, **kwargs):
      
              if self._shutdown:
                  raise RuntimeError('Cannot schedule new futures after shutdown')
      
              if not self._loop.is_running():
                  raise RuntimeError("Loop must be started before any function can "
                                     "be submitted")
      
              if inspect.iscoroutinefunction(fn):
                  coro = fn(*args, **kwargs)
                  return asyncio.run_coroutine_threadsafe(coro, self._loop)
      
              else:
                  func = functools.partial(fn, *args, **kwargs)
                  return self._loop.run_in_executor(None, func)
      
          def shutdown(self, wait=True):
              self._loop.stop()
              self._shutdown = True
              if wait:
                  self._thread.join()
      
      
      # --------------------------------------------------------------------------- #
      
      
      async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
          context = _server._Context(rpc_event, state, request_deserializer)
          try:
              return await behavior(argument, context), True
          except Exception as e:  # pylint: disable=broad-except
              with state.condition:
                  if e not in state.rpc_errors:
                      details = 'Exception calling application: {}'.format(e)
                      _server.logging.exception(details)
                      _server._abort(state, rpc_event.operation_call,
                             _server.cygrpc.StatusCode.unknown, _server._common.encode(details))
              return None, False
      
      async def _take_response_from_response_iterator(rpc_event, state, response_iterator):
          try:
              return await response_iterator.__anext__(), True
          except StopAsyncIteration:
              return None, True
          except Exception as e:  # pylint: disable=broad-except
              with state.condition:
                  if e not in state.rpc_errors:
                      details = 'Exception iterating responses: {}'.format(e)
                      _server.logging.exception(details)
                      _server._abort(state, rpc_event.operation_call,
                             _server.cygrpc.StatusCode.unknown, _server._common.encode(details))
              return None, False
      
      async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
                                        request_deserializer, response_serializer):
          argument = argument_thunk()
          if argument is not None:
              response, proceed = await _call_behavior(rpc_event, state, behavior,
                                                       argument, request_deserializer)
              if proceed:
                  serialized_response = _server._serialize_response(
                      rpc_event, state, response, response_serializer)
                  if serialized_response is not None:
                      _server._status(rpc_event, state, serialized_response)
      
      async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
                                         request_deserializer, response_serializer):
          argument = argument_thunk()
          if argument is not None:
              # Notice this calls the normal `_call_behavior` not the awaitable version.
              response_iterator, proceed = _server._call_behavior(
                  rpc_event, state, behavior, argument, request_deserializer)
              if proceed:
                  while True:
                      response, proceed = await _take_response_from_response_iterator(
                          rpc_event, state, response_iterator)
                      if proceed:
                          if response is None:
                              _server._status(rpc_event, state, None)
                              break
                          else:
                              serialized_response = _server._serialize_response(
                                  rpc_event, state, response, response_serializer)
                              print(response)
                              if serialized_response is not None:
                                  print("Serialized Correctly")
                                  proceed = _server._send_response(rpc_event, state,
                                                           serialized_response)
                                  if not proceed:
                                      break
                              else:
                                  break
                      else:
                          break
      
      _server._unary_response_in_pool = _unary_response_in_pool
      _server._stream_response_in_pool = _stream_response_in_pool
      
      
      if __name__ == '__main__':
          server = grpc.server(AsyncioExecutor())
          # Add Servicer and Start Server Here
      

      原文链接:
      https://gist.github.com/seglberg/0b4487b57b4fd425c56ad72aba9971be

      【讨论】:

      • 我修正了评论。请去掉负面评价,因为我认为这是一个很好的实现
      【解决方案3】:

      正如@Michael 在评论中指出的那样,从 1.32 版开始,gRPC 现在在其Python API 中支持 asyncio。如果您使用的是早期版本,您仍然可以通过实验性 API 使用 asyncio API:from grpc.experimental import aio。 asyncio hello world example 也已添加到 gRPC 存储库中。以下代码是示例服务器的副本:

      import logging                                                                  
      import asyncio                                                                  
      from grpc import aio                                                            
                                                                                      
      import helloworld_pb2                                                           
      import helloworld_pb2_grpc                                                      
                                                                                      
                                                                                      
      class Greeter(helloworld_pb2_grpc.GreeterServicer):                             
                                                                                      
          async def SayHello(self, request, context):                                 
              return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)    
                                                                                      
                                                                                      
      async def serve():                                                              
          server = aio.server()                                                       
          helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)        
          listen_addr = '[::]:50051'                                                  
          server.add_insecure_port(listen_addr)                                       
          logging.info("Starting server on %s", listen_addr)                          
          await server.start()                                                        
          await server.wait_for_termination()                                         
                                                                                      
                                                                                      
      if __name__ == '__main__':                                                      
          logging.basicConfig(level=logging.INFO)                                     
          asyncio.run(serve())
      

      有关如何实现客户端,请参阅my other answer

      【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-16
      • 1970-01-01
      • 2021-02-22
      • 1970-01-01
      • 2017-06-03
      • 1970-01-01
      • 2021-06-01
      • 2018-01-11
      相关资源
      最近更新 更多