【发布时间】:2022-01-09 11:22:08
【问题描述】:
我是响应式编程的新手,并且在 Spring Webflux 中编写了一个 kafka 消费者来消费事件,对其进行处理并处理成功和失败的场景。我想了解如何使用千分尺计算处理每个事件(成功和失败)所花费的时间指标。我知道我们可以使用 Micrometer 的 Timer 接口来计算这样的处理时间-
Timer timer = Timer.builder("kafka.consumer.time")
.tag("eventType", "Event A")
.register(meterRegistry);
timer.record(Duration.ofMillis(System.currentTimeMillis() - inTime));
inTime- 事件处理开始的时间。
但我无法弄清楚我应该如何在反应式编程场景中使用“inTime”,因为它是一个事件流,并且一次会处理多个事件。
@EventListener(ApplicationReadyEvent.class)
public void consume() {
kafkaReceiver
.receive()
.concatMap(res -> kafkaHelper.process(res)
.doOnError(error -> {
log.error("Error occurred);
}).retryWhen(Retry.backoff(3, Duration.ofSeconds(9)).jitter(0.5))
.onErrorResume(error -> {
log.error("Retry exhausted);
return Mono.empty();
})
.doOnSuccess(val -> {
res.receiverOffset().acknowledge();
})).subscribe();
}
请帮忙。提前致谢。
【问题讨论】:
标签: apache-kafka spring-webflux reactive flux micrometer