[Title]: How to monitor Kafka consumer lag for transactional consumers
[Posted]: 2019-06-11 09:38:32
[Question]:

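spring-kafka has a useful metric for monitoring Kafka consumer lag, called kafka_consumer_records_lag_max_records. However, this metric does not work for transactional consumers. Is there a specific configuration that enables the lag metric for transactional consumers?

I have configured my consumer group to use isolation level read_committed, and the metrics contain kafka_consumer_records_lag_max_records{client_id="listener-1",} -Inf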
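Whatever the root cause, anything that consumes this series (a dashboard or an alert rule) has to cope with `-Inf`, which is how the Prometheus text format writes negative infinity. A minimal sketch that pulls the sample value out of one exposition line; the class name `PromLagLine` and the policy of treating `-Inf` and `NaN` as "no lag sample yet" are illustrative assumptions, not Micrometer or Prometheus behavior:

```java
import java.util.OptionalDouble;

/**
 * Hypothetical helper: extracts the sample value from one line of Prometheus
 * text exposition output and treats -Inf / NaN as "no lag sample yet".
 */
public class PromLagLine {

    static OptionalDouble parseLag(String line) {
        String trimmed = line.trim();
        // The value follows the closing '}' of the (optional) label set.
        // Simplification: assumes no '}' appears inside a label value.
        int brace = trimmed.lastIndexOf('}');
        String rest = brace >= 0
                ? trimmed.substring(brace + 1).trim()
                : trimmed.substring(trimmed.indexOf(' ') + 1).trim();
        String token = rest.split("\\s+")[0]; // an optional timestamp may follow
        double value;
        switch (token) {
            case "+Inf": value = Double.POSITIVE_INFINITY; break;
            case "-Inf": value = Double.NEGATIVE_INFINITY; break;
            case "NaN":  value = Double.NaN; break;
            default:     value = Double.parseDouble(token);
        }
        // Assumed policy: -Inf and NaN mean "no sample", not "zero lag".
        if (Double.isNaN(value) || value == Double.NEGATIVE_INFINITY) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(value);
    }

    public static void main(String[] args) {
        // prints "OptionalDouble.empty"
        System.out.println(parseLag("kafka_consumer_records_lag_max_records{client_id=\"listener-1\",} -Inf"));
        // prints "OptionalDouble[0.0]"
        System.out.println(parseLag("kafka_consumer_records_lag_max_records{client_id=\"batcher-1\",} 0"));
    }
}
```

Returning an empty value rather than 0 keeps a stalled or not-yet-fetched consumer from looking like a perfectly caught-up one.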

[Comments]:

  • I added the apache-kafka tag because all Spring and Micrometer do is provide access to the kafka-clients MBeans, which are the source of the -Infinity value.
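Because the value originates in the kafka-clients MBean, it can be inspected without Spring or Micrometer. A minimal sketch using the JDK's JMX API; it assumes the consumer registers the usual fetch-manager MBean `kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<id>` with a `records-lag-max` attribute, and the class name `LagMBeanReader` is hypothetical:

```java
import java.lang.management.ManagementFactory;
import java.util.OptionalDouble;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class LagMBeanReader {

    /** Reads records-lag-max for one consumer client id, if that MBean is registered. */
    static OptionalDouble readRecordsLagMax(MBeanServer server, String clientId) {
        try {
            // Assumes a plain client id; ids containing ',', '=' or ':' would need quoting.
            ObjectName name = new ObjectName(
                    "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=" + clientId);
            Object value = server.getAttribute(name, "records-lag-max");
            // The value may legitimately be -Infinity (see the discussion in this thread).
            return value instanceof Number
                    ? OptionalDouble.of(((Number) value).doubleValue())
                    : OptionalDouble.empty();
        } catch (JMException e) {
            // MBean or attribute not registered (no consumer with this id in this JVM).
            return OptionalDouble.empty();
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println(readRecordsLagMax(server, "listener-1"));
    }
}
```

Run inside the application's JVM (or adapted to a remote JMX connection), this shows the raw value before any Micrometer or Prometheus conversion.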

Tags: apache-kafka spring-kafka spring-micrometer


[Answer 1]:

What do you mean by "not working"? I just tested it and it works fine...

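// Assumed imports (spring-kafka 2.2 / kafka-clients 2.x era). FetcherMetricsRegistry is an
// internal kafka-clients class, so it may be renamed or moved in other versions.
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

@SpringBootApplication
public class So56540759Application {

    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext context = SpringApplication.run(So56540759Application.class, args);
        System.in.read(); // block until input arrives on stdin, then shut down
        context.close();
    }

    // Per-partition current lag (records-lag)
    private MetricName lagNow;

    // Per-client maximum lag (records-lag-max)
    private MetricName lagMax;

    @Autowired
    private MeterRegistry meters;

    @KafkaListener(id = "so56540759", topics = "so56540759", clientIdPrefix = "so56540759",
            properties = "max.poll.records=1")
    public void listen(String in, Consumer<?, ?> consumer) {
        // Read both lag metrics directly from the consumer's own metrics map
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        Metric currentLag = metrics.get(this.lagNow);
        Metric maxLag = metrics.get(this.lagMax);
        System.out.println(in
                + " lag " + currentLag.metricName().name() + ":" + currentLag.metricValue()
                + " max " + maxLag.metricName().name() + ":" + maxLag.metricValue());
        // The same records-lag-max value, as exposed through Micrometer
        Gauge gauge = meters.get("kafka.consumer.records.lag.max").gauge();
        System.out.println("lag-max in Micrometer: " + gauge.value());
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56540759", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        // Build the MetricNames to look up; client-id "so56540759-0" is the clientIdPrefix
        // plus the container's index suffix
        Set<String> tags = new HashSet<>();
        FetcherMetricsRegistry registry = new FetcherMetricsRegistry(tags, "consumer");
        MetricNameTemplate temp = registry.recordsLagMax;
        this.lagMax = new MetricName(temp.name(), temp.group(), temp.description(),
                Collections.singletonMap("client-id", "so56540759-0"));
        temp = registry.partitionRecordsLag;
        Map<String, String> tagsMap = new LinkedHashMap<>();
        tagsMap.put("client-id", "so56540759-0");
        tagsMap.put("topic", "so56540759");
        tagsMap.put("partition", "0");
        this.lagNow = new MetricName(temp.name(), temp.group(), temp.description(), tagsMap);

        // Send 10 records so the listener has a visible lag to report
        return args -> IntStream.range(0, 10).forEach(i -> template.send("so56540759", "foo" + i));
    }

}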
Output:

2019-06-11 12:13:45.803  INFO 32187 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = so56540759-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = so56540759
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_committed
    ...
    transaction.timeout.ms = 60000
    ...

2019-06-11 12:13:45.840  INFO 32187 --- [o56540759-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so56540759-0]
foo0 lag records-lag:9.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo1 lag records-lag:8.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo2 lag records-lag:7.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo3 lag records-lag:6.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo4 lag records-lag:5.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo5 lag records-lag:4.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo6 lag records-lag:3.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo7 lag records-lag:2.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo8 lag records-lag:1.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo9 lag records-lag:0.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
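The lag values above follow from simple offset arithmetic. As we understand kafka-clients 2.x, the per-partition `records-lag` is the end offset minus the consumer's position, where the end offset is the last stable offset (LSO) for `read_committed` consumers and the high watermark for `read_uncommitted` ones. A sketch of that rule; `LagByIsolation` and `recordsLag` are hypothetical names, and the first loop assumes offsets 0 to 9 with no transaction markers, which is consistent with the output above:

```java
/**
 * Hypothetical sketch: records-lag = end offset - position, where the end
 * offset depends on the consumer's isolation level.
 */
public class LagByIsolation {

    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    static long recordsLag(Isolation isolation, long highWatermark, long lastStableOffset, long position) {
        long end = isolation == Isolation.READ_COMMITTED ? lastStableOffset : highWatermark;
        return end - position;
    }

    public static void main(String[] args) {
        // 10 records at offsets 0..9 and no open transaction, so HW == LSO == 10.
        for (int i = 0; i < 10; i++) {
            long position = i + 1; // position after consuming foo{i}
            System.out.println("foo" + i + " records-lag "
                    + recordsLag(Isolation.READ_COMMITTED, 10, 10, position));
        }
        // Hypothetical open transaction that has written offsets 10..14:
        // HW moves to 15 but the LSO stays at 10 until it commits or aborts.
        System.out.println(recordsLag(Isolation.READ_UNCOMMITTED, 15, 10, 5)); // prints 10
        System.out.println(recordsLag(Isolation.READ_COMMITTED, 15, 10, 5));   // prints 5
    }
}
```

The loop prints lags 9 down to 0, and the largest of them (9) matches the records-lag-max reported above.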

EDIT2

I do see it go to -Infinity in the MBean if the transaction times out, i.e. if the listener in my test does not exit within 60 seconds.

[Comments]:

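  • The exported value for that group is kafka_consumer_records_lag_max_records{client_id="listener-1",} -Inf, whereas for the non-transactional group it seems to work: kafka_consumer_records_lag_max_records{client_id="batcher-1",} 0. I'm reading the values for Prometheus from the actuator endpoint. Could you try publishing the events within a transaction? It seems the value is not exported when there is a transaction. @GaryRussell
  • I updated my application to get the metric from Micrometer, and it still works fine.
  • OK, I'll do some experiments and get back to you with a more specific example. Thanks!
  • If I set a breakpoint in the listener and the transaction times out (causing a rebalance), I do see it go to -Infinity. See the edit.
  • I also see that it has an initial value of -Infinity until the first fetch, because the sensor is created with this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max()), and Max() is initialized to -Infinity (but the same is true for non-transactional consumers).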
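A simplified model of why a max-type gauge can read `-Infinity`: a windowed maximum seeded with negative infinity reports that seed whenever no sample falls inside the window, both before the first fetch and after a stall long enough for older samples to expire (in kafka-clients the retention is roughly `metrics.num.samples` × `metrics.sample.window.ms`, 2 × 30 s by default). This is not Kafka's `Max`/`SampledStat` code, and later kafka-clients versions may report a different value for an empty window; the `WindowedMax` class and the 60 s window are assumptions for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, hypothetical model of a windowed max seeded with -Infinity. */
public class WindowedMaxDemo {

    static final class WindowedMax {
        private static final class Sample {
            final long timestampMs;
            final double value;
            Sample(long timestampMs, double value) {
                this.timestampMs = timestampMs;
                this.value = value;
            }
        }

        private final long windowMs;
        private final Deque<Sample> samples = new ArrayDeque<>();

        WindowedMax(long windowMs) {
            this.windowMs = windowMs;
        }

        /** Called on each fetch with the lag observed at that time. */
        void record(double value, long nowMs) {
            samples.addLast(new Sample(nowMs, value));
        }

        double measure(long nowMs) {
            // Drop samples that have fallen out of the window.
            while (!samples.isEmpty() && nowMs - samples.peekFirst().timestampMs >= windowMs) {
                samples.removeFirst();
            }
            double max = Double.NEGATIVE_INFINITY; // no samples => -Infinity
            for (Sample s : samples) {
                max = Math.max(max, s.value);
            }
            return max;
        }
    }

    public static void main(String[] args) {
        WindowedMax lagMax = new WindowedMax(60_000);
        System.out.println("before first fetch: " + lagMax.measure(0));       // -Infinity
        lagMax.record(9.0, 1_000);
        lagMax.record(8.0, 2_000);
        System.out.println("after fetches: " + lagMax.measure(3_000));        // 9.0
        System.out.println("after a long stall: " + lagMax.measure(120_000)); // -Infinity
    }
}
```

Under this model, a listener that blocks long enough for no fetch to happen inside the window would show `-Inf` regardless of whether it is transactional, which is consistent with the breakpoint experiment described above.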