【问题标题】:Rabbitmq retrieve multiple messages using single synchronous callRabbitmq 使用单个同步调用检索多条消息
【发布时间】:2013-06-05 00:35:34
【问题描述】:

有没有办法使用单个同步调用接收多条消息?

当我知道队列中有 N 条消息(N 可能是小于 10 的小值)时,我应该能够执行类似 channel.basic_get(String queue, boolean autoAck, int numberofMsg) 的操作。我不想向服务器发出多个请求。

【问题讨论】:

    标签: rabbitmq synchronous


    【解决方案1】:

    不幸的是,RabbitMQ 的 basic.get 不支持多条消息 as seen in the docs。检索多条消息的首选方法是使用basic.consume,它将消息推送到客户端,避免多次往返。 acks 是异步的,因此您的客户端不会等待服务器响应。 basic.consume 还具有在客户端断开连接时允许 RabbitMQ 重新传递消息的好处,这是 basic.get 无法做到的。这也可以将no-ack 设置为true 来关闭。

    设置basic.qosprefetch-count会设置随时推送给客户端的消息数量。如果在客户端没有等待消息(将立即返回),客户端库往往会因可选的超时而阻塞。

    【讨论】:

    • 这不是对所问内容的回答。在某些情况下(例如出于安全考虑),无法从消息代理触发连接,而​​只能从安全区域触发连接到不太安全的区域。既然如此,这仍然没有回答所问的问题。
    • AMQP 或 RabbitMQ 可能已经发生了变化,允许同步调用在其间的几年中获取多条消息。 RabbitMQ 的连接来自客户端 -> Rabbit。消息的发布发生在这个现有的连接上。使用basic.getbasic.consume 不会改变建立连接的方式。
    • 这是关于消费消息而不是发布。这是否意味着消费者建立了一个连接,然后为了 RabbitMQ 回调工作而保持打开状态?在某些情况下,如果限制防火墙规则,这可能是不可能的。从这个意义上讲,使用 basic.get 或 basic.consume 确实会改变连接的处理方式。
    • 无论在连接上使用什么命令,客户端都会建立连接,无论是消费还是发布。由于 AMQP 消息通过该连接,回调由客户端库处理。
    【解决方案2】:

    您可以使用Consumer 接口的QueueingConsumer 实现,它允许您在单个请求中检索多条消息。

     QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
     channel.basicConsume(plugin.getQueueName(), false, queueingConsumer);
    
     for(int i = 0; i < 10; i++){
        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(100);//read timeout in ms
        if(delivery == null){
          break;
        }
     }
    

    【讨论】:

    【解决方案3】:

    首先声明包装模型的 QueueingBasicConsumer() 实例。
    从模型执行 model.BasicConsume(QueueName, false, consumer)
    然后实现一个循环,该循环将围绕队列中的消息进行循环,然后处理
    下一行 - consumer.Queue.Dequeue() 方法 - 等待从队列中接收消息。
    然后将字节数组转换为字符串并显示出来。
    Model.BasicAck() - 从队列中释放消息以接收下一条消息
    然后在服务器端可以开始等待下一条消息通过:

      public string GetMessagesByQueue(string QueueName)
        {
            var consumer = new QueueingBasicConsumer(_model);
            _model.BasicConsume(QueueName, false, consumer);
    
            string message = string.Empty;
    
            while (Enabled)
            {
                //Get next message
                var deliveryArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    
                //Serialize message
                 message = Encoding.Default.GetString(deliveryArgs.Body);
                    _model.BasicAck(deliveryArgs.DeliveryTag, false);
            }
            return message;
        }
    

    【讨论】:

      【解决方案4】:

      不是一个优雅的解决方案,也不能解决多次调用,但您可以使用 MessageCount 方法。例如:

        bool noAck = false;
        var messageCount = channel.MessageCount("hello");
        BasicGetResult result = null;
        if (messageCount == 0)
        {
            // No messages available
        }
        else
        {
            while (messageCount > 0)
            {
                result = channel.BasicGet("hello", noAck);
                var message = Encoding.UTF8.GetString(result.Body);
                //process message .....
                messageCount = channel.MessageCount("hello");
            }
      

      【讨论】:

        猜你喜欢
        • 2015-11-25
        • 2019-04-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-08-31
        • 2012-05-24
        • 1970-01-01
        相关资源
        最近更新 更多