【问题标题】:Reactor - Delay Flux elements in case of processing errorsReactor - 在处理错误的情况下延迟通量元素
【发布时间】:2021-04-08 17:08:05
【问题描述】:

我有与this question 类似的问题,但我没有看到可接受的答案。我研究了一遍,没有得到满意的答案。

我有一个轮询量为“x”的反应式 Kafka 使用者(Spring Reactor),应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务的超时执行可能不同,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。目前的反应堆有没有办法自动

  1. 在断路器处于打开状态时做出反应,减少轮询量或减慢消耗。
  2. 在电路关闭时将轮询数量增加到之前的状态(如果它关闭,外部服务将按比例增加)。

我不想使用delayElementsdelayUntil,因为它们本质上大多是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用配置中打开时,我会为消费者提供值。

【问题讨论】:

    标签: java reactive-programming project-reactor circuit-breaker reactive-kafka


    【解决方案1】:

    由于背压是基于消费者的缓慢,实现这一点的一种方法是将某些异常类型转换为延迟。为此,您可以使用onErrorResume,如下所示:

    long start = System.currentTimeMillis();
    
    Flux.range(1, 1000)
            .doOnNext(item -> System.out.println("Elpased " + (System.currentTimeMillis() - start) + " millis for item: " + item))
            .flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
            .blockLast();
    
    System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");
    
    private Mono<Integer> process(Integer item) {
        // simulate error for some items
        if (item >= 50 && item <= 100) {
            return Mono.error(new RuntimeException("Downstream failed."));
        }
    
        // normal processing
        return Mono.delay(Duration.ofMillis(10))
                .thenReturn(item);
    }
    
    private Mono<Integer> slowDown(Throwable e) {
        if (e instanceof RuntimeException) { // you could check for circuit breaker exception
            return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
        }
    
        return Mono.empty(); // no delay for other errors
    }
    

    如果您检查此代码的输出,您会看到项目 50 和 100 之间有一些减速,但它在之前和之后以正常速度运行。

    请注意,我的示例没有使用 Kafka。由于您使用的是支持背压的 reactor-kafka 库,因此它的工作方式应该与这个虚拟示例相同。

    此外,由于 Flux 可能会同时处理项目,因此不会立即减速,它会在适当减速之前尝试处理一些额外的项目。

    【讨论】:

    • 感谢您的信息。 QQ - 在这种情况下,反应式 kafka 是否会减慢主题记录的消耗或将其缓冲在内存中并缓慢向下传递?我必须在这里指定背压策略吗?
    • 我对这个库不是很熟悉,但我坚信它会减慢消耗,否则它不会是被动的。 projectreactor.io/docs/kafka/release/reference/#back-pressure
    • 我用 Kafka 反应器消费者试过这个,但它不起作用。卡夫卡只是重新平衡并将消费者踢出群体。理想情况下,它应该调用 pause 方法,并且在下一个信号之前不应该消耗,对吧?你对此有什么想法吗?谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多