【问题标题】:gRPC services's Context CancellationListener is not fired when client cancels a service call当客户端取消服务调用时,gRPC 服务的 Context CancellationListener 不会被触发
【发布时间】:2018-09-12 15:45:19
【问题描述】:

我有一个流媒体服务,它可以无限期地从服务器流式传输到客户端,直到客户端取消。

在服务器端,我有一个线程使用来自数据库的数据填充 ehcache。

Ehcache 提供缓存事件的回调,即添加项目时,删除项目时等。我只关心在将元素放入缓存时通知客户端,所以当客户端连接到我的 gRPC 服务时,我在缓存中注册了一个notifyElementPut() 回调,它引用了连接的客户端StreamObserver

public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {


  private StreamObserver<FooUpdateResponse> responseObserver;

  public GrpcAwareCacheEventListener(
      StreamObserver<FooUpdateResponse> responseObserver) {
    this.responseObserver = responseObserver;
  }


  @Override
  public void notifyElementPut(Ehcache cache, Element element) throws CacheException {

    Foo foo = (Foo) element.getObjectValue();
    if (foo != null) {
      responseObserver.onNext(
          FooResponse.newBuilder().setFoo(foo).build());
    }
  }
}

我的流媒体 foo 服务如下:

    public void streamFooUpdates(Empty request,
              StreamObserver<FooResponse> responseObserver) {

            final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
            fooCache.getCacheEventNotificationService().registerListener(eventListener);
            Context.current().withCancellation().addListener(new CancellationListener() {

              public void cancelled(Context context) {
    log.info("inside context cancelled callback");      
  fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
              }

            }, ForkJoinPool.commonPool());



          }

这一切都很好,只要客户端连接,就会通知客户端所有 foo 更新。

但是,在客户端断开连接或显式取消调用后,我希望服务器的 Context 的取消侦听器会触发,从而取消注册缓存中的回调。

不管客户端是关闭通道,还是显式取消调用,情况都不是这样。 (我希望服务器端取消的上下文会为这两个事件触发)。我想知道我在客户端的取消语义是否不正确,这是我的客户端代码,取自一个测试用例:

Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
        .usePlaintext().build();

    FooServiceGrpc.FooService stub = FooServiceGrpc
        .newStub(channel);


    ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
      public void onNext(FooResponse response) {
        log.info("received foo: {}", response.getFoo());
      }

      public void onError(Throwable throwable) {

      }

      public void onCompleted() {

      }

      public boolean isReady() {
        return false;
      }

      public void setOnReadyHandler(Runnable runnable) {

      }

      public void disableAutoInboundFlowControl() {

      }

      public void request(int i) {

      }

      public void setMessageCompression(boolean b) {

      }

      public void cancel(@Nullable String s, @Nullable Throwable throwable) {

      }
    };

    stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
    Thread.sleep(10000); // sleep 10 seconds while messages are received.
    cancellableObserver.cancel("cancelling from test", null); //explicit cancel
    ((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.

    Thread.sleep(7000); //channel should be shutdown by now.

  }

我想知道为什么服务器没有触发“上下文取消”回调。

谢谢!

【问题讨论】:

    标签: grpc grpc-java


    【解决方案1】:

    您没有正确取消客户端呼叫。 stub.streamFooUpdates() 的第二个参数上的 StreamObserver 是您的回调。你不应该在 StreamObserver 上打电话。

    有两种方法可以从客户端取消呼叫。

    选项 1:传递一个ClientResponseObserver 作为第二个参数,实现beforeStart(),它给你一个ClientCallStreamObserver,你可以在上面调用cancel()

    选项 2:在 CancellableContext 内运行 stub.streamFooUpdates(),并取消 Context 以取消调用。请注意,CancellableContext 必须始终被取消,这就是 finally 块的用途。

    CancellableContext withCancellation = Context.current().withCancellation();
    try {
      withCancellation.run(() -> {
          stub.streamFooUpdates(...);
          Thread.sleep(10000);
          withCancellation.cancel(null);
      });
    } finally {
      withCancellation.cancel(null);
    }
    

    【讨论】:

      猜你喜欢
      • 2017-09-20
      • 2020-11-20
      • 2020-08-14
      • 2021-04-10
      • 2019-07-02
      • 1970-01-01
      • 1970-01-01
      • 2022-01-13
      • 2019-03-29
      相关资源
      最近更新 更多