【问题标题】:How does Zookeeper retrive the consumer offsets from __consumer_offsets topic?Zookeeper 如何从 __consumer_offsets 主题中检索消费者偏移量?
【发布时间】:2020-10-25 14:19:27
【问题描述】:

这是“Where do zookeeper store Kafka cluster and related information?”的后续问题,基于 Armando Ballaci 提供的答案。

现在很明显,消费者偏移量存储在 Kafka 集群中一个名为 __consumer_offsets 的特殊主题中。没关系,我只是想知道这些偏移量的检索是如何工作的。

主题不像 RDBS,我们可以基于某个谓词查询任意数据。例如 - 如果数据存储在 RDBMS 中,可能像下面这样的查询将获取某个消费者组的特定消费者的主题的特定分区的消费者偏移量。

select consumer_offset__read, consumer_offset__commited from consumer_offset_table where consumer-grp-id="x" and partitionid="y"

但显然这种检索在 Kafka 主题上是不可能的。那么从主题中检索机制是如何工作的呢?有人可以详细说明吗?

(来自 Kafka 分区的数据在 FIFO 中读取,如果遵循 Kafka 消费者模型来检索特定的偏移量,则必须处理大量额外的数据,而且速度会很慢。所以我想知道它是否在某些情况下完成其他方式...)

【问题讨论】:

    标签: apache-kafka apache-zookeeper kafka-consumer-api kafka-topic


    【解决方案1】:

    当我在日常工作中偶然发现这一点时,我可以在网上找到一些描述如下:

    在 Kafka 到 0.8.1.1 的版本中,消费者将他们的偏移量提交给 ZooKeeper。当存在大量偏移(即消费者计数 * 分区计数)时,ZooKeeper 无法很好地扩展(尤其是对于写入)。幸运的是,Kafka 现在提供了一种存储消费者偏移量的理想机制。消费者可以通过将偏移量写入持久(复制)和高可用性主题来提交他们在 Kafka 中的偏移量。消费者可以通过读取这个主题来获取偏移量(尽管我们提供了一个内存中的偏移量缓存以便更快地访问)。即,偏移提交是常规的生产者请求(成本低廉),而偏移提取是快速内存查找。

    Kafka 官方文档描述了该功能的工作原理以及如何将偏移量从 ZooKeeper 迁移到 Kafka。本 wiki 提供了示例代码,展示了如何使用新的基于 Kafka 的偏移存储机制。

    try {
            BlockingChannel channel = new BlockingChannel("localhost", 9092,
                    BlockingChannel.UseDefaultBufferSize(),
                    BlockingChannel.UseDefaultBufferSize(),
                    5000 /* read timeout in millis */);
            channel.connect();
            final String MY_GROUP = "demoGroup";
            final String MY_CLIENTID = "demoClientId";
            int correlationId = 0;
            final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
            final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
            channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
            ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
     
            if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
                Broker offsetManager = metadataResponse.coordinator();
                // if the coordinator is different, from the above channel's host then reconnect
                channel.disconnect();
                channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                              BlockingChannel.UseDefaultBufferSize(),
                                              BlockingChannel.UseDefaultBufferSize(),
                                              5000 /* read timeout in millis */);
                channel.connect();
            } else {
                // retry (after backoff)
            }
        }
        catch (IOException e) {
            // retry the query (after backoff)
        }
    

    【讨论】:

    • 有点棘手的问题,如果偏移量存储在 __consumer_offset 主题中,kafka 会这样做。存储的主题 __consumer_offset 的最后读取偏移量在哪里。 (除非消费者想要读取 __consumer_offset 中的每个日志。???)。
    • 这可能会有所帮助 - medium.com/@felipedutratine/…
    【解决方案2】:

    在 Kafka 到 0.8.1.1 的版本中,消费者将他们的偏移量提交给 ZooKeeper。当存在大量偏移(即消费者计数 * 分区计数)时,ZooKeeper 无法很好地扩展(尤其是对于写入)。幸运的是,Kafka 现在提供了一种存储消费者偏移量的理想机制。消费者可以通过将偏移量写入持久(复制)和高可用性主题来提交他们在 Kafka 中的偏移量。消费者可以通过读取这个主题来获取偏移量(尽管我们提供了一个内存中的偏移量缓存以便更快地访问)。即,偏移提交是常规的生产者请求(成本低廉),而偏移提取是快速内存查找。

    Kafka 官方文档描述了该功能的工作原理以及如何将偏移量从 ZooKeeper 迁移到 Kafka。

    这个想法是,如果您需要您描述的这种功能,您需要将数据存储在 RDBS 或 NoSQL 数据库或 ELK Stack 中。一个好的模式是使用 Sink 连接器通过 Kafka Connect。 Kafka 中的正常消息处理是通过消费者或流定义完成的,这些定义在事件发生时对其做出反应。在某些情况下,您当然可以寻求偏移或时间戳,这是完全可能的......

    在最新版本的 Kafka 中,偏移量不再保存在 Zookeeper 中。所以 Zookeeper 不参与消费者偏移处理。

    【讨论】:

    • 我看到了一种“鸡和蛋”的问题——如果偏移量存储在 __consumer_offset 主题中,kafka 会这样做。存储的主题 __consumer_offset 的最后读取偏移量在哪里。 (除非消费者想要读取 __consumer_offset 中的每个日志。???)。
    • 消费者自己管理它的偏移量,所以如果你用“auto.offset reset”配置它来定义消费者在没有提交位置时的行为(当组是首次初始化)或当偏移量超出范围时。您可以选择将位置重置为“最早”偏移或“最新”偏移(默认)。
    猜你喜欢
    • 2019-01-19
    • 2018-10-27
    • 2017-05-12
    • 2015-04-03
    • 1970-01-01
    • 2018-07-29
    • 2016-03-05
    • 2019-05-01
    • 2020-03-10
    相关资源
    最近更新 更多