【问题标题】:Checking Offset of Kafka topic for a storm consumer为风暴消费者检查 Kafka 主题的偏移量
【发布时间】:2018-10-27 15:04:44
【问题描述】:

我正在使用storm-kafka-client 1.2.1并为KafkaTridentSpoutOpaque创建我的spout配置,如下所示

            kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
                .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                .setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())

我在 Kafka 和 Zookeeper 中都找不到我的组 ID 和偏移量。通过 Zookeeper,我尝试了 zkCli.sh 并尝试了 ls /consumers 但没有,因为我认为 Kafka 本身现在正在维护偏移量而不是 zookeeper。

我也用下面的命令尝试过 Kafka

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand  --list  --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-20130
console-consumer-82696
console-consumer-6106
console-consumer-67393
console-consumer-14333
console-consumer-21174
console-consumer-64550

谁能帮我找到我的偏移量,如果我重新启动拓扑,它会再次在 Kafka 中重播我的事件吗?

【问题讨论】:

    标签: apache-kafka apache-zookeeper apache-storm offset trident


    【解决方案1】:

    Trident 不在 Kafka 中存储偏移量,而是在 Storm 的 Zookeeper 中。如果您使用 Storm 的 Zookeeper 配置的默认设置运行,Storm 的 Zookeeper 中的路径将类似于 /coordinator/<your-topology-id>/meta

    该路径下的对象将包含第一个和最后一个偏移量,以及每个批次的主题分区。所以例如/coordinator/<your-topology-id>/meta/15 将包含第 15 批发出的第一个和最后一个偏移量。

    spout 重启后是否重放偏移量由您在KafkaSpoutConfig 中设置的FirstPollOffsetStrategy 控制。默认值为UNCOMMITTED_EARLIEST,它不会在重新启动时重新开始。请参阅https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L126 上的 Javadoc。

    【讨论】:

    • 我可以在这里找到偏移量——通过在 Storm ls /transactional/<MYSpout>/coordinator/meta @Stig 的 ZkCli 中运行命令如何使用主题分区 (=5) 使用来自 Kafka 的 Trident Storm 并行消费消息?关于增加吞吐量的任何建议?
    猜你喜欢
    • 2018-01-24
    • 2019-05-01
    • 2016-03-05
    • 1970-01-01
    • 2017-05-12
    • 2019-04-04
    • 2019-06-10
    • 2018-07-29
    • 1970-01-01
    相关资源
    最近更新 更多