【发布时间】:2019-10-03 23:29:52
【问题描述】:
我已经使用 Kafka 几个月了,我意识到一些核心概念对我来说还不是很清楚。我的疑问与consumerId、groupId 和offsets 之间的关系有关。在我们的应用程序中,我们需要 Kafka 使用发布 - 订阅范式工作,因此我们为每个消费者使用不同的组 ID,这些 ID 是随机生成的。
我以前认为设置auto.offset.reset = latest我的消费者总是会收到他们还没有收到的消息,但最近我学会了that is not the case。这仅在消费者尚未提交偏移量时才有效。在任何其他情况下,消费者将继续接收偏移量大于其提交的最后偏移量的消息。
由于我总是使用随机组 id 创建新的消费者,我意识到我的消费者“没有记忆”,他们是新消费者,他们永远不会提交偏移量,所以auto.offset.reset = latest 策略将始终适用。这就是我的怀疑开始的地方。 假设以下场景:
- 我有两个客户端应用程序,A 和 B,每个都有一个使用者,以发布 - 订阅方式工作(因此,使用不同的组 ID)。两个消费者都订阅了主题
my-topic。auto.offset.resetsetting 对于两个消费者来说都是latest。 - 一些生产者(或多个生产者)将消息 M1、M2 和 M3 发布到主题
my-topic。 - A 和 B 都接收 M1、M2 和 M3。
- 现在我关闭应用程序 B。
- 生产者产生消息 M4 和 M5。
- 应用程序 A 接收消息 M4 和 M5。
- 现在我重新启动应用程序 B。记住,
groupId是随机的,我没有设置任何消费者 ID,这意味着这是一个新消费者(对吗?)。应用程序 B 没有收到任何消息。 - 生产者发布消息 M6 和 M7。
- 应用程序 A 和 B 都接收消息 M6 和 M7。
所以,总结一下,如果我没记错的话,A 收到了所有消息,但 B 错过了 M4 和 M5。我用kafka-console-consumer.sh 试过这个,它的行为是这样的。
那么,如何让应用程序 B 在关闭时接收发布的消息?现在,如果我启动它时分配与最初启动时相同的 groupId,它将读取消息 M4 和 M5,但这是设置组 ID。是否也可以设置消费者 ID 并获得相同的行为?
或者换一种说法,重新启动同一个消费者是什么意思?如果两个消费者有相同的groupId、相同的consumerId,那么两个消费者是同一个消费者?
对了,consumerId和属性client.id是一样的吗?
【问题讨论】:
标签: apache-kafka kafka-consumer-api publish-subscribe