【问题标题】:How do you handle Amazon Kinesis Record duplicates?您如何处理 Amazon Kinesis Record 重复项?
【发布时间】:2017-08-20 20:12:53
【问题描述】:

根据 Amazon Kinesis Streams documentation,一条记录可以传送多次。

确保每条记录只处理一次的唯一方法是将它们临时存储在支持完整性检查的数据库中(例如 DynamoDB、Elasticache 或 MySQL/PostgreSQL),或者只检查每个 Kinesis 分片的 RecordId。

你知道处理重复的更好/更有效的方法吗?

【问题讨论】:

    标签: amazon-web-services amazon-dynamodb amazon-kinesis amazon-elasticache amazon-elasticsearch


    【解决方案1】:

    在为移动应用构建遥测系统时,我们恰好遇到了这个问题。在我们的例子中,我们也不确定生产者在哪里只发送每条消息一次,因此对于每条接收到的记录,我们即时计算其 MD5 并检查它是否以某种形式的持久存储呈现,但实际上要使用的存储是最棘手的一点。

    首先,我们尝试了普通的关系数据库,但它很快成为整个系统的主要瓶颈,因为这不仅是读取繁重的情况,而且是写入繁重的情况,因为通过 Kinesis 的数据量非常大。

    我们最终得到了一个 DynamoDB 表,用于存储每条唯一消息的 MD5。我们遇到的问题是删除消息并不是那么容易 - 即使我们的表包含分区键和排序键,DynamoDB 也不允许删除具有给定分区键的所有记录,我们必须查询所有记录才能获得排序键值(这会浪费时间和容量)。不幸的是,我们不得不偶尔简单地放下整张桌子。另一种次优解决方案是定期轮换存储消息标识符的 DynamoDB 表。

    不过,最近 DynamoDB 引入了一个非常方便的功能 - Time To Live,这意味着现在我们可以通过启用基于每条记录的自动到期来控制表的大小。从这个意义上说,DynamoDB 似乎与 ElastiCache 非常相似,但是 ElastiCache(至少是 Memcached 集群)的持久性要差得多 - 那里没有冗余,并且在操作规模扩大或发生故障的情况下,驻留在终止节点上的所有数据都会丢失。

    【讨论】:

    • 嗨,德米特里。我正在使用类似于此处解释的 JustGiving 基础架构的东西运行几个基准测试:aws.amazon.com/blogs/compute/…。为什么要为 DDB 表计算 MD5 校验和而不是使用 Shardid + SequenceNumber?
    • 嗨@Antonio。在我们的例子中,生产者可能会多次发布相同的消息。如果是这种情况,那么 Kinesis 无论如何都会将它们视为不同的消息(仅仅是因为有 2 个或更多来自生产者的帖子)。因为我们知道每条消息都必须是唯一的,所以我们简单地忽略了 md5 已经看到的消息。此外,md5 由生产者计算,为消费者节省了一些计算时间(考虑到通过 Kinesis 的数据量相对较大)。
    • 只是想扔掉 - AWS 指出,由于错误情况,不同的生产者自然可以多次生成相同的记录,而且更常见的是,多个消费者可以提取同一组记录。我现在也在我们的系统上处理这个问题。我们使用elasticsearch,目前的计划是使用elastics内置的版本控制来确保同一记录不会同时更新,然后memozie应用到记录本身的记录的最近事件列表。
    【解决方案2】:

    您提到的事情是所有采用“至少一次”方法的队列系统的普遍问题。此外,不仅仅是队列系统,生产者和消费者都可能多次处理相同的消息(由于 ReadTimeout 错误等)。 Kinesis 和 Kafka 都使用这种范式。不幸的是,没有一个简单的答案。

    您也可以尝试使用“exactly-once”消息队列,采用更严格的事务方法。例如,AWS SQS 就是这样做的:https://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower-prices-for-standard-queues/。请注意,SQS 吞吐量远小于 Kinesis。

    要解决您的问题,您应该了解您的应用程序域并尝试按照您的建议在内部解决它(数据库检查)。尤其是当你与外部服务(比如邮件服务器)通信时,你应该能够恢复操作状态以防止重复处理(因为在电子邮件服务器示例中重复发送,可能会导致多个副本收件人邮箱中的相同帖子)。

    另请参阅以下概念;

    1. 至少一次交付:http://www.cloudcomputingpatterns.org/at_least_once_delivery/
    2. 一次性交付:http://www.cloudcomputingpatterns.org/exactly_once_delivery/
    3. 幂等处理器:http://www.cloudcomputingpatterns.org/idempotent_processor/

    【讨论】:

    • 感谢您的回答。由于高吞吐量,我无法使用 SQS。高吞吐量也是我使用不同的持久存储(Mysql / PgSQL / Aurora / ElasticSearch / DynamoDB)对几种解决方案进行基准测试的原因。临时存储事件 ID 的最佳方式是 Redis,但 ElastiCache 无法授予您数据持久性。这就是为什么我一直在寻找替代方法。
    • Redis 为您提供严格的 tx 跟踪,但它是单节点,RDS 太慢,您是对的。 DynamoDB 似乎是您唯一的 PaaS 解决方案。但是,如果您想管理 EC2 实例,可以尝试内存集群解决方案,例如 Hazelcast 或 VoltDB(在许多 r3 节点上)?
    • 内存数据库不耐用。如果您的 Hazelcast 集群失败,您将无法了解您已经处理了哪些消息。 :(
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-13
    • 2018-07-27
    • 1970-01-01
    • 1970-01-01
    • 2021-01-13
    • 1970-01-01
    相关资源
    最近更新 更多