【问题标题】:Cannot see offsets committed to __consumer_offsets topic in Kafka在 Kafka 中看不到提交给 __consumer_offsets 主题的偏移量
【发布时间】:2018-11-09 04:42:21
【问题描述】:

我有一个 Spark Streaming Scala 应用程序,它从 Kafka 主题读取数据并将其放置在 HDFS 上。我希望应用程序将读取消息的偏移量存储到 __consumer_offsets 主题,以便在应用程序失败时开始从中读取。该应用程序运行良好(我可以看到 HDFS 上的数据),但我看不到它对 __consumer_offsets 的提交。

这是我的 KafkaParams:

val kafkaParams = Map(
      "metadata.broker.list" -> "xx.xxx.x.xx:6667",
      "enable.auto.commit" -> "true",
      "group.id" -> "reading_telemetry",
      "offsets.storage" -> "kafka"
    )

我用来从 __consumer_offsets 获取已提交偏移量的命令如下:

$ /usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config   --zookeeper xx.xxx.x.xx:2181   --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

我得到了一些关于表单提交偏移量的信息

[test1,test,0]::[OffsetMetadata[55,NO_METADATA],CommitTime 1539603328309,过期时间6723603328309]

但我没有看到“reading_telemetry”组 ID 的任何提交。有什么想法,为什么?

我的环境:

卡夫卡:1.0.1 火花:2.3.1 斯卡拉:2.11.8

【问题讨论】:

    标签: apache-spark apache-kafka


    【解决方案1】:

    使用 kafka-consumer-groups.sh 脚本,如下所示:

    kafka-consumer-groups.sh  --bootstrap-server <BootStrapServerIP:port> --describe --group telemetryGroup
    

    它将返回以下格式的信息:

    GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       OWNER
    telemetryGroup        test-topic      0          15              15              0         telemetryGroup-1/127.0.0.1
    telemetryGroup        test-topic      1          14              15              1         telemetryGroup-2_/127.0.0.1
    

    【讨论】:

      【解决方案2】:

      您不应直接阅读__consumer_offsets 主题。这是一个内部主题,您应该改用工具来检索已提交的偏移量。

      最简单的方法是运行kafka-consumer-groups 工具:

      kafka-consumer-groups.sh \
          --bootstrap-server [BOOTSTRAP_SERVERS] \
          --describe \
          --group reading_telemetry
      

      CURRENT-OFFSET 列包含提交的偏移量。

      【讨论】:

      • 感谢您的建议。刚试过 - 得到这个结果:错误:消费者组'reading_telemetry'不存在。这很奇怪,因为我在 KafkaParams 对象中指定了组 ID。知道从这里去哪里吗?
      • 在 Kafka 1.0 中,默认情况下,提交的偏移量仅保留 24 小时 (offsets.retention.minutes)。可能是您的应用程序没有在这个时间范围内提交吗?如果不是,那么恐怕我们需要消费者日志来确定原因。
      猜你喜欢
      • 2015-11-21
      • 2021-11-14
      • 1970-01-01
      • 2018-11-28
      • 2015-02-14
      • 2019-07-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多