【问题标题】:How to delete the consumer offset of a group for one specific topic如何删除一个特定主题的组的消费者偏移量
【发布时间】:2021-05-12 23:32:28
【问题描述】:

假设我有两个主题(都有两个分区和无限保留):

  • my_topic_a
  • my_topic_b

和一个消费者群体:

  • my_consumer

在某个时候,它同时消耗了这两个主题,但由于一些变化,它不再对 my_topic_a 感兴趣,所以它停止了消耗它,现在正在累积延迟:

kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a                           0          300000          400000          100000          -                                                            -               -
my_topic_a                           1          300000          400000          100000          -                                                            -               -
my_topic_b                           0          500000          500000          0               -                                                            -               -
my_topic_b                           1          500000          500000          0               -                                                            -               -

这种延迟让我很恼火,因为:

  • 我在 Grafana 中的消费者滞后图被污染了。
  • 触发了自动警报,提醒我有消费者滞后太多。

因此,我想摆脱my_topic_amy_consumer 的偏移量,以达到my_consumer 从未消耗过my_topic_a 的状态。

以下尝试失败:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user

有了这个输出:

The consumer does not support topic-specific offset deletion from a consumer group.

我怎样才能实现我的目标? (在我的用例中,暂时停止该组的所有消费者将是一个可行的选择。)

(我使用的是Kafka版本2.2.0。)


我的猜测是,可以通过在主题__consumer_offsets 上写一些东西来完成某些事情,但我不知道会是什么。目前,该主题如下所示(再次简化):

kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)

【问题讨论】:

  • 没有办法从 Grafana 仪表板中过滤掉数据吗?或者可能在偏移保留期内静音主题标签上的警报?
  • @OneCricketeer 当然,我可能会找到一种方法来调整所有依赖项(Grafana 的 Prometheus、警报管理器等)的配置,以过滤掉这个过时的偏移量。然后每次再次执行此操作,其他一些消费者组停止消费其主题之一。但我更喜欢更简洁、更有说服力的解决方案。
  • @OneCricketeer 另外,据我了解,这不仅仅适用于偏移保留期,因为这仅适用于停止消费所有主题的消费者。我的集群有一天的offsets.retention.minutes,而my_consumer 有几周没有消耗topic_a,但由于它仍在积极阅读其他主题,因此没有删除任何内容,并且滞后仍然存在。
  • 该主题被压缩,这意味着只有关闭的日志段会被清理。默认段大小为 1G,因为 OffsetAndMetadata 是一种紧凑的二进制格式,所以那里有很多数据。但这也意味着,假设[my_consumer_group,my_topic_b,0] 的密钥超过 1 天没有被看到,并且在当前打开的日志段中不存在,那么它将被删除。
  • @OneCricketeer 感谢您的解释。由于我使用log.cleaner.max.compaction.lag.ms=86400000,所以我认为即使日志段没有达到log.segment.bytes[my_consumer_group,my_topic_a,...]也应该在一天后删除。然而,实际上,就我而言,经过数周后它仍然存在。所以在我的集群中似乎有些东西没有按预期工作。

标签: apache-kafka


【解决方案1】:

与此同时(Kafka 2.8),kafka-consumer-groups.sh 的新 --delete-offsets 参数已成为可能。 :-)

【讨论】:

    【解决方案2】:

    给你的输出:

    “消费者不支持从消费者组中删除特定主题的偏移量。”

    表示无法从消费者组中删除特定主题。

    您可以将新应用程序的消费者组更改为只读my_topic_b,重新启动应用程序,然后完全删除旧的和空闲的消费者组。通过这种方法,您将能够跟踪消费者的滞后,而不会出现任何分心和警报。当使用新的 consumerGroup 重新启动应用程序时,通常最好在重新启动期间停止主题“b”的生产者,以确保您不会丢失任何消息。

    我真的会避免手动玩弄主题__consumer_offsets

    作为替代方案,您可以定期运行 Kafka 附带的命令行工具来减少 ConsumerGroup 的滞后:

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my_consumer --topic my_topic_a --to-latest 
    

    您可能需要添加--execute 选项。

    【讨论】:

    • 是的,如果停止生产者是可行的,它已经使用您描述的方式简单地切换到一个新的消费者组名称。可悲的是,在我的用例中,这不是一个可行的选择。
    • 好的,也许我刚刚添加到我的答案中的替代方案可能会帮助您解决问题。尽管如此,这是另一种解决方法......但我还没有看到任何可靠的解决方案来以您需要的方式操纵 consumer_offsets 主题。
    • 谢谢,这个解决方案非常实用,但我发现它存在三个问题。首先,它必须维护该虚假脚本的调度,从而增加了复杂性。其次,在未来类似的情况下,我必须添加更多这样的虚假脚本。第三,图表看起来消费者仍在消费这个话题,而实际上并非如此,这可能会导致未来得出错误的结论。
    猜你喜欢
    • 2018-07-29
    • 2020-12-21
    • 1970-01-01
    • 2017-07-22
    • 2015-03-25
    • 2019-05-01
    • 2017-07-22
    • 2017-10-20
    • 2016-12-06
    相关资源
    最近更新 更多