【Posted】: 2022-10-19 10:05:19
【Question】:
I implemented a simple API.
rpc test(google.protobuf.Empty) returns (MyString);
message MyString {
  string value = 1;
}
Here is my implementation:
public void test(Empty request, StreamObserver<MyString> responseObserver) {
    for (int i = 1; i <= 5; i++) {
        MyString myString = Mono.just(MyString.newBuilder().setValue("Value " + i).build())
                .delayElement(Duration.ofSeconds(1))
                .log()
                .block();
        responseObserver.onNext(myString);
    }
    responseObserver.onCompleted();
}
I use BloomRPC to call the API.
These are the logs:
2022-07-19 10:57:19.244 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.21 : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:19.244 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.21 : request(unbounded)
2022-07-19 10:57:20.250 INFO 9988 --- [ parallel-5] reactor.Mono.DelayElement.21 : onNext(value: "Value 1"
)
2022-07-19 10:57:20.251 INFO 9988 --- [ parallel-5] reactor.Mono.DelayElement.21 : onComplete()
2022-07-19 10:57:20.253 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.22 : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:20.253 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.22 : request(unbounded)
2022-07-19 10:57:21.260 INFO 9988 --- [ parallel-6] reactor.Mono.DelayElement.22 : onNext(value: "Value 2"
)
2022-07-19 10:57:21.260 INFO 9988 --- [ parallel-6] reactor.Mono.DelayElement.22 : onComplete()
2022-07-19 10:57:21.260 WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
2022-07-19 10:57:21.260 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.23 : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:21.260 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.23 : request(unbounded)
2022-07-19 10:57:22.263 INFO 9988 --- [ parallel-7] reactor.Mono.DelayElement.23 : onNext(value: "Value 3"
)
2022-07-19 10:57:22.263 INFO 9988 --- [ parallel-7] reactor.Mono.DelayElement.23 : onComplete()
2022-07-19 10:57:22.263 WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
2022-07-19 10:57:22.263 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.24 : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:22.263 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.24 : request(unbounded)
2022-07-19 10:57:23.267 INFO 9988 --- [ parallel-8] reactor.Mono.DelayElement.24 : onNext(value: "Value 4"
)
2022-07-19 10:57:23.267 INFO 9988 --- [ parallel-8] reactor.Mono.DelayElement.24 : onComplete()
2022-07-19 10:57:23.267 WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
2022-07-19 10:57:23.267 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.25 : onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2022-07-19 10:57:23.267 INFO 9988 --- [ault-executor-4] reactor.Mono.DelayElement.25 : request(unbounded)
2022-07-19 10:57:24.271 INFO 9988 --- [ parallel-1] reactor.Mono.DelayElement.25 : onNext(value: "Value 5"
)
2022-07-19 10:57:24.271 INFO 9988 --- [ parallel-1] reactor.Mono.DelayElement.25 : onComplete()
2022-07-19 10:57:24.271 WARN 9988 --- [ault-executor-4] io.grpc.internal.ServerCallImpl : Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
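Expected: the client sends one request and the server sends back a stream of 5 individual elements, with a 2-second interval between elements.
Actual: BloomRPC stays in an infinite loading state, even after onCompleted is called. When onCompleted is called, I get the error Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}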
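A note on the status text: grpc-java reports `Too many responses` when a handler sends more than one message on a method whose type allows only a single response. The `test` RPC above is declared without the `stream` keyword, so it is unary, which would be consistent with the warning first appearing on the second `onNext`. A minimal sketch of a server-streaming declaration follows, assuming that streaming five messages is the intent; the service name `TestService` is hypothetical, since the post does not show the enclosing service.

```proto
syntax = "proto3";

import "google/protobuf/empty.proto";

// Hypothetical service name; the original post does not show it.
service TestService {
  // `stream` on the response type makes this a server-streaming RPC,
  // so the handler may call onNext more than once before onCompleted.
  rpc test(google.protobuf.Empty) returns (stream MyString);
}

message MyString {
  string value = 1;
}
```

After the stubs are regenerated, the server-side handler keeps the same shape (a request plus a `StreamObserver<MyString>`), so the implementation shown above should not need a signature change under this assumption.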
【Comments】:
Tags: grpc-java