【发布时间】:2011-04-06 07:02:20
【问题描述】:
假设我有一个名为通知的队列,我向该队列发布了 1000 条消息。我也有该队列的 20 个消费者。我希望这些消费者中的每一个从队列中获取一条消息,对其进行处理,然后获取下一条可用消息。
现在发生的情况是,一些消费者获取大量消息并处理它们,而其他消费者什么都不做。
下面是一个完整的测试用例,演示了我所看到的。实际上,消费者都是不同机器上的独立进程,但这完全复制了行为:
using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;
namespace QueueTest
{
class Program
{
private static object _syncObj = new object();
private static int _row;
static void Main()
{
var connectionFactory = new ConnectionFactory("tcp://localhost:61616");
var template = new NmsTemplate(connectionFactory);
for (int i = 0; i < 500; i++)
{
template.ConvertAndSend("notifications", "hello");
}
for (int i = 0; i < 10; i++)
{
ThreadPool.QueueUserWorkItem(Spawn, i);
}
Console.ReadKey();
}
static void Spawn(object o)
{
int count = 0;
int threadId = (int)o;
var ts = new TimeSpan(0,0,0,5);
while (true)
{
var connectionFactory = new ConnectionFactory(
"tcp://localhost:61616"
+ "?nms.PrefetchPolicy.QueuePrefetch=1",
String.Format("{0}:{1}:{2}",
Environment.MachineName,
threadId,
"notifications"));
using (var conn = connectionFactory.CreateConnection())
{
conn.ClientId = String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications");
conn.Start();
using (var session = conn.CreateSession())
{
var queue = session.GetQueue("notifications");
using (var consumer = session.CreateConsumer(queue))
{
IMessage msg = consumer.Receive(ts);
while (msg != null)
{
lock (_syncObj)
{
Interlocked.Increment(ref _row);
Console.WriteLine("{0}. {1} processed {2}", _row, threadId, ++count);
if (_row == 500)
{
Environment.Exit(0);
}
}
Thread.Sleep(1000);
msg = consumer.Receive(ts);
}
Console.WriteLine("{0} nothing to process", threadId);
}
}
conn.Stop();
}
}
}
}
}
如何让它更均匀地向所有消费者分发消息?
【问题讨论】: