【问题标题】:Custom messages using RabbitMQ/ActiveMQ?使用 RabbitMQ/ActiveMQ 的自定义消息?
【发布时间】:2014-09-18 09:28:57
【问题描述】:

我来自 Sidekiq,现在正在转向 Java 解决方案来处理分布式作业。我遇到了 RabbitMQ 和 ActiveMQ,但似乎这些代理使用纯文本或原始字节 [] 消息。我想知道是否可以使用这些框架发送自定义消息?

理想情况下,我会为每个特定的消息类型定义一个 Java 类,并在工作者和生产者中使用它。这样的事情可能吗?或者我应该看看其他类型的中间件?

 MyOwnMessageFormat message = new MyOwnMessageFormat(content)
 channel.send(message)

 Message message = channel.receive()
 if (message.class == MyOwnMessageFormat)
 {
      doSomething();
 }

【问题讨论】:

    标签: java rabbitmq activemq


    【解决方案1】:

    通过像 Camel 这样的服务总线,在 Java 中通过消息代理交换消息要容易得多。您不会失去配置代理端点的灵活性,并且您的代码仍然与使用的特定传输或消息格式隔离。例如。您可以部署 ActiveMQ 并稍后切换到 RabbitMQ 而无需更新您的代码 - 只需服务总线配置。或者,在向代理发送消息时,您可以通过添加消息转换器从纯 Java 序列化切换到 JSON。同样,您的业务层不必修改。

    这是一个使用 POJO 生产/消费的示例,其中生产者调用常规 Java 接口,消费者实现该接口。该示例假设发送方/接收方使用 Spring 实例化,以便注入 Camel 端点

    消息发送者:

    interface MyService {
       MyResult addTask(MyTask task);
    }  
    
    class Sender {
        @Produce(uri="activemq:queue:myservice")
        MyService service;
    
        public void run() {
          MyTask task = new MyTask();
          MyResult result = service.addTask(task);
    }
    

    消息接收者:

    class Receiver {
        @Consume(uri="activemq:queue:myservice")
        public MyResult addTask(MyTask task) {
             return new MyResult();
        }
    }
    

    MyTask 和 MyResult 需要可序列化。

    我认为学习 Camel 框架并不难,但收获颇丰。

    【讨论】:

      【解决方案2】:

      在 JMS 中,只要您的自定义消息对象是可序列化的,您就可以根据您的要求使用 ObjectMessage、Stream Message 或 Map Message 来发送(Object Message):

      MessageProducer producer = session.createProducer( destination );
      ObjectMessage message = session.createObjectMessage( getMyObject() );
      producer.send( message );
      

      接收:

       Message message = consumer.receive();
      if (message instanceof ObjectMessage) {
      Object object = ((ObjectMessage) message).getObject();
      

      希望对您有所帮助!

      【讨论】:

        【解决方案3】:

        是的,在将消息发布到 RabbitMQ 之前,将您的消息转换为 Json 字符串。 现在在接收端,当收到消息时,解析它以转换成相同的格式。

        在 ruby​​ 中可以使用

        发送时 => Message.to_json
        接收时 => 消息 = JSON.parse(received_msg)


        要获取数据的类别,您可以将指定数据类别的变量与数据一起发送。

        【讨论】:

        • 是的,这是一种解决方案,但我真的不喜欢这样。我更喜欢只使用 java 类。但我确实可以编写一个通用解析器,根据 JSON 返回我想要的 java 对象。
        • 或者您可以使用 Jackson、spring amqp 或任何您喜欢的编组/序列化抽象层。
        • 当问题是关于 java 的时候,你为什么要在 ruby​​ 中提供代码示例?伪代码可能比 ruby​​ 代码更好。
        【解决方案4】:

        这就是我使用最新版本的 ActiveMQ 的原因

        ...
        MessageConsumer consumer=session.createConsumer(destination);       
        
        while(true) {
            javax.jms.Message message=consumer.receive();
            ActiveMQObjectMessage queueMessage=(ActiveMQObjectMessage)message;
            Object payload=queueMessage.getObject();
            if(payload instanceof NotificationMessage) {
                this.sendMessage((NotificationMessage)payload);
            }
        }
        

        NotificationMessage 对象扩展了Serializable,看起来像这样

        public static class NotificationMessage implements Serializable {
        
            /**
             * 
             */
            private static final long serialVersionUID = 1631373969001850200L;
        
            public String to;
            public String data;
        
        }
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-11-24
          • 2013-07-13
          • 1970-01-01
          • 1970-01-01
          • 2016-06-08
          相关资源
          最近更新 更多