【问题标题】:C# RabbitMQ pool/queue approachC# RabbitMQ 池/队列方法
【发布时间】:2021-09-03 09:38:46
【问题描述】:

我正在尝试创建一个兔子 mq 通道的队列/池以供重用,他们建议这样做,但没有提供任何实际执行此操作的方法!

我有这个工作,但理想情况下我不想在调用构造函数时填充通道。我想创建最多 9 个频道,只有在需要时才创建一个。

我想从显示的完整示例中删除以下内容。

// COMMENT THIS LINE OUT
GetConnection();

// COMMENT THESE LINE OUT
for (var i = 0; i < MAX_CHANNELS; i++)
{
    _channelQueue.Add(_connection.CreateModel());
     _channelCount++;
}
// END

当我删除这些时,第一次创建通道和/或连接需要 13 秒,但如果我将代码留在其中,则不会发生这种情况!这让我很困惑,我不确定现在发生了什么。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using RabbitMQ.Client;

namespace Enqueue
{
    public class RabbitMqChannelPool
    {
        private readonly string _hostName;

        private const int MAX_CHANNELS = 9;
        
        // Keep track of the number of good channels
        private int _channelCount = 0;

        private IConnection _connection;
        
        private readonly BlockingCollection<IModel> _channelQueue = new BlockingCollection<IModel>(MAX_CHANNELS);

        private Object connectionLock = new Object();
        private Object channelLock = new Object();

        public RabbitMqChannelPool(string hostName)
        {
            _hostName = hostName;
            // COMMENT THIS LINE OUT
            GetConnection();
        }
        
        private IConnection GetConnection()
        {
            lock (connectionLock)
            {
                if (_connection == null || !_connection.IsOpen)
                {
                    Stopwatch sw = new Stopwatch();
                    sw.Start();
                    _connection = CreateNewConnection();
                    // Having this here makes it run fast, creating on the fly seems to block for ages on the first one
                    // COMMENT THESE LINE OUT
                    for (var i = 0; i < MAX_CHANNELS; i++)
                    {
                        _channelQueue.Add(_connection.CreateModel());
                        _channelCount++;
                    }
                    // END
                    sw.Stop();
                    System.Diagnostics.Debug.WriteLine($"Connection created in { sw.Elapsed.Seconds}.{sw.Elapsed.Milliseconds} seconds");
                }
                System.Diagnostics.Debug.WriteLine("Returning Connection");
                return _connection;
            }
        }

        private IConnection CreateNewConnection()
        {
            var factory = new ConnectionFactory
            {
                HostName = _hostName
            };

            return factory.CreateConnection();
        }

        /// <summary>
        /// Get a channel if at all possible
        /// </summary>
        /// <returns></returns>
        public IModel DequeueChannel()
        {
            lock (channelLock)
            {
                System.Diagnostics.Debug.WriteLine("Getting channel from queue");
                // if we have not used up the limit then return one
                // This should then be put back on the queue after use
                if (_channelCount < MAX_CHANNELS)
                {
                    System.Diagnostics.Debug.WriteLine("Returning new channel");
                    return CreateNewChannel();
                }

                // Otherwise block until one becomes available
                System.Diagnostics.Debug.WriteLine("Waiting for queue...");
                IModel channel = _channelQueue.Take(); // this blocks till there is something there
                return channel;
            }
        }

        /// <summary>
        /// Add the channel back on the queue if it seems fine
        /// </summary>
        /// <param name="channel"></param>
        public void EnqueueChannel(IModel channel)
        {
            if (channel != null && channel.IsOpen)
            {
                System.Diagnostics.Debug.WriteLine("Putting channel back on queue");
                _channelQueue.Add(channel);
            }
            else
            {
                System.Diagnostics.Debug.WriteLine("Channel was bad");
                _channelCount--;
            }
        }

        private IModel CreateNewChannel()
        {
            Stopwatch sw = new Stopwatch();
            sw.Start();
            var channel = GetConnection().CreateModel();
            sw.Stop();
            _channelCount++;
            System.Diagnostics.Debug.WriteLine($"Channel {_channelCount} created in { sw.Elapsed.Seconds}.{sw.Elapsed.Milliseconds} seconds");
            return channel;
        }

        public int ChannelQueueSize => _channelCount;
    }
}

我有这个单元测试来尝试模拟它被快速连续调用。

[SetUp]
public void Setup()
{
    _rabbitMqChannel = new RabbitMqChannelPool("localhost");
}

[Test]
public void WhenMoreChannelsRequestedThanAvailable_ThenBlockAndWaitTillAvailable()
{
    // Arrange
    // Act

    List<Task> tasks = new List<Task>();

    for (var i = 0; i < 20; i++)
    {
        tasks.Add(Task.Factory.StartNew(DoSomeWork));
    }

    Task.WaitAll(tasks.ToArray());

    // Assert

    Assert.AreEqual(9, _rabbitMqChannel.ChannelQueueSize);
}

private void DoSomeWork()
{
    var channel = _rabbitMqChannel.DequeueChannel();
    Thread.Sleep(100); // simulate doing some work
    _rabbitMqChannel.EnqueueChannel(channel);
}

当我在调试单元测试模式下运行它时,我可以看到如下输出,你可以看到它说 13.816 秒,但是如果我将代码保留在它最初填充队列时,它运行得很快吗?

Getting channel from queue
Returning new channel
Connection created in 13.861 seconds
Returning Connection
Channel 1 created in 13.876 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 2 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 3 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 4 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 5 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 6 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 7 created in 0.2 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 8 created in 0.3 seconds
Getting channel from queue
Returning new channel
Returning Connection
Channel 9 created in 0.3 seconds
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Getting channel from queue
Waiting for queue...
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue
Putting channel back on queue

【问题讨论】:

  • >"他们推荐这样做" 你在哪里读到的?您不需要池,只需为每个线程/上下文创建一个通道。例如,每个线程/生产者 1 个通道,线程消费者 1 个通道。你可以让频道保持打开状态。
  • 嗨 Gabriele 此文本“鉴于这两个因素,强烈建议限制每个连接使用的通道数。根据经验,大多数应用程序可以在每个连接中使用一位数的通道数。那些并发率特别高的应用程序(通常这样的应用程序是消费者)可以从每个线程/进程/协程一个通道开始,并在指标表明原始模型不再可持续时切换到通道池,例如因为它消耗了太多内存。”这里rabbitmq.com/channels.html
  • Also here "就像连接一样,通道也应该是长期存在的。为每个操作打开一个新通道效率非常低,并且非常不鼓励。但是,通道可以有更短的寿命“ rabbitmq.com/dotnet-api-guide.html
  • 在rabbitmq google group上提问后,听起来我需要调查masstransit-project.com,听起来我不必那么担心
  • 只有当你有非常多的连接和通道时才需要这种优化。顺便说一句,如果你发现解决方案没问题

标签: c# rabbitmq blockingcollection


【解决方案1】:

我决定停止那里的工作并研究 MassTransit,因为听起来这样可以为我处理这些事情https://masstransit-project.com/

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-06-14
    • 2018-12-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-04-19
    • 2013-08-09
    • 2015-04-30
    相关资源
    最近更新 更多