【问题标题】:Reply Message post processing in AsyncRabbitTemplateAsyncRabbitTemplate 中的回复消息后处理
【发布时间】:2019-03-08 01:40:55
【问题描述】:

我在使用 AsyncRabbitTemplate 时遇到了 GZip/GUnzip 消息处理问题。

使用这样的同步模板设置一切正常:

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jsonConverter());
    rabbitTemplate.setReplyTimeout(config.getRabbitSendAndReceiveReplyTimeout());
    rabbitTemplate.setReceiveTimeout(config.getRabbitSendAndReceiveReceiveTimeout());
    rabbitTemplate.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    rabbitTemplate.setBeforePublishPostProcessors(new GZipPostProcessor(true));
    return rabbitTemplate;
}

但是,当我像这样设置异步模板时:

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public AsyncRabbitTemplate rabbitTemplateAsync(final ConnectionFactory connectionFactory) {
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory));
    // need to manually start the reply listener container for some reason
    asyncRabbitTemplate.start();
    return asyncRabbitTemplate;
}

回复消息未正确解压缩,我收到此错误消息

Caused by: java.io.UnsupportedEncodingException: gzip:UTF-8
    at java.lang.StringCoding.decode(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at java.lang.String.<init>(Unknown Source) ~[?:1.8.0_192]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:235) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:199) ~[spring-amqp-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate.onMessage(AsyncRabbitTemplate.java:576) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]

我尝试为 AsyncRabbitTemplate 提供一个已配置的 DirectReplyToMessageListenerContainer,但没有帮助

    final DirectReplyToMessageListenerContainer directReplyToMessageListenerContainer = new DirectReplyToMessageListenerContainer(
            connectionFactory);
    directReplyToMessageListenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor(true));
    final AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
            directReplyToMessageListenerContainer);

这只会导致这个错误:

[错误] 2019-03-06 12:18:05.192 [AMQP 连接 172.17.3.6:5672] CachingConnectionFactory.log - 通道关闭:通道错误;协议方法:#method(reply-code=406, reply-text=PRECONDITION_FAILED - 快速回复消费者不存在, class-id=60, method-id=40)

请注意,我可以通过使用 spring-rabbit 项目的一个分支并将此构造函数添加到 AsyncRabbitTemplate 来使事情正常进行:

public IndigoAsyncRabbitTemplate(final RabbitTemplate template,
        final DirectReplyToMessageListenerContainer directReplyToContainer) {
    Assert.notNull(template, "'template' cannot be null");
    this.template = template;
    container = null;
    replyAddress = null;
    this.directReplyToContainer = directReplyToContainer;
    directReplyToContainer.setMessageListener(this);
}

那么,这是否需要对 spring rabbit 库进行增强才能开始工作?或者有没有办法让 GUnzip 在回复侦听器上工作而不会跳过太多的环节?

【问题讨论】:

    标签: spring-amqp


    【解决方案1】:

    是的,这必须作为对框架的改进。在AsyncRabbitTemplate 的情况下,我们只是错过了afterReceivePostProcessors 的事实。我们可以从提供的RabbitTemplate 重新配置一个内部DirectReplyToMessageListenerContainer 以使用afterReceivePostProcessors

    同时,您可以坚持使用常规的SimpleMessageListenerContainer 注入。 或者您可以尝试使用外部DirectReplyToMessageListenerContainer 注入。

    看到这个演员:

    /**
     * Construct an instance using the provided arguments. The first queue the container
     * is configured to listen to will be used as the reply queue. Replies will be
     * routed using the default exchange with that queue name as the routing key.
     * @param template a {@link RabbitTemplate}
     * @param container a {@link AbstractMessageListenerContainer}.
     */
    public AsyncRabbitTemplate(RabbitTemplate template, AbstractMessageListenerContainer container) {
        this(template, container, null);
    }
    

    关于此事的问题:https://github.com/spring-projects/spring-amqp/issues/920

    【讨论】:

    • 感谢您的快速响应 :) Re: 尝试DirectReplyToMessageListenerContainer 注入。我用提到的构造函数尝试了这个,但这导致了“PRECONDITION_FAILED - 快速回复消费者不存在”错误。它似乎不喜欢将 DirectReplyToMessageListenerContainer 作为容器字段,而且我看不到通过 API 设置 directReplyToContainer 字段的方法。我现在必须尝试SimpleMessageListenerContainer 设置
    • 是的,我明白了。请考虑为 Spring AMQP 项目提出 GH 问题,这样我们就不会忘记在框架级别解决问题。
    猜你喜欢
    • 2020-05-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-18
    • 2019-11-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多