【发布时间】:2018-07-27 08:51:46
【问题描述】:
我正在尝试通过 gRPC 创建流式 API,但我的 StreamObserver 由于某种奇怪的原因被取消了。
这是我的.proto 声明:
service Service {
rpc connect (ConnectionRequest) returns (stream StreamResponse) {}
rpc act (stream ActRequest) returns (ActResponse) {}
}
这个想法是用户将connect,因此StreamResponse将为每个用户保存,然后在每个act上StreamResponse将收到更新。这是Service的java实现
class ServiceImpl extends ServiceGrpc.ServiceImplBase {
............
@Override
public void connect(ConnectionRequest request, StreamObserver<StreamResponse> responseObserver) {
observers.add(responseObserver);
}
@Override
public StreamObserver<ActRequest> act(StreamObserver<ActResponse> responseObserver) {
return return new StreamObserver<ActRequest>() {
@Override
public void onNext(ActRequest actRequest) {
StreamResponse streamResponse = StreamResponse.newBuilder().build();
observers.forEach(o -> o.onNext(streamResponse));
actRequest..onCompleted();
}
...............
};
}
..................
}
这里是客户端connect方法:
ServiceGrpc.ServiceStub stub = ServiceGrpc.newStub(channel);
new StreamObserver<PlayerResponse>() {
........
@Override
public void onNext(StreamResponse streamResponse) {
logger.info("Act: " streamResponse.getData());
}
........
}
ConnectionRequest request = ConnectionRequest.newBuilder().build();
stub.connect(request, streamObserver);
和客户端act方法:
StreamObserver<ActResponse> observer = new StreamObserver<ActResponse>() {
@Override
public void onNext(ActResponse actResponse) {
logger.info("Status: " + actResponse.getSuccess());
}
.........
};
StreamObserver<ActRequest> act = stub.act(observer);
act.onNext(MoveRequest.newBuilder().build());
act.onCompleted();
当我启动客户端和服务器时,客户端可以调用connect。但是方法act只能在第一次调用。当客户端第二次调用act 时,我收到以下异常:
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
at io.grpc.Status.asRuntimeException(Status.java:517)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:335)
在调试模式下我可以看到StreamObserver<StreamResponse> responseObserver 是canceled=true。
【问题讨论】:
-
您确定要在服务器的
onNext()方法中调用actRequest..onCompleted();吗?那里好像有问题。 -
@Ran 我想是的。
actRequest不是一个动作流,它只是一个事件 -
您正在调用
.onCompleted()不确定是哪个对象。然后当下一个客户来电时,您正在调用o.onNext(streamResponse)。感觉这就是问题所在。 -
@Ran 我不会为
StreamResponse打电话给onCompleted,只为ActRequest和ActResponse打电话。但取消成为StreamObserver<StreamResponse>。我什至尝试不为这两个对象调用onCompleted,但结果是一样的 -
act.onCompleted();在客户端代码中。也许这正在关闭连接。
标签: java protocol-buffers grpc grpc-java