【发布时间】:2018-09-26 08:58:00
【问题描述】:
我正在实施一项日常工作,该工作从 MongoDB(大约 300K 个文档)获取数据,并为每个人在 RabbitMQ 队列上发布一条消息。 另一方面,我在同一个队列中有一些消费者,理想情况下它们应该并行工作。
一切正常,但不如我所愿,特别是在消费者表现方面。
这是我声明队列的方式:
rabbitMQ.getChannel().queueDeclare(QUEUE_NAME, true, false, false, null);
这是发布的方式:
rabbitMQ.getChannel().basicPublish("", QUEUE_NAME, null, body.getBytes());
所以用来声明队列的通道是用来发布所有消息的。
这就是消费者在 for 循环中的实例化方式(总共 10 个,但可以是任意数字):
Channel channel = rabbitMQ.getConnection().createChannel();
MyConsumer consumer = new MyConsumer(customMapper, channel, subscriptionUpdater);
channel.basicQos(1); // also tried with 0, 10, 100, ...
channel.basicConsume(QUEUE_NAME, false, consumer);
因此,我为每个消费者创建了一个新频道,这已通过日志确认:
...
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@bdd2027
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@5d1b9c3d
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@49a26d19
...
据我从很短的 RabbitMQ 经验中了解到,这应该保证调用所有的消费者。 顺便说一句,消费者需要 0.5 到 1.2 秒来完成他们的任务。我刚刚发现只有很少的 3 秒。
我有两个单独的队列,我重复上面所说的两次(使用相同的 RabbitMQ 连接)。
所以,我测试了为每个队列发布 100 条消息。他们都有 10 个消费者,qos=1。
我没想到会有 10/s 的准确交付/消耗性能,但我注意到:
- 实际值在 0.4 和 1.0 左右。
- 至少绑定到队列的所有消费者都收到了消息,但看起来不像“公平调度”。
- 消耗两个队列上的所有消息大约需要 3 分 30 秒。
我是否错过了 RabbitMQ 中线程的主要概念?或者任何可能仍处于默认值的特定配置? 我从几天前就开始了,所以这可能是可能的。
请注意,我很幸运,我可以同时控制发布和消费部分:)
我在本地使用的是 RabbitMQ 3.7.3,所以这不可能是任何网络延迟问题。
感谢您的帮助!
【问题讨论】:
-
您是否查看过使用消息的应用程序的堆栈跟踪?是否有 10 个线程从 RabbitMQ 读取?那10个线程在做什么?我怀疑应用程序逻辑中存在阻塞消费者的东西。如果您随着时间的推移查看一些堆栈跟踪,您应该会得到一个提示。
-
@RobMoore 仔细检查堆栈跟踪,我看到消费者正在相互竞争以查找和更新 mongodb 文档。事实上,如果我把那部分注释掉,我每秒可以消费几十条消息。我现在正在重构代码,试图在发布端移动数据库更新,让消费者只做最简单的任务。
-
在一个分发客户端工作的频道上只有一个客户端消费者怎么样?
标签: java mongodb rabbitmq amqp mongodb-java