【问题标题】:How to solve the queue multi-consuming concurrency problem?如何解决队列多消费并发问题?
【发布时间】:2021-08-14 13:32:03
【问题描述】:

我们的程序正在使用队列。 多个消费者正在处理消息。

消费者执行以下操作:

  1. 从队列接收 onoff 状态消息。
  2. 从存储库获取最新状态。
  3. 比较存储库的状态和从消息中收到的状态。
  4. 如果开/关状态不同,请更新数据。 (此时,其他相关数据也在更新。)

假设这个过程由多个消费者处理,预计会出现以下问题。

  1. 生产者发送消息 1:开启、2:关闭和 3:开启。
  2. 消费者 A 收到消息 #1 并将消息 #1 存储在存储中,因为没有最新数据。
  3. 消费者 A 收到消息 #2。
  4. 此时,消费者B同时收到消息#3。
  5. 消费者 A 和 B 同时从存储中读取最新数据(消息 1)。
  6. 消费者 B 先完成处理。不要更新存储库,因为开/关状态没有改变。(1:开,3:开)
  7. 然后消费者 A 完成处理。开/关状态已更改,因此它处理并保存工作。 (1:开,2:关)

在正常情况下,数据库中剩余的最新数据应开启
(这是因为消息是按照开启 -> 关闭 -> 开启的顺序发送的。)
但是,根据上述情况,off 仍然是最新数据。

有什么好的方法可以解决这个问题吗? 作为参考,我们使用的队列使用 AWS Amazon MQ,而存储使用的是 AWS dynamoDB。并使用 Spring Boot。

【问题讨论】:

  • 这是一个高并发系统 -> 意味着您可能无法减少消费者?如果是,那么您需要某种锁定机制,即只有一个消费者可以更新特定行,其他消费者需要按顺序等待,直到锁被释放
  • 这是否意味着数据库锁定?
  • 好吧,您可以为您的消息引入时间戳,并且仅当当前消息具有更新的时间戳时才更新数据库。这可以在更新查询中检查,例如类似于“... SET state = x WHERE timestamp 3 < 2 为假。
  • 由于状态变化很重要,我不能跳过第2步。
  • 阅读this 了解如何在 ActiveMQ 中保证交付顺序(可能选择 JMSXGroupID)。您可能需要JmsTemplate,因为您正在使用 Spring。对于非平凡的情况,您可能还需要消息幂等性。 This 很有趣,可以帮助您入门。

标签: java spring concurrency queue jms


【解决方案1】:

这里的根本问题是您需要按顺序使用这些“状态”消息,但是您使用的并发消费者会导致竞争条件和无序消息处理。简而言之,您使用并发消费者的基本架构导致了这个问题。

您可以按照 cmets 中的建议在数据库中使用时间戳制定某种解决方案,但这对于客户端和存储在数据库中的额外数据来说将是额外的工作,这并不是绝对必要的。

解决问题的最简单方法是串行使用消息,而不是同时使用。有几种不同的方法可以做到这一点,例如:

  • 使用“状态”消息为队列定义一个消费者。
  • 使用 ActiveMQ 的“exclusive consumer”功能确保只有一个消费者接收消息。
  • 使用message groups 将所有“状态”消息组合在一起,以确保按顺序(即按顺序)处理它们。

【讨论】:

    猜你喜欢
    • 2015-02-02
    • 1970-01-01
    • 2023-03-20
    • 2013-06-09
    • 1970-01-01
    • 2012-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多