【发布时间】:2020-11-07 21:54:41
【问题描述】:
TL;DR;
有没有办法根据下游健康状况自动调整 Project Reactor 中元素之间的延迟?
更多详情
我有一个应用程序,它从 Kafka 主题读取记录,为每个记录发送一个 HTTP 请求,并将结果写入另一个 Kafka 主题。从/向 Kafka 读取和写入快速且容易,但第三方 HTTP 服务很容易被淹没,所以我使用 delayElements() 和属性文件中的值,这意味着该值在应用程序运行时不会改变。这是一个代码示例:
kafkaReceiver.receiveAutoAck()
.concatMap(identity())
.delayElements(ofMillis(delayElement))
.flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
.onErrorContinue(handleError())
.map(this::getSenderRecord)
.flatMap(kafkaSender::send)
但是,第三方服务的超时执行可能会有所不同,我希望能够相应地调整此延迟。比方说,如果我看到超过 5% 的请求在 10 秒内失败,我会增加延迟。如果超过 10 秒低于 5%,那么我会再次减少延迟。
Reactor 中是否有现有的机制?我可以从我的角度想出一些创造性的解决方案,但想知道他们(或其他人)是否已经实施了。
【问题讨论】:
-
你有用于 kafka 和 HTTP 调用的响应式驱动程序吗?这些问题应该通过来自 HTTP 客户端的背压(或整体背压)来解决,而不是延迟单个元素
-
@arap 您能否提供此类驱动程序的示例?
-
看看这个用于 Kafka 的:github.com/reactor/reactor-kafka 和用于 HTTP 的 Spring Webclient
-
@arap 这是我正在使用的确切库
标签: java apache-kafka reactive-programming project-reactor circuit-breaker