【问题标题】:KafkaHandler for multiple JSON types in Single Kafka Topic - Process in batch单个 Kafka 主题中多种 JSON 类型的 KafkaHandler - 批量处理
【发布时间】:2021-05-15 02:30:45
【问题描述】:

TL/DR;是否可以在批处理模式下为不同的 JSON 类型使用单独的 KafkaHandler?

我正在使用一个包含多个不同 JSON 消息的主题。我正在处理数据并将其插入数据库,因此我将批量处理,并在将所有内容插入数据库后手动执行 Kafka 提交。

所以我有我的工厂,其中包括

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

原厂设置

factory.setBatchListener(true);

为了测试,我有两个 POJO,它们是带有字符串 var1 和 var2 的 TypeA 和带有字符串(var3 和 var4)的 TypeB。然后我的消费者有一个方法,它本质上是:

@KafkaListener(topics = "${kafka.topics}");
public void receive (List<Object> data, Acknowledgement ack) {
  for (int i = 0; i < data.size(); i++) {
    Object d = data.get(i);
    if (d instanceof TypeA) {
      LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
    }
    if (d instancef TypeB) {
      LOGGER.info("We have Type B - '{}' - '{}'", b.getVar3(), a.getVar4());
    }
  }
  ack.acknowledge();
}

这可行,但我一直在尝试使用 KafkaHandlers 为每种类型而不是使用 instanceof。

如果我删除该行以启用批处理并将KafkaListener 注释移动到类级别,我可以创建单独的处理程序

@KafkaHandler
public void receiveA(@Payload TypeA) {
 LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}
@KafkaHandler
public void receiveB(@Payload TypeA) {
  LOGGER.info("We have Type B - '{}' - '{}'", a.getVar3(), a.getVar4());
}

这很好,但我失去了批处理能力。

如果我启用批处理模式,那么它似乎只需要 ArrayList 的处理程序,并且您不能为不同类型设置单独的处理程序。

这里有一些中间立场吗?有什么方法可以使用KafkaHandler 处理单个记录,但是一旦处理程序处理了所有记录(以处理确认和数据库提交)就会触发一些东西,或者有比在第一个代码使用很多 if 语句实例?

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    @KafkaHandler 目前不支持批处理侦听器; 请打开GitHub issue - 我们应该能够从泛型参数类型中正确检测到泛型列表内容类型。

    您也许可以使用自定义BatchToRecordAdapter 来调用记录级侦听器,并在批处理中的最后一条消息中设置一些标志以表明它是最后一条。

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#transactions-batch

    编辑

    支持@KafkaHandler 没有意义 - 批处理可能包含混合类型。

    【讨论】:

    • 查看我对BatchToRecordAdapter的评论。
    • 经过深思熟虑;在这里支持@KafkaHandler 没有意义 - 批处理可能包含混合类型。
    猜你喜欢
    • 2018-12-28
    • 1970-01-01
    • 2019-05-27
    • 2018-07-15
    • 1970-01-01
    • 2019-10-26
    • 1970-01-01
    • 2016-03-29
    • 2021-06-08
    相关资源
    最近更新 更多