【问题标题】:Wait for a single RabbitMQ message with a timeout等待一条带有超时的 RabbitMQ 消息
【发布时间】:2011-02-17 11:43:56
【问题描述】:

我想向 RabbitMQ 服务器发送一条消息,然后等待回复消息(在“回复”队列中)。当然,我不想永远等待,以防处理这些消息的应用程序出现故障 - 需要超时。这听起来像是一项非常基本的任务,但我找不到这样做的方法。我现在在py-amqplibRabbitMQ .NET client 都遇到了这个问题。

到目前为止,我得到的最佳解决方案是使用basic_getsleep 进行轮询,但这很难看:

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

肯定有更好的方法吗?

【问题讨论】:

    标签: .net python rabbitmq amqp py-amqplib


    【解决方案1】:

    这似乎打破了异步处理的整个想法,但如果你必须这样做,我认为正确的方法是使用RpcClient

    【讨论】:

    • 虽然 RpcClient 本身对我没有用,但查看它的实现揭示了使用方法:创建一个 QueueingBasicConsumer 并等待它的队列,它支持超时。这在 .NET 中并不像我担心的那样复杂。
    【解决方案2】:

    有一个示例 here 使用 qpidmsg = q.get(timeout=1) 应该可以满足您的需求。抱歉,我不知道还有哪些 AMQP 客户端库实现了超时(特别是我不知道你提到的两个具体的)。

    【讨论】:

    • 查看 qpid 的来源,它似乎使用与 .NET 客户端完全相同的方法:basic_consume 使用队列并在队列中等待超时。看来这就是我必须做的。
    【解决方案3】:

    这是我最终在 .NET 客户端中所做的事情:

    protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
    {
        var consumer = new QueueingBasicConsumer(Channel);
        var tag = Channel.BasicConsume(queueName, true, null, consumer);
        try
        {
            object result;
            if (!consumer.Queue.Dequeue(timeoutMs, out result))
                throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));
    
            return ((BasicDeliverEventArgs)result).Body;
        }
        finally
        {
            Channel.BasicCancel(tag);
        }
    }
    

    不幸的是,我不能对 py-amqplib 做同样的事情,因为它的 basic_consume 方法不会调用回调,除非你调用 channel.wait()channel.wait() 不支持超时!这个愚蠢的限制(我一直遇到)意味着如果您再也没有收到过消息,您的线程将永远冻结。

    【讨论】:

      【解决方案4】:

      我刚刚在carrot 中添加了对amqplib 的超时支持。

      这是amqplib.client0_8.Connection的子类:

      http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

      wait_multichannel.wait 的一个版本,能够接收任意数字 频道。

      我想这可能会在某个时候在上游合并。

      【讨论】:

      • 现在这就是我所说的“很好的答案”:“它已修复”!接受——希望它合并到amqplib中。
      • @EMP 哈哈 :) 好笑 :)
      【解决方案5】:

      Rabbit 现在允许您添加超时事件。只需将代码包装在 try catch 中,然后在 TimeOut 和 Disconnect 处理程序中抛出异常:

      try{
          using (IModel channel = rabbitConnection.connection.CreateModel())
          {
              client = new SimpleRpcClient(channel, "", "", queue);
              client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
              client.TimedOut += RpcTimedOutHandler;
              client.Disconnected += RpcDisconnectedHandler;
              byte[] replyMessageBytes = client.Call(message);
              return replyMessageBytes;
          }
      }
      catch (Exception){
          //Handle timeout and disconnect here
      }
      private void RpcDisconnectedHandler(object sender, EventArgs e)
      {
           throw new Exception("RPC disconnect exception occured.");
      }
      
      private void RpcTimedOutHandler(object sender, EventArgs e)
      {
           throw new Exception("RPC timeout exception occured.");
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2015-04-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-12-07
        • 2021-01-31
        • 1970-01-01
        相关资源
        最近更新 更多