【问题标题】:Multithreading consumption of an ordered queue有序队列的多线程消费
【发布时间】:2015-09-08 02:43:47
【问题描述】:

我有一个来自第三方消息队列的对象增量流(即描述对其他对象的更改的 JSON 对象)。我需要将这些应用于数据库中的适当对象(将增量转换为状态)。增量本质上是有序的。

事实上,我打算将这些增量通过管道传输到我们自己的 RabbitMQ 集群中,从那里一组 Java 服务器将提取它们,然后将它们应用到数据库(Java 是数据库更新逻辑的集中位置)。

增量的应用程序需要是多线程的,但我想确保给定对象的增量始终按顺序应用。为了真正保证这一点,只有一个线程可以处理给定对象的增量。

为此,当我从第三方队列中读取它们并将它们放入 RabbitMQ 之前,我想我可以通过相应对象的 uuid 将增量拆分为队列。基本上,每个增量都有一个object_uuid 字段,我会将该 uuid 取模为 50,然后将结果用作路由键,这样我将在 RabbitMQ 中有 50 个有序增量队列。

那时,只需(呵呵)确保每个队列有一个消费者(尽管每个消费者仍然可以有多个队列)。我认为 AMQP 中队列声明的“独占”参数可能会给我所需的行为,而且确实如此,但不幸的是,它带来了令人望而却步的副作用,即当消费者断​​开连接时队列被删除(这是一组 Java每次发布都会启动和关闭的服务器 - 队列必须在发布之间持续存在)。

这不会是一个罕见的困境,但我没有看到任何完全适合这个问题的东西。 RabbitMQ 或 AMQP 中是否没有我可以在这里使用的构造?有没有办法我可以重新考虑这个问题来避免这个问题?还是我需要查看分布式锁定解决方案?

【问题讨论】:

  • object deltas 到底是什么意思?您是否有机会举例说明这些增量是什么以及它们是否都适用于单个对象或者它们适用于多个对象,每个对象都有多个增量?
  • @kha,我已经更新了问题以澄清。每个增量都有一个对象的外键,增量需要应用到该对象。
  • 所以基本上你的问题归结为如果消费者断开连接,是否有一种方法可以让 rabbitmq 持久保存数据?
  • 我感觉到这里有相互冲突的要求:“增量本质上是有序的……增量的应用程序需要是多线程的。”必须按特定顺序完成的事情应该由单个线程按该顺序完成。也许使用单个线程将增量与它们应用的对象进行匹配是有意义的,然后将 (delta, object) 元组交给工作线程进行实际处理。
  • @jameslarge 这实际上是我的第四段所描述的。单个进程根据 uuid 将 deltas 拆分为 RabbitMQ 队列。

标签: java multithreading rabbitmq amqp


【解决方案1】:

这是我从您的问题中了解到的:您基本上有 N 对象,每个对象都有 M* 状态。您希望每个N 对象在不同的​​进程/线程上运行,但属于n(来自N)的M* 状态按顺序应用。

您提出的解决方案对我来说看起来不错。我要做的是:

为每个对象创建一个单独的队列(称为N'),这基本上是您帖子中对象的 UUID。

然后你就有了一个服务器/分发器,它服务于三个目的:

  1. 创建一个持久队列N'
  2. 将您的一个代理池随机分配给该主题并与他们交流
  3. 每隔一段时间收听来自这些代理的心跳消息,以确保它们还活着。如果不是,请随机选择另一个可用的代理,然后将队列分配给他们。

为了使这个分发器也能安全地崩溃/重新启动,它可能应该在启动时检查现有队列并确保将它们分配给某个代理。如果没有,它应该尽快分配它们。

您的代理人自己负责:

  1. 监听分配器的队列分配
  2. 处理分配队列中的项目(M' 个状态)
  3. 向分发服务器发送心跳

确保每个代理都是exclusive 的工作由分销商负责。这不应该是其他任何人的责任,而且您不能真正安全地将这项工作交给消息传递技术,或者至少我不知道有任何队列可以处理这个问题,但上述解决方案有望以某种方式帮助您解决问题。

可能出现的另一个问题是何时删除队列。当代理处理完所有内容后,您可以在代理自己中执行此操作,但您需要确保他们只在与分发者共享锁中执行此操作。您不希望删除一个您认为为空但分发器即将写入的队列。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-30
    • 1970-01-01
    • 1970-01-01
    • 2021-11-14
    • 2011-03-13
    相关资源
    最近更新 更多