【发布时间】:2021-08-14 13:32:03
【问题描述】:
我们的程序正在使用队列。 多个消费者正在处理消息。
消费者执行以下操作:
- 从队列接收 on 或 off 状态消息。
- 从存储库获取最新状态。
- 比较存储库的状态和从消息中收到的状态。
- 如果开/关状态不同,请更新数据。 (此时,其他相关数据也在更新。)
假设这个过程由多个消费者处理,预计会出现以下问题。
- 生产者发送消息 1:开启、2:关闭和 3:开启。
- 消费者 A 收到消息 #1 并将消息 #1 存储在存储中,因为没有最新数据。
- 消费者 A 收到消息 #2。
- 此时,消费者B同时收到消息#3。
- 消费者 A 和 B 同时从存储中读取最新数据(消息 1)。
- 消费者 B 先完成处理。不要更新存储库,因为开/关状态没有改变。(1:开,3:开)
- 然后消费者 A 完成处理。开/关状态已更改,因此它处理并保存工作。 (1:开,2:关)
在正常情况下,数据库中剩余的最新数据应开启。
(这是因为消息是按照开启 -> 关闭 -> 开启的顺序发送的。)
但是,根据上述情况,off 仍然是最新数据。
有什么好的方法可以解决这个问题吗? 作为参考,我们使用的队列使用 AWS Amazon MQ,而存储使用的是 AWS dynamoDB。并使用 Spring Boot。
【问题讨论】:
-
这是一个高并发系统 -> 意味着您可能无法减少消费者?如果是,那么您需要某种锁定机制,即只有一个消费者可以更新特定行,其他消费者需要按顺序等待,直到锁被释放
-
这是否意味着数据库锁定?
-
好吧,您可以为您的消息引入时间戳,并且仅当当前消息具有更新的时间戳时才更新数据库。这可以在更新查询中检查,例如类似于“... SET state = x WHERE timestamp 3 < 2 为假。
-
由于状态变化很重要,我不能跳过第2步。
-
阅读this 了解如何在 ActiveMQ 中保证交付顺序(可能选择 JMSXGroupID)。您可能需要JmsTemplate,因为您正在使用 Spring。对于非平凡的情况,您可能还需要消息幂等性。 This 很有趣,可以帮助您入门。
标签: java spring concurrency queue jms