【问题标题】:Spring Kafka: How to discard messages already retrieved by poll() after doing a seek()?Spring Kafka:如何在执行 seek() 后丢弃 poll() 已检索到的消息?
【发布时间】:2019-03-05 11:34:09
【问题描述】:

这是对 - Reading the same message several times from Kafka 的后续问题。如果有更好的方法可以在不发布新问题的情况下提出这个问题,请告诉我。在这篇文章中,Gary 提到了

“但是如果它们已经被检索到,你仍然会首先看到后面的消息,所以你也必须丢弃它们。”

在调用 seek() 之后,是否有一种干净的方法可以丢弃 poll() 已经读取的消息?我开始通过保存初始偏移量(在recordOffset中)来实现逻辑,并在成功时增加它。失败时,我调用 seek() 并将 recordOffset 的值设置为 record.offset()。然后对于每条新消息,我都会检查 record.offset() 是否大于 recordOffset。如果是,我只需调用确认(),从而“丢弃”所有先前读取的消息。这是代码 -

    // in onMessage()...
    if (record.offset() > recordOffset){
        acknowledgment.acknowledge();
        return;
    }

    try {
        processRecord(record);
        recordOffset = record.offset()+1;
        acknowledgment.acknowledge();
    } catch (Exception e) {
        recordOffset = record.offset();
        consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
    }

这种方法的问题在于它会因多个分区而变得复杂。有没有更简单/更清洁的方法?

编辑 1 根据下面 Gary 的建议,我尝试添加这样的 errorHandler -

@KafkaListener(topicPartitions =
        {@org.springframework.kafka.annotation.TopicPartition(topic = "${kafka.consumer.topic}", partitions = { "1" })},
        errorHandler = "SeekToCurrentErrorHandler")

当我收到“无法解析方法'errorHandler'”时,此语法是否有问题?

编辑 2 在 Gary 解释了 2 个错误处理程序之后,我删除了上面的 errorHandler 并在下面添加到配置文件中 -

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProps()));
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}

当我启动应用程序时,我现在收到此错误...

java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V 在 org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:396)

这是第 396 行 -

Assert.state(!this.isConsumerRecordList || validParametersForBatch,
            () -> String.format(stateMessage, "ConsumerRecord"));
Assert.state(!this.isMessageList || validParametersForBatch,
            () -> String.format(stateMessage, "Message<?>"));

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    从2.0.1版本开始,如果容器的ErrorHandlerRemainingRecordsErrorHandler,比如SeekToCurrentErrorHandler,剩下的记录(包括失败的记录)将被发送到错误处理程序而不是监听器。

    这允许SeekToCurrentErrorHandler 重新定位每个分区,以便下一次轮询将返回未处理的记录。

    /**
     * An error handler that seeks to the current offset for each topic in the remaining
     * records. Used to rewind partitions after a message failure so that it can be
     * replayed.
     *
     * @author Gary Russell
     * @since 2.0.1
     *
     */
    public class SeekToCurrentErrorHandler implements RemainingRecordsErrorHandler 
    

    编辑

    有两种类型的错误处理程序。 KafkaListenerErrorHandler(在注释中指定)在侦听器级别工作;它连接到调用 @KafkaListener 注释的侦听器适配器,因此只能访问当前记录。

    第二个错误处理程序(在侦听器容器上配置)在容器级别工作,因此可以访问剩余的记录。 SeekToCurrentErrorHandler 是容器级错误处理程序。

    在容器工厂的容器属性上配置...

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(this.consumerFactory);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        return factory;
    }
    

    【讨论】:

    • 加里 - 感谢您的建议。添加 errorHandler 时出现错误(发布在已编辑问题中)。另外,我是 gradle 新手,不确定仅更新 build.gradle 中的版本是否足以升级到 2.0.1。我添加了-编译'org.springframework.kafka:spring-kafka:2.0.1.RELEASE'。我会继续努力......只是想让你知道我在哪里被困住了。
    • Gary - 我更新了我的代码以在容器级别设置错误处理程序,如您所示。现在我在启动时遇到断言失败。我在问题中添加了一些细节。知道为什么吗?
    • Spring Kafka 2.x 需要 Spring Framework 5(当前为 5.0.2)。
    • 升级到 2.0.1 花了一整天的时间......但最后我能够测试它并且它有效!接下来我会用更多的消息做一些更彻底的测试,以确保万无一失。谢谢加里!
    • 我可能做错了什么,但上面的代码在spring-kafka-2.2.9.RELEASE 中不起作用。错误是The method setErrorHandler(SeekToCurrentErrorHandler) is undefined for the type ContainerProperties
    【解决方案2】:

    你走对了,是的,你还必须处理不同的分区。有FilteringMessageListenerAdapter,但还是要自己写逻辑。

    【讨论】:

    • 另见SeekToCurrentErrorHandler(在我的回答中解释)。
    • 如果由于某种原因我无法将我们所有的服务迁移到 2.0.1,很高兴知道我正在考虑这个权利。我宁愿不必实现这个逻辑......它肯定是一团糟。
    猜你喜欢
    • 2021-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多