【问题标题】:RabbitMQ only processes 50 messages then blocksRabbitMQ 只处理 50 条消息然后阻塞
【发布时间】:2014-10-31 14:19:08
【问题描述】:

我在 .net 中使用 RabbitMQ,当我在队列中放置 100 条消息时,我看到了一个奇怪的问题。它处理大约 50 条消息,然后 Dequeue() 方法就挂起。如果我重新启动服务,它会处理剩余的项目。

编辑:它正在处理 50% 的队列。当我添加 1000 条消息时,它只处理 500 条。即使是单线程

我在这里错过了什么?

    private void InitializeAgent() {
        var agentFactory = new ConnectionFactory() { HostName = "localhost" };
        agentConnection = agentFactory.CreateConnection();
        agentChannel = agentConnection.CreateModel();
        var ok = agentChannel.QueueDeclare(GetType().Name, true, false, false, null);
        consumer = new QueueingBasicConsumer(agentChannel);
        agentChannel.BasicConsume(GetType().Name, false, consumer);
    }

    public void DequeueMessages() {
        ThreadPool.SetMaxThreads(200, 200);
        ThreadPool.SetMinThreads(200, 200);
        var ea = consumer.Queue.Dequeue();
        ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
    }

    public void AgentTask() {
        var instance = factory.GetInstance(threadItem);

        while (true) 
            DequeueMessages();
    }

    private void ProcessWorkInThread(object state) {
         var ea = state as BasicDeliverEventArgs;

         var message = Encoding.UTF8.GetString(ea.Body);

         var settings = new JsonSerializerSettings();
         settings.ContractResolver = new DefaultContractResolver() { DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public };
         var item = JsonConvert.DeserializeObject<TEntity>(message, settings);

         Thread.Sleep(10000) //simulate work
         lock (agentChannel)             
             agentChannel.BasicAck(ea.DeliveryTag, false);            
     }

【问题讨论】:

  • 相当肯定 IModel 不是线程安全的。 .net 用户指南特别指出,IModel 不应在线程之间共享。
  • @user1450877 我可以在 dequeue & ack 周围加一个锁吗?
  • 线程必须使用一个 IModel。
  • @ChrisKooken 我什至不确定这是否是您的问题,但这似乎是一个不错的起点。就个人而言,如果我要使用您的消息处理模型,我会将消息出列并在 DequeueMessages() 方法中直接确认(或关闭确认),然后让线程完成所需的工作。如果任何特定的处理出现问题,只需重新排队即可。
  • @user1450877 参见上面的编辑。即使我立即确认,或者一起删除线程,它仍然只处理启动时队列大小的 50%。

标签: c# multithreading rabbitmq


【解决方案1】:

这一定是 .NET 客户端版本问题,使用3.4.0,以下代码按预期工作。

static readonly ConnectionFactory Factory = new ConnectionFactory { HostName = "localhost" };
static readonly IConnection Connection = Factory.CreateConnection();
static QueueingBasicConsumer consumer;
static IModel agentChannel;

static CancellationTokenSource _tokenSource;

static void Main(string[] args)
{
    _tokenSource = new CancellationTokenSource();

    const string queueName = "testQueue";
    agentChannel = Connection.CreateModel();
    agentChannel.QueueDeclare(queueName, true, false, false, null);
    agentChannel.QueueBind(queueName, "testExchange", "");

    consumer = new QueueingBasicConsumer(agentChannel);
    agentChannel.BasicConsume(queueName, false, consumer);

    while (!_tokenSource.Token.IsCancellationRequested)
    {
        DequeueMessages();
    }
    Console.ReadLine();
    _tokenSource.Cancel();
}

static void DequeueMessages()
{
    ThreadPool.SetMaxThreads(200, 200);
    ThreadPool.SetMinThreads(200, 200);
    var ea = consumer.Queue.Dequeue();
    ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}

static void ProcessWorkInThread(object state)
{
    var ea = state as BasicDeliverEventArgs;

    var message = Encoding.UTF8.GetString(ea.Body);

    var settings = new JsonSerializerSettings();
    settings.ContractResolver = new DefaultContractResolver()
    {
        DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public
    };
    var item = JsonConvert.DeserializeObject<string>(message, settings);
    Console.WriteLine(item);
    Thread.Sleep(10000); //simulate work
    lock (agentChannel)
        agentChannel.BasicAck(ea.DeliveryTag, false);
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-19
    • 1970-01-01
    相关资源
    最近更新 更多