【问题标题】:Rabbitmq headers exchange and confirmed deliveryRabbitmq 标头交换和确认交付
【发布时间】:2017-10-07 13:53:24
【问题描述】:

我正在尝试在 RabbitMQ 上使用混合的 java 和 python 组件进行标头交换,并且我需要确认交付。

我似乎从 python (pika) 和 java 客户端得到不同的行为。

在python中:

channel.exchange_declare(exchange='headers_test',
¦   ¦   ¦   ¦   ¦   ¦   ¦type='headers',
¦   ¦   ¦   ¦   ¦   ¦   ¦durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
¦   ¦   ¦   ¦   ¦   ¦ routing_key='',
¦   ¦   ¦   ¦   ¦   ¦ mandatory=True,
¦   ¦   ¦   ¦   ¦   ¦ body=message,
¦   ¦   ¦   ¦   ¦   ¦ properties=pika.BasicProperties(
¦   ¦   ¦   ¦   ¦   ¦   ¦ delivery_mode=2,
¦   ¦   ¦   ¦   ¦   ¦   ¦ headers=message_headers))

如果标头与任何绑定的消费者都不匹配并且无法路由消息,则结果为假

但在 java/scala 中:

channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect

val props = MessageProperties.PERSISTENT_BASIC.builder
¦   ¦   ¦   ¦  .headers(messageHeaders).build
channel.basicPublish("headers_test", 
¦   ¦   ¦   ¦   ¦   ¦"", //routingKey
¦   ¦   ¦   ¦   ¦   ¦true, //mandatory
¦   ¦   ¦   ¦   ¦   ¦props, 
¦   ¦   ¦   ¦   ¦   ¦"data".getBytes)
channel.waitForConfirmsOrDie()

在这里,当 messageHeaders 找不到匹配项时,消息似乎只是被丢弃而没有错误

是我遗漏了什么还是两个客户的行为真的不同?以及如何使用 java 中的标头交换获得确认交付?

注意:我已经有一个“复杂”的交换到队列路由设置,我宁愿避免向游戏添加死信路由,而只是发送失败。

【问题讨论】:

    标签: java python rabbitmq rabbitmq-exchange


    【解决方案1】:

    即使没有与您的标头匹配的队列,消息仍被视为已确认的问题。来自文档 (https://www.rabbitmq.com/confirms.html):

    对于不可路由的消息,代理将在 exchange 验证消息不会路由到任何队列(返回一个空的 队列列表)。如果消息也被强制发布,则 basic.return 在 basic.ack 之前发送给客户端。也是如此 用于否定确认 (basic.nack)。

    相反,您应该检查 basic.return 消息以检测消息是否已被路由。

    我用wireshark检查过,确实我可以看到如果一条消息没有被路由,那么就会有一个AMQP basic.return消息。

    我想你应该从

    开始
    channel.addReturnListener(new ReturnListener() {
      @Override
      public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("App.handleReturn");
        System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
      }
    });
    

    事实上,如果消息没有被路由,我会得到这个:

    replyCode = [312],replyText = [NO_ROUTE],exchange = [headers_logs], routingKey = [], pro....

    此外,如果您想在 Java 中模拟 Pika 的同步行为,您似乎可以通过在发布消息之前获取当前发布序列号并注册确认侦听器而不是依赖 .waitForConfirmsOrDie() 来实现。

    所以一个完整的代码示例应该是:

    channel.addReturnListener(new ReturnListener() {
          @Override
          public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("App.handleReturn");
            System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
          }
        });
    
        channel.addConfirmListener(new ConfirmListener() {
          @Override
          public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("App.handleAck");
            System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
          }
    
          @Override
          public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("App.handleNack");
            System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
          }
    });
    
    long nextPublishSeqNo = channel.getNextPublishSeqNo();
    System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);
    
    channel.basicPublish("headers_logs",
        "",
         true,
         props,
        "data".getBytes());
    

    在返回/确认回调中,您需要查找在发布消息之前获得的频道发布序列号。

    如果您查看线路上发生的情况,如果消息没有被路由到任何队列,RabbitMq 会发送回一条 basic.return 消息,该消息还包含确认(交付标签)。如果消息已被路由,RabbitMq 会发回一条包含确认信息的 bacic.ack 消息。

    似乎 RabbitMq Java 客户端总是在 basicConfirm() 之前调用 basicReturn() 回调,因此判断消息是否已路由的逻辑可以是:

    在频道上注册返回和确认听众; 记住一个频道的下一个发布序列号; 等待返回或确认回调。如果是返回回调 - 消息尚未路由,您应该忽略对同一传递标签的进一步确认。如果在收到 handleReturn() 之前收到 handleAck() 回调,则表示消息已被路由到队列。

    虽然我不确定在哪种情况下可以调用 .handleNack()。

    【讨论】:

      猜你喜欢
      • 2015-05-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-30
      • 2017-07-22
      • 1970-01-01
      相关资源
      最近更新 更多