【问题标题】:ActiveMQ - Consumers not sharing the loadActiveMQ - 消费者不分担负载
【发布时间】:2011-04-06 07:02:20
【问题描述】:

假设我有一个名为通知的队列,我向该队列发布了 1000 条消息。我也有该队列的 20 个消费者。我希望这些消费者中的每一个从队列中获取一条消息,对其进行处理,然后获取下一条可用消息。

现在发生的情况是,一些消费者获取大量消息并处理它们,而其他消费者什么都不做。

下面是一个完整的测试用例,演示了我所看到的。实际上,消费者都是不同机器上的独立进程,但这完全复制了行为:

using System;
using System.Threading;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Spring.Messaging.Nms.Core;

namespace QueueTest
{
    class Program
    {
        private static object _syncObj = new object();
        private static int _row;

        static void Main()
        {
            var connectionFactory = new ConnectionFactory("tcp://localhost:61616");
            var template = new NmsTemplate(connectionFactory);

            for (int i = 0; i < 500; i++)
            {
                template.ConvertAndSend("notifications", "hello");
            }

            for (int i = 0; i < 10; i++)
            {
                ThreadPool.QueueUserWorkItem(Spawn, i);
            }

            Console.ReadKey();
        }

        static void Spawn(object o)
        {
            int count = 0;
            int threadId = (int)o;

            var ts = new TimeSpan(0,0,0,5);
            while (true)
            {
                var connectionFactory = new ConnectionFactory(
                                "tcp://localhost:61616"
                                + "?nms.PrefetchPolicy.QueuePrefetch=1",
                                String.Format("{0}:{1}:{2}", 
                Environment.MachineName, 
                threadId, 
                "notifications"));
                using (var conn = connectionFactory.CreateConnection())
                {
                    conn.ClientId = String.Format("{0}:{1}:{2}", Environment.MachineName, threadId, "notifications");
                    conn.Start();

                    using (var session = conn.CreateSession())
                    {
                        var queue = session.GetQueue("notifications");
                        using (var consumer = session.CreateConsumer(queue))
                        {
                            IMessage msg = consumer.Receive(ts);
                            while (msg != null)
                            {
                                lock (_syncObj)
                                {
                                    Interlocked.Increment(ref _row);
                                    Console.WriteLine("{0}.  {1} processed {2}", _row, threadId, ++count);
                                    if (_row == 500)
                                    {
                                        Environment.Exit(0);
                                    }
                                }
                                Thread.Sleep(1000);
                                msg = consumer.Receive(ts);
                            }
                            Console.WriteLine("{0} nothing to process", threadId);
                        }
                    }
                    conn.Stop();
                }
            }
        }
    }
}

如何让它更均匀地向所有消费者分发消息?

【问题讨论】:

    标签: c# activemq


    【解决方案1】:

    问题围绕消费者的预取缓冲区展开。默认情况下,消费者将从代理中预取大约 1000 条消息。您需要为消费者设置预取,以便他们允许其他消费者获取一些消息,在您的情况下,您可能希望将预取设置为一个,以便每个消费者平均分担负载。查看 NMS.ActiveMQ API 中的预取策略。

    您可以在连接 URI 上设置预取,它看起来类似于以下 tcp://localhost:61616?nms.PrefetchPolicy.QueuePrefetch=1

    【讨论】:

    • 我已经尝试了大约十几种 prefetchPolicy=1 的变体,包括你所拥有的、consumer.prefetchPolicy=1 和 consumer.prefetchPolicy=0 和 jms.prefetchPolicy=1 等。它们都展示了相同的行为:前 XX 个消费者预取所有消息。
    • 好像漏掉了.QueuePrefetch部分,不能设置整个策略,看一下src/test/csharp文件夹下的NMSConnectionFactoryTest.cs文件,里面有一个测试用例设置预取限制。 v1.5.1 版本中有一个更新,允许您使用以下内容设置策略中的所有内容:nms.PrefetchPolicy.All=1
    【解决方案2】:

    在我设置赏金之后,我就发现了问题...... :-(

    我使用的是不支持预取策略的旧版 NMS。

    【讨论】:

      猜你喜欢
      • 2020-02-28
      • 2016-06-06
      • 2021-03-06
      • 2014-08-06
      • 2021-05-20
      • 2012-04-18
      • 2021-12-02
      • 2012-12-24
      • 1970-01-01
      相关资源
      最近更新 更多