【问题标题】:How to correlate request & reply when using raw (not using Gateway) Spring Integration?使用原始(不使用网关)Spring集成时如何关联请求和回复?
【发布时间】:2013-07-08 19:15:38
【问题描述】:

我正在学习 Spring-Integration 并对 Gateway 和 Service-Activators 有基本的了解。我喜欢网关的概念。 Spring Integration 在运行时为网关生成代理。此代理对网关的消费者隐藏所有消息传递细节。此外,生成的代理也可能关联请求和回复。

出于学习的目的,我开始使用原始 Spring 集成功能而不是使用网关来实现请求和回复关联。我可以在请求标头中设置相关标识符,但在接收通道回复时无法指定相关标识符。以下(在问题的末尾)是相同的代码 sn-p。此外,相关内容如何针对消息代理(例如 RabbitMQ)起作用? RabbitMQ 是否提供了检索带有特定标头(相关标识符)的消息的功能?

public class RemoteProxyCalculatorService implements CalculatorService
{
    public int Square(int n) 
    {
        UUID uuid = SendRequest(n, "squareRequestChannel");
        int squareOfn = ReceiveReply("squareReplyChannel", uuid);
        return squareOfn;
    }

    private <T> UUID SendRequest(T payload, String requestChannel)
    {
        UUID requestID = UUID.randomUUID();
        Message<T> inputMessage = MessageBuilder.withPayload(payload)
                .setCorrelationId(requestID)
                .build();

        MessageChannel channel = (MessageChannel)context.getBean(requestChannel, MessageChannel.class);
        channel.send(inputMessage);
        return requestID;
    }

    @SuppressWarnings("unchecked")
    private <T> T ReceiveReply(String replyChannel, UUID requestID)
    {
        //How to consume requestID so as to receive only the reply related to the request posted by this thread
        PollableChannel channel = (PollableChannel)context.getBean(replyChannel);
        Message<?> groupMessage = channel.receive();
        return (T)groupMessage.getPayload();
    }

    private ClassPathXmlApplicationContext context;
}

谢谢。

【问题讨论】:

    标签: rabbitmq messaging spring-integration eai


    【解决方案1】:

    问题在于公共回复渠道。解决方案(Mark 建议类似)将如下所示。

    public class RemoteProxyCalculatorService
    {
      public int Square(int n)
      {
        PollableChannel replyChannel = SendRequest(n, "squareRequestChannel");
        int squareOfn = ReceiveReply(replyChannel);
        return squareOfn;
      }
    
      private <T> PollableChannel SendRequest(T payload, String requestChannel)
      {
        UUID requestID = UUID.randomUUID();
        QueueChannel replyQueueChannel = new QueueChannel();
        Message<T> inputMessage = MessageBuilder.withPayload(payload)
            .setCorrelationId(requestID)
            .setReplyChannel(replyQueueChannel)
            .build();
        MessageChannel channel = context.getBean(requestChannel, MessageChannel.class);
        channel.send(inputMessage);
        return replyQueueChannel;
      }
    
      @SuppressWarnings("unchecked")
      private <T> T ReceiveReply(PollableChannel replyChannel)
      {
        Message<?> groupMessage = replyChannel.receive();
        return (T) groupMessage.getPayload();
      }
    
      private ClassPathXmlApplicationContext context;
    }
    

    【讨论】:

      【解决方案2】:

      如果您想使用公共回复渠道,那么我认为这就是您要寻找的。​​p>

      public class RemoteProxyCalculatorService
      {
        public int Square(int n)
        {
          PollableChannel replyChannel = SendRequest(n, "squareRequestChannel");
          int squareOfn = ReceiveReply(replyChannel);
          return squareOfn;
        }
      
        private <T> PollableChannel SendRequest(T payload, String requestChannel)
        {
          UUID requestID = UUID.randomUUID();
          Message<T> inputMessage = MessageBuilder.withPayload(payload)
              .setCorrelationId(requestID)
              .setReplyChannel(myMessageHandler.getSubscribedChannel())
              .build();
      
          // Create a Pollable channel for two things
      
          // 1. Pollable channel is where this thread should look for reply.
          QueueChannel replyQueueChannel = new QueueChannel();
      
          // 2. Message Handler will send reply to this Pollable channel once it receives the reply using correlation Id.
          myMessageHandler.add(requestID, replyQueueChannel);
      
          MessageChannel channel = context.getBean(requestChannel, MessageChannel.class);
          channel.send(inputMessage);
      
          return replyQueueChannel;
        }
      
        @SuppressWarnings("unchecked")
        private <T> T ReceiveReply(PollableChannel replyChannel)
        {
          Message<?> groupMessage = replyChannel.receive();
          return (T) groupMessage.getPayload();
        }
      
        private ClassPathXmlApplicationContext context;
      
        @Autowired
        private MyMessageHandler myMessageHandler;
      }
      
      /**
       * Message Handler
       * 
       */
      public class MyMessageHandler implements MessageHandler
      {
        private final Map<Object, MessageChannel> idChannelsMap = new TreeMap<>();
        private final Object lock = new Object();
        private final SubscribableChannel subscribedChannel;
      
        public MyMessageHandler(SubscribableChannel subscribedChannel)
        {
          this.subscribedChannel = subscribedChannel;
        }
      
        @Override
        public void handleMessage(Message<?> message) throws MessagingException
        {
          synchronized (lock)
          {
            this.idChannelsMap.get(message.getHeaders().getCorrelationId()).send(message);
            this.idChannelsMap.remove(message.getHeaders().getCorrelationId());
          }
        }
      
        public void add(Object correlationId, MessageChannel messageChannel)
        {
          synchronized (lock)
          {
            this.idChannelsMap.put(correlationId, messageChannel);
          }
        }
      
        public SubscribableChannel getSubscribedChannel()
        {
          return subscribedChannel;
        }
      
      }
      

      【讨论】:

        【解决方案3】:

        在应用内进行关联的最简单方法甚至不需要correlationId 标头。相反,您可以创建一个 QueueChannel 实例(您不共享该实例)并将其作为您发送的 Message 上的 replyChannel 标头提供。无论下游组件最终响应什么,它都会在 Message 中找到该标头。

        关于 RabbitMQ,我们的出站网关只是应用了类似的技术,但使用了 AMQP 消息的 replyTo 属性。

        希望对您有所帮助。

        -马克

        【讨论】:

        • 谢谢马克。当然,我应该知道并学习是否有一种最简单的方法可以实现同样的目标。但是,这对我来说是一个学习练习,我想坚持下去。如果您能查看我的代码并帮助我关联请求和回复,我将不胜感激。我得到了正确人的注意;它不能比这更好:)。
        猜你喜欢
        • 1970-01-01
        • 2012-03-19
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-01-17
        • 1970-01-01
        • 2016-05-21
        相关资源
        最近更新 更多