【问题标题】:How to create a Bulk consumer in Spring webflux and Kafka如何在 Spring webflux 和 Kafka 中创建 Bulk 消费者
【发布时间】:2021-04-26 08:28:27
【问题描述】:

我需要轮询 kafka 并批量处理事件。在 Reactor kafka 中,由于它是一个蒸汽 API,我将事件作为流获取。有没有办法组合并获得固定的最大事件大小。

这就是我目前正在做的事情。

final Flux<Flux<ConsumerRecord<String, String>>> receive = KafkaReceiver.create(eventReceiverOptions)
    .receiveAutoAck();
receive
    .concatMap(r -> r)
    .doOnEach(listSignal -> log.info("got one message"))
    .map(consumerRecords -> consumerRecords.value())
    .collectList()
    .flatMap(strings -> {
      log.info("Read messages of size {}", strings.size());
      return processBulkMessage(strings)
          .doOnSuccess(aBoolean -> log.info("Processed records"))
          .thenReturn(strings);
    }).subscribe();

但代码只是在 collectList 之后挂起,永远不会到达最后一个 flatMap。

提前致谢。

【问题讨论】:

    标签: apache-kafka spring-webflux reactor reactor-kafka


    【解决方案1】:

    您只需对普通的 .concatMap(r -&gt; r) 进行“扁平化”,因此您完全消除了最初由 receiveAutoAck() 构建的批处理。要为您的processBulkMessage() 处理列表流,请考虑将所有批处理逻辑移至concatMap()

    .concatMap(batch -> batch
                        .doOnEach(listSignal -> log.info("got one message"))
                        .map(ConsumerRecord::value)
                        .collectList())
                .flatMap(strings -> {
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-17
      • 1970-01-01
      • 2023-01-09
      • 1970-01-01
      • 2020-08-03
      相关资源
      最近更新 更多