【问题标题】:Kafka 0.11 how to reset offsetsKafka 0.11 如何重置偏移量
【发布时间】:2018-01-22 01:56:27
【问题描述】:

我正在尝试使用最新的 Kafka CLI 工具重置消费者偏移量。

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics

结果我看到了这个输出:

TOPIC                            PARTITION  NEW-OFFSET
FirstTopic                       0          0
SecondTopic                      0          0

但是再次运行命令:

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --describe

输出结果:

Consumer group 'my-group' has no active members.

TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
FirstTopic         0          1230            1230            0  
SecondTopic        0          1022            1022            0

我尝试了其他选项,例如重置为显式偏移或直接指定主题,但结果相同。输出提示操作成功,同时使用 describe 命令检查偏移量或调试显示偏移量未更改。

任何人都可以在非 Zookeeper 代理中成功重置消费者偏移量。

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    默认情况下,--reset-offsets 只打印操作的结果。要实际执行操作,您需要将--execute 添加到您的命令中:

    kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group
    my-group --reset-offsets --to-earliest --all-topics --execute
    

    【讨论】:

    • 还有其他选项可以重置为特定的偏移量,比如上述问题中 FirstTopic 的 1200 吗?具体来说,我希望我的消费者重置到特定的时间点(无论当时的偏移量是多少)
    • 我也想知道如何通过 Kafka Java API 来完成这些。
    【解决方案2】:

    虽然接受的答案完美地回答了 OP 问题,但还有更多参数可用于重置偏移量。所以添加这个答案来扩展接受的答案。

    将所有主题的偏移量重置为消费者组中最早的偏移量

    kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
        <group_name> --reset-offsets --to-earliest --all-topics --execute
    

    将特定主题的偏移量重置为消费组中最早的偏移量

    kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
        <group_name> --reset-offsets --to-earliest --topic <my-topic> --execute
    

    将特定主题的偏移量重置为消费者组中的特定偏移量

    kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
        <group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute
    

    其他支持的参数:

    --shift-by [正整数或负整数] - 从给定整数向前或向后移动偏移量。

    --to-current--to-latest--to-offset--to 相同- 最早的

    --to-datetime [日期时间格式为yyyy-MM-ddTHH:mm:ss.xxx]

    kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
        <group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute
    

    --by-duration [格式为 PnDTnHnMnS]

    kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
        <group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute
    

    重置为从当前时间戳偏移持续时间。

    如何验证?

    使用下面的命令来检查当前/结束的偏移量并确认重置已完成。

    kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
    

    示例输出:

    Consumer group 'group1' has no active members.
    
    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    intro           0          0               99              99              -               -               -
    

    可能的错误:

    错误:只有在组“[group_name]”处于非活动状态但当前状态为稳定时才能重置分配。

    '稳定'意味着,有一个活跃的消费者正在为这个组运行。所以首先你必须停止活跃的消费者并重试重置偏移量。

    如果消费者组有活跃的消费者,则无法重置偏移量。

    【讨论】:

    • ,是否有任何官方文档可用?
    猜你喜欢
    • 1970-01-01
    • 2018-05-03
    • 2017-03-09
    • 2018-02-03
    • 2016-06-01
    • 2021-10-16
    • 2017-07-22
    • 1970-01-01
    • 2017-03-27
    相关资源
    最近更新 更多