【问题标题】:Automatic rate adjustment in ReactorReactor 中的自动速率调整
【发布时间】:2020-11-07 21:54:41
【问题描述】:

TL;DR;

有没有办法根据下游健康状况自动调整 Project Reactor 中元素之间的延迟?

更多详情

我有一个应用程序,它从 Kafka 主题读取记录,为每个记录发送一个 HTTP 请求,并将结果写入另一个 Kafka 主题。从/向 Kafka 读取和写入快速且容易,但第三方 HTTP 服务很容易被淹没,所以我使用 delayElements() 和属性文件中的值,这意味着该值在应用程序运行时不会改变。这是一个代码示例:

kafkaReceiver.receiveAutoAck()
            .concatMap(identity())
            .delayElements(ofMillis(delayElement))
            .flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
            .onErrorContinue(handleError())
            .map(this::getSenderRecord)
            .flatMap(kafkaSender::send)

但是,第三方服务的超时执行可能会有所不同,我希望能够相应地调整此延迟。比方说,如果我看到超过 5% 的请求在 10 秒内失败,我会增加延迟。如果超过 10 秒低于 5%,那么我会再次减少延迟。

Reactor 中是否有现有的机制?我可以从我的角度想出一些创造性的解决方案,但想知道他们(或其他人)是否已经实施了。

【问题讨论】:

  • 你有用于 kafka 和 HTTP 调用的响应式驱动程序吗?这些问题应该通过来自 HTTP 客户端的背压(或整体背压)来解决,而不是延迟单个元素
  • @arap 您能否提供此类驱动程序的示例?
  • 看看这个用于 Kafka 的:github.com/reactor/reactor-kafka 和用于 HTTP 的 Spring Webclient
  • @arap 这是我正在使用的确切库

标签: java apache-kafka reactive-programming project-reactor circuit-breaker


【解决方案1】:

您可以使用指数退避添加重试。像这样的东西:

influx()
.flatMap(x -> Mono.just(x)
    .map(data -> apiCall(data))
    .retryWhen(
            Retry.backoff(Integet.MAX_VALUE, Duration.ofSeconds(30))
                .filter(err -> err instanceof RuntimeException)
                .doBeforeRetry(
                    s -> log.warn("Retrying for err {}", s.failure().getMessage()))
                .onRetryExhaustedThrow((spec, sig) -> new RuntimeException("ex")))
                .onErrorResume(err -> Mono.empty()),
        concurrency_val,
        prefetch_val)

这将重试失败的请求 Integet.MAX_VALUE 次,每次重试之间的最短时间为 30 秒。随后的重试实际上会被可配置的抖动因子(默认值 = 0.5)抵消,从而导致连续重试之间的持续时间增加。

Retry.backoff 上的文档说:

一个 RetryBackoffSpec 预配置为带有抖动的指数回退策略,给定最大重试尝试次数和最小回退持续时间。

此外,由于整个操作都映射在 flatMap 中,因此您可以更改默认的 concurrencyprefetch 值,以考虑在任何给定时间可能失败的最大请求数,而整个管道等待 RetryBackOffSpec 成功完成。

最坏的情况是,您的 concurrency_val 请求数失败并等待 30 多秒以重试发生。如果下游系统没有及时恢复,整个操作可能会停止(仍在等待下游的成功),这可能是不可取的。最好将 backOff 限​​制从 Integer.MAX_VALUE 替换为可管理的内容,超过该限制它只会记录错误并继续下一个事件。

【讨论】:

    【解决方案2】:

    如果您正在寻找延迟元素取决于元素处理速度,您可以使用delayUntil

      Flux.range(1, 100)
          .doOnNext(i -> System.out.println("Kafka Receive :: " + i))
          .delayUntil(i -> Mono.fromSupplier(() -> i)
                                .map(k -> {
                                    // msg processing
                                    return k * 2;
                                })
                                .delayElement(Duration.ofSeconds(1)) // msg processing simulation
                                .doOnNext(k -> System.out.println("Kafka send :: " + k)))
          .subscribe();
    

    【讨论】:

    • 这不是我想要的。如果我开始收到 5XX 错误,我需要能够更慢地向下游 HTTP 服务发送元素,然后在错误停止时更快地恢复
    【解决方案3】:

    我认为任何 HTTP 客户端都没有提供背压,包括 netty。一种选择是切换到 RSocket,但如果您正在调用第三方服务,我猜这可能不是一个选择。您可以调整在一天中大部分时间都有效的速率,并使用 doOnError 或类似方法将错误消息发送到另一个主题。另一个接收者可以以更高的延迟处理这些消息,如果消息再次出错,则将消息放回同一个主题并重试计数,以便您最终可以停止处理它们。

    【讨论】:

    • 但是如果我将它们发送到另一个主题或使用内置的重试机制,初始客户端将继续以相同的速率发送请求,导致性能更加下降
    猜你喜欢
    • 1970-01-01
    • 2017-10-03
    • 1970-01-01
    • 2018-12-13
    • 1970-01-01
    • 2013-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多