【发布时间】:2021-05-12 23:32:28
【问题描述】:
假设我有两个主题(都有两个分区和无限保留):
my_topic_amy_topic_b
和一个消费者群体:
my_consumer
在某个时候,它同时消耗了这两个主题,但由于一些变化,它不再对 my_topic_a 感兴趣,所以它停止了消耗它,现在正在累积延迟:
kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -
这种延迟让我很恼火,因为:
- 我在 Grafana 中的消费者滞后图被污染了。
- 触发了自动警报,提醒我有消费者滞后太多。
因此,我想摆脱my_topic_a 的my_consumer 的偏移量,以达到my_consumer 从未消耗过my_topic_a 的状态。
以下尝试失败:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user
有了这个输出:
The consumer does not support topic-specific offset deletion from a consumer group.
我怎样才能实现我的目标? (在我的用例中,暂时停止该组的所有消费者将是一个可行的选择。)
(我使用的是Kafka版本2.2.0。)
我的猜测是,可以通过在主题__consumer_offsets 上写一些东西来完成某些事情,但我不知道会是什么。目前,该主题如下所示(再次简化):
kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
【问题讨论】:
-
没有办法从 Grafana 仪表板中过滤掉数据吗?或者可能在偏移保留期内静音主题标签上的警报?
-
@OneCricketeer 当然,我可能会找到一种方法来调整所有依赖项(Grafana 的 Prometheus、警报管理器等)的配置,以过滤掉这个过时的偏移量。然后每次再次执行此操作,其他一些消费者组停止消费其主题之一。但我更喜欢更简洁、更有说服力的解决方案。
-
@OneCricketeer 另外,据我了解,这不仅仅适用于偏移保留期,因为这仅适用于停止消费所有主题的消费者。我的集群有一天的
offsets.retention.minutes,而my_consumer有几周没有消耗topic_a,但由于它仍在积极阅读其他主题,因此没有删除任何内容,并且滞后仍然存在。 -
该主题被压缩,这意味着只有关闭的日志段会被清理。默认段大小为 1G,因为
OffsetAndMetadata是一种紧凑的二进制格式,所以那里有很多数据。但这也意味着,假设[my_consumer_group,my_topic_b,0]的密钥超过 1 天没有被看到,并且在当前打开的日志段中不存在,那么它将被删除。 -
@OneCricketeer 感谢您的解释。由于我使用
log.cleaner.max.compaction.lag.ms=86400000,所以我认为即使日志段没有达到log.segment.bytes,[my_consumer_group,my_topic_a,...]也应该在一天后删除。然而,实际上,就我而言,经过数周后它仍然存在。所以在我的集群中似乎有些东西没有按预期工作。
标签: apache-kafka