【问题标题】:Avoid duplication of Kafka producer message避免 Kafka 生产者消息的重复
【发布时间】:2020-08-11 06:23:49
【问题描述】:

我正在使用 Spring boot.Java 8 中的KafkaTemplate

我的主要目标是消费者不应该两次消费消息。

1) 调用表获取100行并发送给kafka

2)假设我处理了 70 行(我得到了成功 ack)然后 Kafka 宕机了(Kafka 没有在 RETRY 机制定时内恢复)

所以当我重新启动 spring boot 应用程序时,我如何确保不再发送这 70 条消息。

一个选项是我可以在 DB 表消息is_sent = Y or N 中有标志。

还有其他有效的方法吗?

【问题讨论】:

    标签: java apache-kafka spring-kafka kafka-producer-api


    【解决方案1】:

    我会使用JDBC source connector(取决于您当前使用的数据库)和Kafka Connect 来正确处理这种情况。


    如果您仍然想编写自己的生产者,Kafka FAQ 的this section 应该很有用:

    如何从 Kafka 获取一次性消息?

    Exactly once 语义有两部分:避免数据期间的重复 生产 并避免在数据消费过程中出现重复。

    有两种方法可以在数据期间获得恰好一次的语义 生产:

    1. 每个分区和每次获得网络时使用单个写入器 错误检查该分区中的最后一条消息以查看您的最后一条消息是否 写入成功
    2. 在 消息并在消费者上删除重复数据。

    如果你做这些事情之一,Kafka 托管的日志将是 无重复。然而,没有重复的阅读取决于一些 也来自消费者的合作。如果消费者定期 检查点它的位置然后如果它失败并重新启动它将 从检查点位置重新开始。因此,如果数据输出和 检查点不是以原子方式编写的,它可以得到 这里也重复。这个问题是您的存储所特有的 系统。例如,如果您使用的是数据库,您可以提交 这些一起在一个事务中。 LinkedIn 的 HDFS 加载器 Camus 为 Hadoop 负载编写了类似的操作。另一种选择 不需要事务的是将偏移量与 使用主题/分区/偏移量加载和重复数据删除 组合。

    我认为有两个改进可以让这变得更容易:

    1. 生产者幂等性可以自动完成,而且成本更低 通过可选地在服务器上集成对此的支持。
    2. 现有的 高级消费者不会暴露很多更细粒度的 控制偏移量(例如重置您的位置)。我们将工作 很快就到了

    【讨论】:

    • 我不是消费者而是生产者
    • 好的,但我不知道如何使用 Kafka Connect。也许我会坚持我在 DB 中创建标志的想法。谢谢您的回答。
    • @amitjain 我向您保证,Kafka Connect 可以为您节省大量时间和精力 :)
    • 我想制作幂等生产者。所以你的意思是制作幂等生产者会解决我的问题/?
    • 幂等生产者不起作用..我只是尝试用相同的消息值对密钥进行硬编码。所有重复的消息都去了
    【解决方案2】:

    对于 Kafka,我已经看到了存储指向 id 的指针以跟踪您在主题中的位置,并使用某种分布式存储在集群级别跟踪这一点的实现。我在那里没有做太多的工作,所以我将尝试提供我们与 SQS 一起使用的用于 dup 检测的解决方案。 Kafka 可能有一个比这个更好的解决方案来解决重复问题,只是想在那里添加,以便您也可以查看替代解决方案。

    我在使用 AWS SQS 处理点对点消息传递用例时遇到了同样的问题,因为它提供了至少一次交付保证,而不是一次且仅一次。

    我们最终使用 Redis 及其分布式锁定策略来解决这个问题。我在这里写了https://angularthinking.blogspot.com/

    高级方法是创建一个分布式锁,以根据您的用例将具有适当 TTL 的条目放入缓存中。我们使用 LUA 脚本执行 putIfNotExists() 方法,如上面的博客所示。扩展是我们关注的问题之一,通过上述实现,我们能够在 SQS 中每秒处理成千上万条消息而没有任何问题,并且 redis 扩展得很好。我们必须根据吞吐量和缓存增长将 TTL 调整为最佳值。我们确实有复制窗口的好处是 24 小时或更短,所以这个决定取决于 redis 是可以的。如果您有更长的窗口,可能会在几天或几个月内发生重复,那么 redis 选项可能不适合。

    我们还研究了 DynamoDB 来实现 putIfNotExists(),但对于这个用例,redis 似乎性能更高,尤其是在其使用 LUA 脚本的原生 putIfNotExists 实现中。

    祝您搜索顺利。

    【讨论】:

      猜你喜欢
      • 2022-08-16
      • 1970-01-01
      • 2015-06-21
      • 1970-01-01
      • 1970-01-01
      • 2023-01-21
      • 1970-01-01
      • 2017-01-04
      • 1970-01-01
      相关资源
      最近更新 更多