【问题标题】:How to deal with JSON message with spring-rabbit in spring boot application?如何在spring boot应用程序中使用spring-rabbit处理JSON消息?
【发布时间】:2017-07-19 04:41:51
【问题描述】:

这是我的代码 sn-ps。

  • MQConfiguration 配置类

    @Configuration
    public class MQConfiguration {
        @Bean
        public Receiver receiver() {
            return new Receiver();
        }
    }
    
  • Receiver 处理接收消息的类

    @RabbitListener(queues = "testMQ")
    public class Receiver {
    
        @RabbitHandler
        public void receive(Message msg){
            System.out.println(msg.toString());
        }
    }
    
  • 这是我发送给 RabbitMQ

    的 JSON 消息
    {
        "id": 1,
        "name": "My Name",
        "description": "This is description about me"
    }
    

但是,当我运行我的应用程序时,我收到了以下错误消息。

2017-02-28 17:16:35.931  WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.springframework.amqp.AmqpException: No method found for class [B
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:127) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:224) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:61) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    ... 10 common frames omitted

如果我只想在receive() 方法中打印JSON 消息,我该怎么办?我真的很感激任何人都可以对此有所了解。 :)

【问题讨论】:

    标签: java spring spring-boot spring-amqp spring-rabbit


    【解决方案1】:

    如果你使用Spring Boot,你只需要配置:

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    

    否则你必须配置:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    ...
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
    ...
        return factory;
    }
    

    http://docs.spring.io/spring-amqp/docs/1.7.0.RELEASE/reference/html/_reference.html#async-annotation-driven

    【讨论】:

    • @Artem Bilan 是的,我用的是Spring Boot,我在配置类中添加jsonMessageConverter()注解后仍然提示错误,你可以从here查看我的代码。
    【解决方案2】:

    为了将 JSON 发送到 RabbitMQ 并通过 Spring Boot 使用,我们需要设置 content_type

    让我用一个示例来描述我有一个 Python 生产者和 Java 消费者(我将 JSON 从 Python 发送到 RabbitMQ,而 Spring Boot Java 应该接收 JSON 任务)。

    有两种解决方案:

    解决方案 1:作为 JSON 字符串发送并使用 Jakson 或 GS​​ON 手动转换

    你需要设置 content_type="text/plain" 并将 JSON 转换为字符串。然后在 Spring 端,使用一个以字符串为输入的函数作为侦听器并手动转换对象。

    兔子处理程序:

    @RabbitHandler
    public void receive(String inputString) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        SimStatusReport theResult = objectMapper.readValue(inputString, SimStatusReport.class);
    
        System.out.println("String instance "  + theResult.toString() +
                " [x] Received");
    }
    

    SimStatusReport 对象:

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class SimStatusReport {
        private String id;
        private int t;
    }
    

    这是我的 Python 代码:

    import pika
    import json
    import uuid
    
    
    connectionResult = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channelResult = connectionResult.channel()
    routing_key_result = 'sim_results'
    channelResult.queue_declare(queue=routing_key_result, durable=True)
    
    def publish_result(sim_status):
        message =json.dumps(sim_status)
        channelResult.basic_publish(exchange='',
                                    routing_key=routing_key_result,
                                    body=message,
                                    properties=pika.BasicProperties(
                                        content_type="text/plain",
                                        content_encoding= 'UTF-8',
                                        delivery_mode=2,  # make message persistent
                              ))
        print("Sent ", message)
    
    
    newsim_status = {'id': str(uuid.uuid4()), 't': 0}
    publish_result(newsim_status)
    

    解决方案二:发送 JSON 字符串,让 Jackson2JsonMessageConverter 自动为您进行转换。

    您需要设置 content_type="application/json"。然后你需要在 RabbitMQ 请求的 header 中添加适当的 header 到 __TypeId__。您需要包含对象的确切名称空间,以便杰克逊无法理解转换。

    这是我使用 Python 的示例(只是 publish_result 函数):

    def publish_result(sim_status):
        message =json.dumps(sim_status)
        channelResult.basic_publish(exchange='',
                                    routing_key=routing_key_result,
                                    body=message,
                                    properties=pika.BasicProperties(
                                        content_type="application/json"
                                        headers={'__TypeId__': 'com.zarinbal.simtest.run.model.SimStatusReport'},
                                        content_encoding= 'UTF-8',
                                        delivery_mode=2,  # make message persistent
                              ))
        print("Sent ", message)
    

    然后你需要配置Java使用Jackson2JsonMessageConverter:

    @Configuration
        public class RabbitConfiguration {
            @Bean
            public MessageConverter jsonMessageConverter() {
                return new Jackson2JsonMessageConverter();
            }
        }
    

    这是你的听众:

    @RabbitListener(queues = "sim_results")
    public class TaskReceiver {
        @RabbitHandler
        public void receive(SimStatusReport in) {
            System.out.println("Object instance "  + in +
                    " [x] Received");
        }
    }
    

    注意: 确保所有对象都具有所有属性和所有参数构造函数的 setter 和 getter。我使用 lombok 的 @Data、@NoArgsConstructor 和 @AllArgsConstructor 来自动生成它

    【讨论】:

    • 或者使用 ClassMapper 例如DefaultJackson2JavaTypeMapper 并设置类映射。那么你就不需要透露(紧密耦合)实现细节了。
    【解决方案3】:

    我知道距离上次回复已经有一段时间了,但我想添加我的答案。以防其他人遇到同样的问题。

    当我设置像您这样的接收器时,我遇到了类似的问题,即我无法将 RabbitMQ 消息正文中的 JSON 转换为我的对象。它只是返回这样的错误消息。

    org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
            at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1496) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) [spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) [spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.3.9.jar:2.3.9]
            at java.lang.Thread.run(Thread.java:748) [na:1.8.0_271]
    Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
            at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:185) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodFor(DelegatingInvocableHandler.java:317) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodFor(HandlerAdapter.java:110) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:192) ~[spring-rabbit-2.3.9.jar:2.3.9]
            at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.9.jar:2.3.9]
            ... 11 common frames omitted
    

    在没有任何好的答案的情况下搜索了几个小时的不同网站后,我终于发现@RabbitListener 在课堂级别不起作用。但是,当您将其移至方法级别时,它可以正常工作。

    所以,我删除了@RabbitHandler 并将@RabbitListener 移动到receive 方法。

    您的接收器代码在编辑后应如下所示。

    public class Receiver {
    
        @RabbitListener(queues = "testMQ")
        public void receive(Message msg){
            System.out.println(msg.toString());
        }
    }
    

    我仍然不完全理解为什么它在课堂级别上不起作用。如果有人有好的解释,请在 cmets 部分分享。非常感谢。

    来源:https://titanwolf.org/Network/Articles/Article?AID=0f214d79-10f0-4b4c-9478-607428770256#gsc.tab=0

    【讨论】:

      【解决方案4】:

      你可以简单地使用这个函数new String(Messages)

      @RabbitListener(queues = "testMQ")
      public class Receiver {
      
          @RabbitHandler
          public void receive(Message msg){
      String MQMessage = new String(msg.getBody());
              System.out.println(MQMessage);
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2021-07-16
        • 2021-07-28
        • 1970-01-01
        • 2018-03-02
        • 1970-01-01
        • 1970-01-01
        • 2017-01-13
        相关资源
        最近更新 更多