【发布时间】:2015-09-08 02:43:47
【问题描述】:
我有一个来自第三方消息队列的对象增量流(即描述对其他对象的更改的 JSON 对象)。我需要将这些应用于数据库中的适当对象(将增量转换为状态)。增量本质上是有序的。
事实上,我打算将这些增量通过管道传输到我们自己的 RabbitMQ 集群中,从那里一组 Java 服务器将提取它们,然后将它们应用到数据库(Java 是数据库更新逻辑的集中位置)。
增量的应用程序需要是多线程的,但我想确保给定对象的增量始终按顺序应用。为了真正保证这一点,只有一个线程可以处理给定对象的增量。
为此,当我从第三方队列中读取它们并将它们放入 RabbitMQ 之前,我想我可以通过相应对象的 uuid 将增量拆分为队列。基本上,每个增量都有一个object_uuid 字段,我会将该 uuid 取模为 50,然后将结果用作路由键,这样我将在 RabbitMQ 中有 50 个有序增量队列。
那时,只需(呵呵)确保每个队列有一个消费者(尽管每个消费者仍然可以有多个队列)。我认为 AMQP 中队列声明的“独占”参数可能会给我所需的行为,而且确实如此,但不幸的是,它带来了令人望而却步的副作用,即当消费者断开连接时队列被删除(这是一组 Java每次发布都会启动和关闭的服务器 - 队列必须在发布之间持续存在)。
这不会是一个罕见的困境,但我没有看到任何完全适合这个问题的东西。 RabbitMQ 或 AMQP 中是否没有我可以在这里使用的构造?有没有办法我可以重新考虑这个问题来避免这个问题?还是我需要查看分布式锁定解决方案?
【问题讨论】:
-
object deltas到底是什么意思?您是否有机会举例说明这些增量是什么以及它们是否都适用于单个对象或者它们适用于多个对象,每个对象都有多个增量? -
@kha,我已经更新了问题以澄清。每个增量都有一个对象的外键,增量需要应用到该对象。
-
所以基本上你的问题归结为如果消费者断开连接,是否有一种方法可以让 rabbitmq 持久保存数据?
-
我感觉到这里有相互冲突的要求:“增量本质上是有序的……增量的应用程序需要是多线程的。”必须按特定顺序完成的事情应该由单个线程按该顺序完成。也许使用单个线程将增量与它们应用的对象进行匹配是有意义的,然后将 (delta, object) 元组交给工作线程进行实际处理。
-
@jameslarge 这实际上是我的第四段所描述的。单个进程根据 uuid 将 deltas 拆分为 RabbitMQ 队列。
标签: java multithreading rabbitmq amqp