【问题标题】:How to get message from a kafka topic with a specific offset如何从具有特定偏移量的 kafka 主题获取消息
【发布时间】:2022-01-30 11:41:41
【问题描述】:

我们有一个带有 3 个 kafka 代理的 HDP 集群(来自 hortonworks)

我们想运行 kafka 控制台消费者,以便从具有特定偏移量的主题中获取一条消息

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181  --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1

但我们得到以下结果:

我们错在哪里?

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Partition-offset based consumption is supported in the new consumer only.
Option                                   Description
------                                   -----------
--blacklist <blacklist>                  Blacklist of topics to exclude from
                                           consumption.
--bootstrap-server <server to connect    REQUIRED (unless old consumer is
  to>                                      used): The server to connect to.
--consumer-property <consumer_prop>      A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the consumer.
--consumer.config <config file>          Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--csv-reporter-enabled                   If set, the CSV metrics reporter will
                                           be enabled
--delete-consumer-offsets                If specified, the consumer path in
                                           zookeeper is deleted when starting up
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <class>                      The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--key-deserializer <deserializer for
  key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--metrics-dir <metrics directory>        If csv-reporter-enable is set, and
                                           this parameter isset, the csv
                                           metrics will be outputed here
--new-consumer                           Use the new consumer implementation.
                                           This is the default.
--offset <consume offset>                The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)

【问题讨论】:

标签: apache-kafka kafka-consumer-api


【解决方案1】:

仅新消费者支持基于分区偏移的消费。

kafka-console-consumer 应使用 --bootstrap-server,如警告所述。

你在--partition之前缺少一个空格

但除此之外,--partition x --offset y 是正确的。


完整的命令

kafka-console-consumer \
  --bootstrap-server kafka0:9092 \
  --topic lopet.lo.pm \
  --partition 0 \
  --offset 34537263 \
  --max-messages 1

使用kcat 是另一种选择,如果你想安装它

【讨论】:

  • 很好的答案,Spring Kafka 可以吗?
  • @RCv 我不确定我明白你在问什么。无论框架如何,您都可以将任何消费者寻找到任何偏移量,并从该分区消费有限数量的消息
  • 谢谢,@One,是的,我的问题并没有真正集中。我的问题是我们可以使用 spring Kafka active listeners 来消费这些消息。是否可以使用这个 spring Kafka 库获取 Kafka 的特定消息?。
【解决方案2】:

如果有人想以编程方式使用 spring java。你可以阅读我的博客来了解这个想法。

https://rcvaram.medium.com/kafka-customer-get-what-needs-only-45d95f9b1105

Spring-Kafka 通过监听器为消费者提供一个抽象层。

要使用特定的偏移量消息,您必须按照以下步骤操作。

  1. 我们必须使用消费者类的consumer.assign 方法将消费者分配到主题分区。
  2. 分配消费者后,我们必须在使用consume.seek 恢复消费者时找到它需要查找的偏移量。
  3. 我们可以恢复消费者,并在 poll 方法的帮助下,它将消费来自 kafka 的消息。在此之前,我们的consumerFactory 应该已经将max.poll.records 设置为1。那么consumer 将只返回一条记录给我们。
  4. 最后,我们可以提交和暂停消费者。

如果您要使用 Kafka 消费者,那么您还应该注意并发进程,因为 Kafka 消费者不是线程安全的。

我的代码在my GitHub repository 中,我也在the maven repository 中发布了此代码。

【讨论】:

  • 这不是使用 Spring,不过,您只需要 kafka-clients.jar。另外,你应该仍然可以在使用订阅方法后进行搜索;您只是无法保证访问特定分区
  • 是的,来自客户端 jar 的消费者,但是,我使用 spring Kafka 消费者工厂来创建消费者。我用 AbstractSeekConsumerAware 类扩展了我的类,因为它处理了很多底层的复杂性。在这种情况下,此消费者是弹簧配置 ryt 的产品。这就是为什么我说,这是给 spring java 用户的。更多详情可以查看我的repo
  • 我没有得到你的最后一点。 @OneCricketeer。如果您提供更多详细信息或一些参考资料,那对我来说会很棒。谢谢
  • 1) AFAICT,工厂毫无意义,因为您只是使用它来构造具有硬编码属性映射的生产者,并且您从未在代码中设置/覆盖 AbstractSeekConsumerAware 方法 2) 您说“我们必须.assign消费者。这对于在分区中获取特定记录是正确的,当然,但仅获取任何单个记录是不正确的,这可以通过.subscribe
  • 谢谢 OneCricketeer,我明白了你的观点,但不确定。一旦我检查,我会更新我的答案。非常感谢
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-05-12
  • 1970-01-01
  • 1970-01-01
  • 2019-06-16
  • 1970-01-01
  • 1970-01-01
  • 2017-12-12
相关资源
最近更新 更多