【发布时间】:2019-12-15 10:08:40
【问题描述】:
我有一个使用以下属性定义的 Kafka 消费者:
session.timeout.ms = 60000
heartbeat.interval.ms = 6000
我们注意到大约 2000 条消息的延迟,并看到消费者多次使用同一条消息(通过我们的应用日志)。此外,注意到一些消息需要大约 10 秒才能完全处理。我们的怀疑是消费者没有正确提交偏移量(或重复提交相同的旧偏移量),因此消费者收到了相同的消息。
为了解决这个问题,我们引入了更多属性:
auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go
And, we set the concurrency to 1.
这解决了我们的问题。滞后开始减少,最终为 0。
但是,我仍然不清楚为什么会首先出现问题。 据我了解,默认情况下:
enable.auto.commit = true
auto.commit.interval.ms=5000
因此,理想情况下,消费者应该每 5 秒提交一次。如果在此时间范围内没有完全处理消息,会发生什么?消费者正在提交什么补偿?问题是否是由于投票记录大小过大(默认为 500)引起的
另外,关于 poll() 方法,我读到:
poll() 调用在设置的 auto.commit.interval.ms 的后台发出。
那么,最初如果 poll() 早于每 5 秒发生一次(默认为 auto.commit.interval),为什么它不提交最新的偏移量?因为消费者还没有做完处理吗?然后,它应该在接下来的 5 秒提交该偏移量。
有人可以回答这些问题并解释为什么会出现最初的问题吗?
【问题讨论】:
标签: spring spring-boot apache-kafka kafka-consumer-api spring-kafka