【问题标题】:Spring Kafka Prevent Duplicate ReadsSpring Kafka 防止重复读取
【发布时间】:2021-03-18 17:56:41
【问题描述】:

我使用 spring kafka 2.x 来读取来自 kafka 的消息。

我的问题是如果我的方法在我的 kafkalistener 方法中花费了一段时间,那一次 kafka 发送相同的消息两次,这对我来说是个问题。如果我当时从 kafka 读取消息我想要什么 kafka 没有第二次向我发送此消息像 max.poll.interval.ms 这样的值不能保证一次读取消息。在 Spring Boot 中实现一次读取策略的正确方法是什么。在我的消息中,我没有密钥,因此我无法控制。

@KafkaListener(topics = "mytopics",groupId = "mygroup",concurrency = 3",containerFactory = "MyListenerContainerFactory")
void messageReceiver(@Payload String data, @Headers MessageHeaders headers) {

  String receivedTopic=  headers.get(KafkaHeaders.RECEIVED_TOPIC).toString();

        //DO something
 
}

【问题讨论】:

    标签: spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    您需要在max.poll.interval.ms 内处理投票结果。所以增加它,或者减少max.poll.records

    如果处理时间过长,Kafka 没有防止重新投递的机制。

    【讨论】:

    • 嗨,gary,谢谢你的回复,baeldung.com/kafka-exactly-once 我读了一些关于但我有点困惑的文章,你确定吗?
    • 这是一个常见的混淆。当readwrite 都是Kafka 时,"Exactly Once" with Kafka 适用于完整的read->process->write 序列。保证完整的序列只会被处理一次。如果processwrite 步骤失败,readprocess 将重复(至少一次)。
    • 关键点是什么?另外我随时都在写kafka,我只想读一次这条消息?这是否意味着(Exactly Once)一种读取一个主题并同时写另一个主题的方法交易?
    • 是的,这就是 Kafka 中的唯一含义。完整的序列将只执行一次,但读取/处理部分可以重试。
    • 我猜你的应用程序需要处理重复的交付,例如通过跟踪收到的 messageIds 并撤回传入的进程内消息的副本,请参阅chrisrichardson.net/post/microservices/patterns/2020/10/16/…
    猜你喜欢
    • 2017-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-11-27
    • 1970-01-01
    • 1970-01-01
    • 2010-11-09
    • 2011-04-23
    相关资源
    最近更新 更多