【问题标题】:Spring Boot AMQP based JmsListener fails on TextMessage基于 Spring Boot AMQP 的 JmsListener 在 TextMessage 上失败
【发布时间】:2019-06-22 12:26:43
【问题描述】:

我有一个 Spring Boot 应用程序,它在从 ActiveMQ 代理检索 TextMessage 类型的 JMS 消息时遇到问题。

如果消费者尝试从代理检索消息,它不能自动将消息转换为 TextMessage,而是将其视为 ByteMessage。有一个 JmsListener 应该将队列中的消息作为 TextMessage 读取:

...
@JmsListener(destination = "foo")
public void jmsConsumer(TextMessage message) {
...

JmsListener 产生如下警告,并丢弃消息:

org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method could not be invoked with incoming message
Endpoint handler details:
Method [public void net.aschemann.demo.springboot.jmsconsumer.JmsConsumer.jmsConsumer(javax.jms.TextMessage)]
Bean [net.aschemann.demo.springboot.jmsconsumer.JmsConsumer@4715f07]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [javax.jms.TextMessage] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298, failedMessage=org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:118) ~[spring-jms-5.1.4.RELEASE.jar:5.1.4.RELEASE]

我提取了一个小示例应用程序 调试问题:https://github.com/ascheman/springboot-camel-jms

现实生活中的生产者是一个使用 Apache Camel 的商业应用程序。因此,我几乎无法更改/自定义生产者。我试图构建一个显示相同行为的示例生产者。

我能否以某种方式调整消费者以将消息视为TextMessage

另外:有什么方法可以直接在 Spring 中以编程方式从消息中检索附加的 AMQP 属性?当然,我仍然可以将消息读取为ByteMessage 并尝试解析属性。但我正在寻找一种由任何 Spring API 支持的更清洁的方式。到目前为止,Spring @Headers 注释没有帮助。

【问题讨论】:

  • 您收到的是byte[],默认情况下会转换为ByteMessage。我建议要么只接收byte[],而不是接收带有特定 JMS 类型消息的消息。尝试String 而不是TextMessage。属性不是标题(AFAIK),因此不确定在这种情况下它们被映射到哪里。如果您真的想使用TextMessage,请编写自定义消息转换器将byte[] 转换为TextMessage
  • 谢谢,但问题不在于更改类型:我可以获得byte[] 的内容以及String。但是,在这种情况下,消息还包含一些额外的数据 - 所谓的属性(查看提到的 AMQP 规范以获取详细信息)。我对制作人最初发送的正版短信感兴趣。我不想自己从byte[]String 对象中解析它。如果使用的传输与 JMS 兼容,我希望得到一个正确的 JMS 对象,而不是我需要自己处理的字节数。
  • 但是您没有使用 JMS 作为传输,这是您使用 AMQP 时的主要问题。您基本上是通过 AMQP(与 JMS 不同)为 JMS 建立隧道。因此消息被改编,TextMessage 将被转换为 AMQP 可以理解的东西,如果你查看 Java API 就是byte[]。事实上,您使用的不是 JMS,而是 AMQP,您试图将其硬塞到 JMS 中。为什么不直接使用 AMQP?
  • 这真的是 AMQP,还是 ActiveMQ (AMQ)?您可以接收Message<?>,它是对多个传输(包括 JMS)的弹簧消息传递抽象。它有一个payloadheaders(从JMS 消息映射)。
  • 为了避免人们浪费时间,这个问题也被cross-posted 作为 Spring Boot 问题。我怀疑这是使用默认native 映射而不是所需的jms 映射的代理配置问题。有关各种映射选项的详细信息,请参阅activemq.apache.org/amqp.html

标签: java spring spring-boot activemq spring-jms


【解决方案1】:

如果您在从 byte[] 转换为 String 时遇到问题:

.convertBodyTo(String.class)

路线示例:

from(QUEUE_URL)
                .routeId("consumer")
                .convertBodyTo(String.class)
                .log("${body}")
                .to("mock:mockRoute");

【讨论】:

  • 我不使用也不愿意在消费者端使用 Camel。
【解决方案2】:

在我遵循@AndyWilkinson 的评论,在 activemq.xml 中的 transportConnector 上添加 transport.transformer 选项后,我曾经遇到过与问题所有者相同的问题如下,问题就解决了。

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>

【讨论】:

    【解决方案3】:

    我有同样的错误,这是因为LazyResolutionMessage是从MessagingMessageConverter调用的,这是MessageConverter的默认实现,which converts your message(实际上它没有,因为它是默认的):

    return ((org.springframework.messaging.Message) payload).getPayload();
    

    我已经完成了你想要的,最后我的消费者的工作方式是:

    @JmsListener(destination = "${someName}")
    public void consumeSomeMessages(MyCustomEvent e) {
      ....
    }
    

    我必须做的是:

    @Bean(name = "jmsListenerContainerFactory")
    public DefaultJmsListenerContainerFactory whateverNameYouWant(final ConnectionFactory genericCF) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setErrorHandler(t -> log.error("bad consumer, bad", t));
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setConnectionFactory(genericCF);
        factory.setMessageConverter(
                new MessageConverter() {
                    @Override
                    public Message toMessage(Object object, Session session) {
                        throw new UnsupportedOperationException("since it's only for consuming!");
                    }
    
                    @Override
                    public MyCustomEvent fromMessage(Message m) {
                        try {
                          // whatever transformation you want here...
                          // here you could print the message, try casting,
                          // building new objects with message's attributes, so on...
                          // example:
                          return (new ObjectMapper()).readValue(((TextMessage) m).getText(), MyCustomEvent.class);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
        );
        return factory;
    }
    

    几个关键点:

    1. 如果您的DefaultJmsListenerContainerFactory 方法也称为jmsListenerContainerFactory,则在Bean 注释中不需要name 属性

    2. 请注意,您还可以实现 ErrorHandler 以在尝试转换/强制转换您的消息类型时处理异常!

    3. ConnectionFactory 是带有 Amazon 的 SQSConnectionFactory 的 Spring 托管 bean,因为我想从 SQS 队列中消费。请正确提供您的等价物。我的是:

      @Bean("connectionFactory")
      public SQSConnectionFactory someOtherNome() {
          return new SQSConnectionFactory(
                  new ProviderConfiguration(),
                  AmazonSQSClientBuilder.standard()
                          .withRegion(Regions.US_EAST_1)
                          .withCredentials(
                                  new AWSStaticCredentialsProvider(
                                          new BasicAWSCredentials(
                                                  "keyAccess",
                                                  "keySecret"
                                          )
                                  )
                          )
                          .build()
          );
      }
      

    【讨论】:

      猜你喜欢
      • 2016-06-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-08
      • 1970-01-01
      • 2019-09-01
      • 2020-11-14
      • 1970-01-01
      相关资源
      最近更新 更多