【问题标题】:Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}取消状态为 Status{code=INTERNAL, description=Too many response, cause=null} 的流
【发布时间】:2022-10-19 10:05:19
【问题描述】:

我实现了一个简单的api。

    rpc test(google.protobuf.Empty) returns (MyString);
    
    message MyString {
      string value = 1;
    }

这是我的实现:

    public void test(Empty request, StreamObserver<MyString> responseObserver) {
        for (int i = 1; i <= 5; i++) {
            MyString myString = Mono.just(MyString.newBuilder().setValue("Value "+i).build())
                .delayElement(Duration.ofSeconds(1))
                .log()
                .block();
            responseObserver.onNext(myString);
        }
        responseObserver.onCompleted();
    }

我使用 BloomRPC 来调用 API。

这些是日志:

2022-07-19 10:57:19.244  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.21             : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:19.244  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.21             : request(unbounded)
2022-07-19 10:57:20.250  INFO 9988 --- [     parallel-5] reactor.Mono.DelayElement.21             : onNext(value: "Value 1"
)
2022-07-19 10:57:20.251  INFO 9988 --- [     parallel-5] reactor.Mono.DelayElement.21             : onComplete()
2022-07-19 10:57:20.253  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.22             : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:20.253  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.22             : request(unbounded)
2022-07-19 10:57:21.260  INFO 9988 --- [     parallel-6] reactor.Mono.DelayElement.22             : onNext(value: "Value 2"
)
2022-07-19 10:57:21.260  INFO 9988 --- [     parallel-6] reactor.Mono.DelayElement.22             : onComplete()
2022-07-19 10:57:21.260  WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl          : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
2022-07-19 10:57:21.260  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.23             : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:21.260  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.23             : request(unbounded)
2022-07-19 10:57:22.263  INFO 9988 --- [     parallel-7] reactor.Mono.DelayElement.23             : onNext(value: "Value 3"
)
2022-07-19 10:57:22.263  INFO 9988 --- [     parallel-7] reactor.Mono.DelayElement.23             : onComplete()
2022-07-19 10:57:22.263  WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl          : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
2022-07-19 10:57:22.263  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.24             : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:22.263  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.24             : request(unbounded)
2022-07-19 10:57:23.267  INFO 9988 --- [     parallel-8] reactor.Mono.DelayElement.24             : onNext(value: "Value 4"
)
2022-07-19 10:57:23.267  INFO 9988 --- [     parallel-8] reactor.Mono.DelayElement.24             : onComplete()
2022-07-19 10:57:23.267  WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl          : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
2022-07-19 10:57:23.267  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.25             : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:23.267  INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.25             : request(unbounded)
2022-07-19 10:57:24.271  INFO 9988 --- [     parallel-1] reactor.Mono.DelayElement.25             : onNext(value: "Value 5"
)
2022-07-19 10:57:24.271  INFO 9988 --- [     parallel-1] reactor.Mono.DelayElement.25             : onComplete()
2022-07-19 10:57:24.271  WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl          : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}

预期:客户端发送一个请求,发送一个包含 5 个单个元素的流,每个元素之间的间隔为 2 秒。

结果:BloomRPC 处于无限加载状态,即使在调用 onComplete 之后也是如此。调用 onComplete 时,我收到错误 Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}

【问题讨论】:

    标签: grpc-java


    【解决方案1】:

    尝试按如下方式放置原型:

    rpc test(google.protobuf.Empty) returns (stream MyString);
    

    【讨论】:

      猜你喜欢
      • 2022-06-19
      • 1970-01-01
      • 1970-01-01
      • 2011-06-08
      • 1970-01-01
      • 2015-10-20
      • 1970-01-01
      • 2011-08-05
      • 2020-04-24
      相关资源
      最近更新 更多