【问题标题】:How to handle a bidirectional grpc stream asynchronously如何异步处理双向 grpc 流
【发布时间】:2021-03-29 12:15:35
【问题描述】:

我有一个游戏或任何远程用户界面,它带有一个服务器和多个应该通过网络进行通信的客户端。 客户端和服务器都应该能够异步发送更新

这似乎是一个非常自然的服务定义,让我们用 grpc 管理会话。

syntax = "proto3";

package mygame;

service Game {
    rpc participate(stream ClientRequest) returns (ServerResponse);
}

message ClientRequest {
    // Fields for the initial request and further updates
}

message ServerResponse {
    // Game updates
}

实现客户端是微不足道的(尽管下面的代码显然是不完整和简化的)。

class Client:
    def __init__(self):
        self.channel = grpc.insecure_channel("localhost:50051")
        self.stub = game_pb2_grpc.GameStub(channel)
        self.output_queue = queue.Queue()

    def output_iter(self):
        while True:
            client_output_msg = self.output_queue.get()
            self.output_queue.task_done()
            yield client_output_msg

    def do_work(self):
        for response in self.stub.participate(self.output_iter()):
            print(response)  # handle update


with grpc.insecure_channel("localhost:50051") as channel:
    client = Client()
    client.do_work()

在没有阻塞的情况下实现服务器似乎很困难。

class Game(game_pb2_grpc.GameServicer):
    def __init__(self):
        self.pending_events = queue.Queue()

    def participate(self, request_iter, context):
        for client_update in request_iter:
            print(client_update)
            # !!!
            # The next bit won't happen if the client has no updates
            # !!!
            try:
                while True:
                    server_update = self.pending_events.get_nowait()
                    yield server_update
            except queue.Empty:
                pass

server = grpc.server(ThreadPoolExecutor(max_workers=100))
game_pb2_grpc.add_GameServicer_to_server(Game(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()

如代码中所述,如果客户端不不断发送请求,它将不会收到更新。 也许异步方法会更好,这也可能解决此设计中的其他问题。

PS:这个问题已经用 go here 中的 grpc 解决了,但是我不知道如何将它翻译成 python 的 grpc 实现。

如果有任何帮助,我将非常高兴!

【问题讨论】:

    标签: python python-3.x network-programming grpc grpc-python


    【解决方案1】:

    我终于可以使用python asynio api 让它工作了。 基本思想是使用asyncio.create_task将读写解耦成两个协程。 对于任何感兴趣的人,这里有一个解决方案。

    class Game(game_pb2_grpc.GameServicer):
        async def read_client_requests(self, request_iter):
            async for client_update in request_iter:
                print("Recieved message from client:", client_update, end="")
    
        async def write_server_responses(self, context):
            for i in range(15):
                await context.write(game_pb2.ServerResponse(dummy_value=str(i)))
                await asyncio.sleep(0.5)
    
        async def participate(self, request_iter, context):
            read_task = asyncio.create_task(self.read_client_requests(request_iter))
            write_task = asyncio.create_task(self.write_server_responses(context))
    
            await read_task
            await write_task
    
    
    async def serve():
        server = grpc.aio.server()
        game_pb2_grpc.add_GameServicer_to_server(Game(), server)
        server.add_insecure_port("[::]:50051")
        await server.start()
        await server.wait_for_termination()
    
    
    if __name__ == "__main__":
        asyncio.run(serve())
    

    请注意,除了 write 协程,yield 也足够了。

    【讨论】:

      猜你喜欢
      • 2019-07-28
      • 2022-10-14
      • 2019-04-08
      • 2020-09-30
      • 1970-01-01
      • 1970-01-01
      • 2020-03-08
      • 1970-01-01
      • 2015-11-18
      相关资源
      最近更新 更多