【问题标题】:How to manual ack on AMQP-Backed Channel如何在 AMQP 支持的通道上手动确认
【发布时间】:2021-08-18 10:37:18
【问题描述】:

我在我的工作流程中使用 AMQP 支持的通道,我想手动处理 ACK。 我虽然这可以在 AMQP 入站通道中完成,您可以在消息标头中获得 AMQP 客户端通道的引用,但我在消息中找不到标头 AmqpHeaders.CHANNEL。这是我设置 AmqpChannelFactoryBean 的方式:

@Bean(name = AMQP_BACKED_CHANNEL)
    public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName(AMQP_BACKED_CHANNEL);
        factoryBean.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factoryBean.setPubSub(false);
        factoryBean.setExtractPayload(true);

        return factoryBean;
    }

我的感觉是,我不应该使用与 AMQP 入站通道相同的方法,但无法在那里找到文档。有人可以帮忙吗?

【问题讨论】:

    标签: spring-integration spring-amqp


    【解决方案1】:

    小更新:不确定这是否是“正确”的方法,但受到 Artem Bilam 在这篇文章 Spring AMQP Integration - Consumer Manual Acknowledgement 中的评论的启发,我已经使用 MethodBeforeAdvice 解决了问题。基本上,我的 MethodBeforeAdvice 应用于由 AmqpChannelFactoryBean 创建的 SimpleMessageListenerContainer 的 invokeListener() 方法,因此我可以获得 Amqp 客户端通道和 amqp 标头,游戏就完成了!以下是我修改的代码(为了更清晰阅读,稍微简洁一点):

    @Bean(name = AMQP_BACKED_CHANNEL)
    public AmqpChannelFactoryBean amqpBackedChannel(ConnectionFactory connectionFactory) {
        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName(AMQP_BACKED_CHANNEL);
        factoryBean.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factoryBean.setPubSub(false);
        factoryBean.setExtractPayload(true);
        factoryBean.setConcurrentConsumers(5);
        MethodBeforeAdvice methodBeforeAdvice = new MethodBeforeAdvice() {
            @Override public void before(Method method, Object[] args, Object target)
                throws Throwable {
                Channel amqpClientChannel = (Channel) args[0];
                Message amqpCoreMessage = (Message) args[1];
                Map<String, Object>
                    amqpCoreMessageHeaders =  amqpCoreMessage.getMessageProperties().getHeaders();
                amqpCoreMessageHeaders.put(AmqpHeaders.CHANNEL,amqpClientChannel);
            }
        };
    
        factoryBean.setAdviceChain(new Advice[]{methodBeforeAdvice});
    
        return factoryBean;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-06
      • 2016-12-08
      • 2011-10-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多