【发布时间】:2021-05-15 02:30:45
【问题描述】:
TL/DR;是否可以在批处理模式下为不同的 JSON 类型使用单独的 KafkaHandler?
我正在使用一个包含多个不同 JSON 消息的主题。我正在处理数据并将其插入数据库,因此我将批量处理,并在将所有内容插入数据库后手动执行 Kafka 提交。
所以我有我的工厂,其中包括
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
原厂设置
factory.setBatchListener(true);
为了测试,我有两个 POJO,它们是带有字符串 var1 和 var2 的 TypeA 和带有字符串(var3 和 var4)的 TypeB。然后我的消费者有一个方法,它本质上是:
@KafkaListener(topics = "${kafka.topics}");
public void receive (List<Object> data, Acknowledgement ack) {
for (int i = 0; i < data.size(); i++) {
Object d = data.get(i);
if (d instanceof TypeA) {
LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}
if (d instancef TypeB) {
LOGGER.info("We have Type B - '{}' - '{}'", b.getVar3(), a.getVar4());
}
}
ack.acknowledge();
}
这可行,但我一直在尝试使用 KafkaHandlers 为每种类型而不是使用 instanceof。
如果我删除该行以启用批处理并将KafkaListener 注释移动到类级别,我可以创建单独的处理程序
@KafkaHandler
public void receiveA(@Payload TypeA) {
LOGGER.info("We have Type A - '{}' - '{}'", a.getVar1(), a.getVar2());
}
@KafkaHandler
public void receiveB(@Payload TypeA) {
LOGGER.info("We have Type B - '{}' - '{}'", a.getVar3(), a.getVar4());
}
这很好,但我失去了批处理能力。
如果我启用批处理模式,那么它似乎只需要 ArrayList 的处理程序,并且您不能为不同类型设置单独的处理程序。
这里有一些中间立场吗?有什么方法可以使用KafkaHandler 处理单个记录,但是一旦处理程序处理了所有记录(以处理确认和数据库提交)就会触发一些东西,或者有比在第一个代码使用很多 if 语句实例?
【问题讨论】: