【问题标题】:gRPC stream become canceledgRPC 流被取消
【发布时间】:2018-07-27 08:51:46
【问题描述】:

我正在尝试通过 gRPC 创建流式 API,但我的 StreamObserver 由于某种奇怪的原因被取消了。 这是我的.proto 声明:

service Service {
    rpc connect (ConnectionRequest) returns (stream StreamResponse) {}

    rpc act (stream ActRequest) returns (ActResponse) {}
}

这个想法是用户将connect,因此StreamResponse将为每个用户保存,然后在每个actStreamResponse将收到更新。这是Service的java实现

class ServiceImpl extends ServiceGrpc.ServiceImplBase {
    ............
    @Override
    public void connect(ConnectionRequest request, StreamObserver<StreamResponse> responseObserver) {
        observers.add(responseObserver);
    }

    @Override
    public StreamObserver<ActRequest> act(StreamObserver<ActResponse> responseObserver) {
        return return new StreamObserver<ActRequest>() {
            @Override
            public void onNext(ActRequest actRequest) {
                StreamResponse streamResponse = StreamResponse.newBuilder().build();
                observers.forEach(o -> o.onNext(streamResponse));
                actRequest..onCompleted();
            }
            ...............
        };
    }
    ..................
}

这里是客户端connect方法:

ServiceGrpc.ServiceStub stub = ServiceGrpc.newStub(channel);
new StreamObserver<PlayerResponse>() {
    ........
    @Override
    public void onNext(StreamResponse streamResponse) {
          logger.info("Act: " streamResponse.getData());
    }
    ........
}
ConnectionRequest request = ConnectionRequest.newBuilder().build();
stub.connect(request, streamObserver);

和客户端act方法:

StreamObserver<ActResponse> observer = new StreamObserver<ActResponse>() {
    @Override
    public void onNext(ActResponse actResponse) {
        logger.info("Status: " + actResponse.getSuccess());
    }
    .........
};
StreamObserver<ActRequest> act = stub.act(observer);
act.onNext(MoveRequest.newBuilder().build());
act.onCompleted();

当我启动客户端和服务器时,客户端可以调用connect。但是方法act只能在第一次调用。当客户端第二次调用act 时,我收到以下异常:

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
    at io.grpc.Status.asRuntimeException(Status.java:517)
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:335)

在调试模式下我可以看到StreamObserver&lt;StreamResponse&gt; responseObservercanceled=true

【问题讨论】:

  • 您确定要在服务器的onNext() 方法中调用actRequest..onCompleted(); 吗?那里好像有问题。
  • @Ran 我想是的。 actRequest 不是一个动作流,它只是一个事件
  • 您正在调用 .onCompleted() 不确定是哪个对象。然后当下一个客户来电时,您正在调用o.onNext(streamResponse)。感觉这就是问题所在。
  • @Ran 我不会为StreamResponse 打电话给onCompleted,只为ActRequestActResponse 打电话。但取消成为StreamObserver&lt;StreamResponse&gt;。我什至尝试不为这两个对象调用onCompleted,但结果是一样的
  • act.onCompleted(); 在客户端代码中。也许这正在关闭连接。

标签: java protocol-buffers grpc grpc-java


【解决方案1】:

我发现了为什么会出现这个错误。在客户端,我使用Vaadin 来构建用户界面。所以在StreamObserver&lt;ActResponse&gt;#onNext 上,我正在调用一个引发异常的 Vaadin 方法。

我在开发过程中犯的错误是我将onError 留空,因为它是一个 PoC。但是这个错误花费了我几个小时的调试时间。 这是一个愚蠢的错误,但我会放在这里,也许它可以为某人节省一些时间。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-04
    • 2018-05-29
    • 2019-04-08
    • 1970-01-01
    • 2017-09-20
    相关资源
    最近更新 更多