【发布时间】:2021-01-29 13:04:09
【问题描述】:
我正在编写一个消费者应用程序来从 kafka 流中挑选记录并使用 spring-kafka 处理它。 我的处理步骤如下:
Getting records from stream --> dump it into a table --> Fetch records and call API --> API will update records into a table --> calling Async Commit()
似乎在某些情况下,API 处理需要更多时间,因为正在获取更多记录并且我们遇到了错误?
会员 consumer-prov-em-1-399ede46-9e12-4388-b5b8-f198a4e6a5bc 向协调员 apslt2555.uhc.com:9095 (id: 2147483577 rack: null) 由于消费者轮询超时已过期。这 意味着后续调用 poll() 之间的时间比 配置了 max.poll.interval.ms,这通常意味着 poll 循环花费太多时间处理消息。你可以解决 这可以通过增加 max.poll.interval.ms 或减少 使用 max.poll.records 在 poll() 中返回的批次的最大大小。
org.apache.kafka.clients.consumer.CommitFailedException: 提交不能 已完成,因为该组已经重新平衡并分配了 分区到另一个成员。这意味着之间的时间 对 poll() 的后续调用比配置的要长 max.poll.interval.ms,这通常意味着轮询循环是 花费太多时间处理消息。您可以解决这个问题 通过增加 max.poll.interval.ms 或减少最大大小 使用 max.poll.records 在 poll() 中返回的批次。
我知道这可以通过减少 max.poll.records 或增加 max.poll.interval.ms 来解决。如果我将 max.poll.records 设置为 10,那么我想了解什么 poll() 行为?是否会从流中提取 10 条记录等待这些记录被提交,然后再提交下 10 条记录?当下一次轮询发生时?它是否也会影响性能,因为我们将 max.poll.records 从默认的 500 减少到 10。
我是否也必须增加 max.poll.interval.ms。大概10分钟吧。在更改这些值时,我应该注意哪些负面影响?除了这些参数,还有其他方法可以处理这些错误吗?
【问题讨论】:
-
This 可能有帮助