【发布时间】:2019-03-05 11:34:09
【问题描述】:
这是对 - Reading the same message several times from Kafka 的后续问题。如果有更好的方法可以在不发布新问题的情况下提出这个问题,请告诉我。在这篇文章中,Gary 提到了
“但是如果它们已经被检索到,你仍然会首先看到后面的消息,所以你也必须丢弃它们。”
在调用 seek() 之后,是否有一种干净的方法可以丢弃 poll() 已经读取的消息?我开始通过保存初始偏移量(在recordOffset中)来实现逻辑,并在成功时增加它。失败时,我调用 seek() 并将 recordOffset 的值设置为 record.offset()。然后对于每条新消息,我都会检查 record.offset() 是否大于 recordOffset。如果是,我只需调用确认(),从而“丢弃”所有先前读取的消息。这是代码 -
// in onMessage()...
if (record.offset() > recordOffset){
acknowledgment.acknowledge();
return;
}
try {
processRecord(record);
recordOffset = record.offset()+1;
acknowledgment.acknowledge();
} catch (Exception e) {
recordOffset = record.offset();
consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
}
这种方法的问题在于它会因多个分区而变得复杂。有没有更简单/更清洁的方法?
编辑 1 根据下面 Gary 的建议,我尝试添加这样的 errorHandler -
@KafkaListener(topicPartitions =
{@org.springframework.kafka.annotation.TopicPartition(topic = "${kafka.consumer.topic}", partitions = { "1" })},
errorHandler = "SeekToCurrentErrorHandler")
当我收到“无法解析方法'errorHandler'”时,此语法是否有问题?
编辑 2 在 Gary 解释了 2 个错误处理程序之后,我删除了上面的 errorHandler 并在下面添加到配置文件中 -
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProps()));
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
当我启动应用程序时,我现在收到此错误...
java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V 在 org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:396)
这是第 396 行 -
Assert.state(!this.isConsumerRecordList || validParametersForBatch,
() -> String.format(stateMessage, "ConsumerRecord"));
Assert.state(!this.isMessageList || validParametersForBatch,
() -> String.format(stateMessage, "Message<?>"));
【问题讨论】: