【问题标题】:How to calculate Kafka consumer processing time in Spring Webflux and Micrometer如何在 Spring Webflux 和 Micrometer 中计算 Kafka 消费者处理时间
【发布时间】:2022-01-09 11:22:08
【问题描述】:

我是响应式编程的新手,并且在 Spring Webflux 中编写了一个 kafka 消费者来消费事件,对其进行处理并处理成功和失败的场景。我想了解如何使用千分尺计算处理每个事件(成功和失败)所花费的时间指标。我知道我们可以使用 Micrometer 的 Timer 接口来计算这样的处理时间-

Timer timer = Timer.builder("kafka.consumer.time")
                    .tag("eventType", "Event A")
                    .register(meterRegistry);
    timer.record(Duration.ofMillis(System.currentTimeMillis() - inTime));

inTime- 事件处理开始的时间。

但我无法弄清楚我应该如何在反应式编程场景中使用“inTime”,因为它是一个事件流,并且一次会处理多个事件。

 @EventListener(ApplicationReadyEvent.class)
public void consume() {
    kafkaReceiver
            .receive()
            .concatMap(res -> kafkaHelper.process(res)
                    .doOnError(error -> {
                        log.error("Error occurred);
                    }).retryWhen(Retry.backoff(3, Duration.ofSeconds(9)).jitter(0.5))
                    .onErrorResume(error -> {
                        log.error("Retry exhausted);
                        return Mono.empty();
                    })
                    .doOnSuccess(val -> {
                        res.receiverOffset().acknowledge();
                    })).subscribe();

}

请帮忙。提前致谢。

【问题讨论】:

    标签: apache-kafka spring-webflux reactive flux micrometer


    【解决方案1】:

    围绕这个问题空间,您可以研究三件事:

    1. Reactor has micrometer support for publishers 因此您可以命名您的 Flux 并请求记录指标,如下所示:
    receive()
        .name("kafka.events") 
        .metrics() 
        .doOnNext(System.out::println)
        .subscribe();
    
    1. 你不应该像那样测量经过的时间 (System.currentTimeMillis()), here's why
    2. Micrometer 有 support for Kafka 可以直接从 Kafka 客户端提供一堆额外的指标,如果你碰巧使用 spring-kafka,你可以在这些之上获得额外的东西

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-04-04
      • 2019-11-25
      • 2017-08-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-17
      • 2019-01-20
      相关资源
      最近更新 更多