【问题标题】:Debounce similar requests with reactor-grpc使用 reactor-grpc 去抖动类似的请求
【发布时间】:2021-04-07 05:13:57
【问题描述】:

为了卸载我的数据库,我想在 gRPC 服务中消除类似请求(例如,它们共享请求的相同 id 部分),该服务服务于对延迟没有严格要求的 API .我知道如何使用 vanilla gRPC 做到这一点,但我确定我可以使用哪种类型的 Mono API。

直接调用db的API如下:

public Mono<Blob> getBlob(
      Mono<MyRequest> request) {
    return request.
       map(reader.getBlob(request.getId()));

我觉得我应该使用delaySubscription,但似乎groupBy 不是gRPC 服务处理的Mono API 的一部分。

【问题讨论】:

    标签: java grpc project-reactor


    【解决方案1】:

    不使用反应式操作符检测重复是完全可以的:

    // Guava cache as example.
    private final Cache<String, Boolean> duplicatesCache = CacheBuilder.newBuilder()
        .expireAfterWrite(Duration.ofMinutes(1))
        .build();
    
    public Mono<Blob> getBlob(Mono<MyRequest> request) {
        return request.map(req -> {
            var id = req.getId();
            var cacheKey = extractSharedIdPart(id);
            if (duplicatesCache.getIfPresent(cacheKey) == null) {
                duplicatesCache.put(cacheKey, true);
                return reader.getBlob(id);
            } else {
                return POISON_PILL; // Any object that represents debounce hit.
                                    // Or use flatMap() + Mono.error() instead.
            }
        });
    }
    

    如果出于某种原因您绝对想使用反应式运算符,那么首先您需要将传入的 grpc 请求转换为Flux。这可以使用第三方库(如salesforce/reactive-grpc)或直接实现:

    
    class MyService extends MyServiceGrpc.MyServiceImplBase {
    
        private FluxSink<Tuple2<MyRequest, StreamObserver<MyResponse>>> sink;
    
        private Flux<Tuple2<MyRequest, StreamObserver<MyResponse>>> flux;
    
        MyService() {
            flux = Flux.create(sink -> this.sink = sink);
        }
    
    
        @Override
        public void handleRequest(MyRequest request, StreamObserver<MyResponse> responseObserver) {
            sink.next(Tuples.of(request, responseObserver));
        }
    
        Flux<Tuple2<MyRequest, StreamObserver<MyResponse>>> getFlux() {
            return flux;
        }
    }
    
    

    接下来你订阅这个flux并使用你喜欢的操作符:

    public static void main(String[] args) {
        var mySvc = new MyService();
        var server = ServerBuilder.forPort(DEFAULT_PORT)
            .addService(mySvc)
            .build();
        server.start();
        mySvc.getFlux()
            .groupBy(...your grouping logic...)
            .flatMap(group -> {
                return group.sampleTimeout(...your debounce logic...);
            })
            .flatMap(...your handling logic...)
            .subscribe();
    }
    

    但要小心使用 groupBy 与许多不同的共享 id 部分:

    为了 groupBy 正常工作,需要在下游排出和消耗这些组。值得注意的是,当标准产生大量组时,如果这些组没有在下游适当地消耗(例如,由于 flatMap 的 maxConcurrency 参数设置得太低),可能会导致挂起。

    【讨论】:

    • 不幸的是,您建议的缓存没有我想要的行为:我真的需要从数据库读取的请求提供服务第一个请求之后。我会尝试通量的想法谢谢。我不确定为什么您在使用通量的示例中没有使用相同的生成代码(grpc 处理程序的参数是“MyRequest request, StreamObserver responseObserver”而不是“Mono”。
    猜你喜欢
    • 2020-01-26
    • 1970-01-01
    • 2018-06-03
    • 2021-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-29
    相关资源
    最近更新 更多