【发布时间】:2021-10-07 10:43:34
【问题描述】:
我目前正在开发一个 grpc 服务器,它将接收来自第一台服务器的流式 grpc 调用并将这些调用重定向到第二台服务器,并将来自第二台服务器的响应作为流重定向到第一个服务器。
我有 2 个原型文件
第一个文件:
syntax = "proto3";
package first.proto.pack;
service FirstProtoService {
rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}
message RequestToFirstServer {
oneof firstStreamingRequest {
int32 x = 1;
int32 y = 2;
}
}
message ResponseForFirstServer {
string someprocessedinformation = 1;
}
第二个文件:
syntax = "proto3";
package second.proto.pack;
service SecondProtoService {
rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}
message RequestToSecondServer {
oneof secondStreamingRequest {
int32 processedX = 1;
int32 processedY = 2;
}
}
message ResponseFromSecondServer {
string computedInformation = 1;
}
第一个服务器知道第一个 proto 文件,但不知道第二个。
第二个服务器知道第二个 proto 文件,但不知道第一个。
中间服务器知道第一个和第二个原型。
需要编写一个服务器,将来自一台服务器的请求从一台服务器传输到另一台服务器
我开始用 Python 编写它。但是面临向第二台服务器发送大量请求的问题
我的服务中间实现在 Python 上的样子:
import grpc
import socket
import logging
from concurrent import futures
import first_pb2 as fasr
import first_pb2_grpc as fasr_srv
import second_pb2 as sasr
import second_pb2_grpc as sasr_srv
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip_address = s.getsockname()[0]
s.close()
_THREAD_CONCURRENCY = 24
channel = grpc.insecure_channel('localhost:50051')
client = sasr_srv.SecondProtoServiceStub(channel)
class FirstProtoService(fasr_srv.FirstProtoServiceServicer):
def StreamingCall(self, request_iterator: fasr.RequestToFirstServer, context):
try:
for request in request_iterator:
if request.x:
request = sasr.RequestToSecondServer(processedX=request.processedX)
if request.y:
request = sasr.RequestToSecondServer(processedY=request.processedY)
responses = client.StreamingCall(request)
yield responses
except StopIteration as error:
logging.error(f'Exception occurred {error}.', exc_info=True)
raise RuntimeError("Failed to receive request")
except Exception as e:
logging.error(f'-------Exception occurred: {e}.', exc_info=True)
def serve(host: str, port: str) -> None:
server = grpc.server(futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY))
fasr_srv.add_FirstProtoServiceServicer_to_server(FirstProtoService(), server)
server.add_insecure_port(f'{host}:{port}')
server.start()
logging.info(f'Starting server on : {host}:{port}')
server.wait_for_termination()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
serve(ip_address, '7777')
在中间服务器端的第一个客户端发出请求后,我收到以下错误:
line 29, in StreamingCall
request = sasr.RequestToSecondServer(processedX=request.processedX)
TypeError: Parameter to MergeFrom() must be instance of same class: expected second.proto.pack.processedX got first.proto.pack.x
【问题讨论】:
标签: python protocol-buffers grpc proto