【Question title】: Spring amqp-outbound gateway to produce reply from a different thread (like jms-outbound gateway)
【Posted】: 2018-04-11 11:33:36
【Question description】:

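Problem statement:

The Spring amqp-outbound gateway should produce its reply from a different thread (like the jms-outbound gateway, which uses a separate queue and correlates request and response through a correlation key).

I am unable to correlate the messages with this sample.

Spring Integration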

    <int:gateway id="outboundGateway" service-interface="com.amqp.outbound.gateway.OutboundGateway"     
                        default-reply-channel="defaultReplyChannel" >
        <int:method name="process"   request-channel="inboundRequestChannel"/>
    </int:gateway>

    <int:channel id="defaultReplyChannel"/>
    <int:channel id="inboundRequestChannel"/>
    <int:channel id="enrichedInboundRequestChannel"/>
    <int:channel id="processAuthRequestChannel"/>
    <int:channel id="postProcessorChannel"/>

    <int:chain input-channel="inboundRequestChannel" output-channel="enrichedInboundRequestChannel">
        <int:service-activator id="serviceActivator"
                       ref="ouboundService"  method="createRequest"/>
    </int:chain>

    <int-amqp:outbound-gateway id="outboundGtwyId" header-mapper="headerMapper"
                        request-channel="enrichedInboundRequestChannel"
                        reply-channel="defaultReplyChannel"
                        amqp-template="template" 
                        reply-timeout="30000" 
                        exchange-name="request_exchange" 
                        routing-key="request_exchange_queue"/>

    <int-amqp:inbound-channel-adapter id="amqpMessageDriven"  queue-names="request_queue" 
                                 connection-factory="rabbitConnectionFactory"  channel="processAuthRequestChannel"/>

    <int:service-activator id="serviceActivator"
                       ref="ouboundService" input-channel="processAuthRequestChannel" output-channel="postProcessorChannel"
                       method="processRequest"/>

    <int-amqp:outbound-channel-adapter amqp-template="template" channel="postProcessorChannel" 
            header-mapper="headerMapper" exchange-name="reply_exchange" routing-key="reply_exchange_queue"/>

    <bean id="headerMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>

Configuration

@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setQueue("reply_queue");
    return template;
}



@Bean
public Binding binding(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue");
}

@Bean
public DirectExchange exchange(){
    return new DirectExchange("request_exchange");
}

@Bean
public Queue queue(){
    return new Queue("request_queue", true, false, true);
}

@Bean
public Binding bindingReply(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue");
}

@Bean
public DirectExchange exchangeReply(){
    return new DirectExchange("reply_exchange");
}


@Bean
public Queue replyQueue(){
    return new Queue("reply_queue", true, false, true);
}

Service

@Service
public final class OuboundService {


    public Message createRequest(String message){
        System.out.println("Inside createRequest : "+ message);
        final String transactionId = UUID.randomUUID().toString();
        final Message builtMessage = MessageBuilder.withBody(message.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setHeader(AmqpHeaders.CORRELATION_ID, transactionId)
                .build();
        return builtMessage;
    }


    public Message processRequest(Message message){
        System.out.println("Inside process Request : "+ new String(message.getBody()));
        System.out.println("Header values : "+message.getMessageProperties().getHeaders());
        final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties())
                                .copyHeaders(message.getMessageProperties().getHeaders()).build();
        return result;
    }

}

Error:

org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'outboundGtwyId', and its 'requiresReply' property is set to true.

GitHub source code (with the resolved solution)

https://github.com/kingkongprab/spring-amqp-outbound-gateway

【Comments】:

    Tags: java spring-integration spring-amqp


    【Solution 1】:

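    Correlation is done in Spring AMQP as well. See its RabbitTemplate#sendAndReceive() for more information. There is also good documentation on the matter in the Reference Manual.

    Spring Integration, with its AbstractAmqpOutboundEndpoint and AmqpInboundGateway implementations, provides an out-of-the-box request-reply correlation solution. If you cannot use AmqpInboundGateway on the server side, you must make sure the correlationId is transferred from the received request to the reply that is sent back. Yes, you can use a dedicated exchange for replies, and that is what RabbitTemplate#setQueue() supports for waiting for replies on the client (outbound) side. But it still will not work without a proper correlation transfer. See also https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-message-headers for how headers (including correlationId) are mapped in Spring Integration.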
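To illustrate why the correlation transfer matters, here is a minimal plain-Java sketch. It is not Spring AMQP's implementation; `CorrelationSketch`, `Msg`, `handle` and `onReply` are hypothetical names that only model the idea. The client parks each request under its correlation id, the reply arrives on a different thread, and it can only be matched to the waiting caller if the server copied the id onto the reply.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CorrelationSketch {

    /** Hypothetical stand-in for an AMQP message: a correlation id plus a body. */
    static final class Msg {
        final String correlationId;
        final String body;
        Msg(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Client side: requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Stands in for the server that consumes the request queue on another thread.
    private final ExecutorService server = Executors.newSingleThreadExecutor();

    /** Sends a request and waits for the correlated reply; returns null if none arrives in time. */
    public String sendAndReceive(String body, boolean serverCopiesCorrelationId, long timeoutMs) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        Msg request = new Msg(correlationId, body);
        // The reply is produced and delivered on the server thread, not the caller's thread.
        server.submit(() -> onReply(handle(request, serverCopiesCorrelationId)));
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null; // no reply could be correlated with this request
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(correlationId);
        }
    }

    /** Server side: builds the reply, optionally carrying the request's correlation id over. */
    private Msg handle(Msg request, boolean copyCorrelationId) {
        String id = copyCorrelationId ? request.correlationId : null;
        return new Msg(id, "Successful: " + request.body);
    }

    /** Reply listener: completes the pending request that matches the reply's correlation id. */
    private void onReply(Msg reply) {
        if (reply.correlationId == null) {
            return; // cannot be matched to any caller, so the reply is dropped
        }
        CompletableFuture<String> waiting = pending.get(reply.correlationId);
        if (waiting != null) {
            waiting.complete(reply.body);
        }
    }

    public void shutdown() {
        server.shutdown();
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        // Correlation id copied to the reply: the caller gets its answer.
        System.out.println(sketch.sendAndReceive("ping", true, 2000));
        // Correlation id lost on the server: the caller times out and gets null.
        System.out.println(sketch.sendAndReceive("ping", false, 200));
        sketch.shutdown();
    }
}
```

The second call models the situation in the question: a reply is produced, but it cannot be matched to the waiting request, so the caller times out, which is the same class of failure the ReplyRequiredException reports.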

    Update

    Thank you for sharing your application.

    Well, now I see several problems:

    1. You are definitely missing the replyQueue binding:

      @Bean
      public Binding bindingReply(){
          return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue");
      }
      
    2. The RabbitTemplate must use setReplyAddress(). You must also configure a MessageListenerContainer for the reply_queue and set the RabbitTemplate as its listener:

      @Bean
      public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
          final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
          template.setReplyAddress(replyQueue().getName());
          return template;
      }
      
      @Bean
      public MessageListenerContainer replyContainer(RabbitTemplate template) {
          SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
          container.setConnectionFactory(template.getConnectionFactory());
          container.setQueues(replyQueue());
          container.setMessageListener(template);
          return container;
      }
      
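    3. The org.springframework.amqp.core.Message manipulation in your OuboundService is useless. The channel adapters do not know about this type of payload, so your custom Message simply becomes the serialized body of another org.springframework.amqp.core.Message. I changed it to the following and everything works: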

      public String createRequest(String message){
          System.out.println("Inside createRequest : "+ message);
          return message;
      }
      
      
      public Message processRequest(Message message){
          System.out.println("Inside process Request : " + message);
          return message;
      }
      

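    In any case, I suggest you reconsider your design and go back to the AmqpInboundGateway.

    By the way, in the final solution you do not need to care about any correlation. The framework does that for you automatically.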
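As a rough sketch of what going back to the AmqpInboundGateway could look like on the server side, the Java configuration below replaces the inbound-channel-adapter / outbound-channel-adapter pair from the question with a single gateway. The class name `ServerSideGatewayConfig`, the bean names, and the simplified `String` payload are assumptions made for illustration; the queue and channel names are taken from the question. It assumes Spring Integration is already enabled in the application context, as it is with the XML configuration above.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

// Hypothetical configuration class; not part of the original project.
@Configuration
public class ServerSideGatewayConfig {

    @Bean
    public MessageChannel processAuthRequestChannel() {
        return new DirectChannel();
    }

    // Listens on the request queue declared in the question.
    @Bean
    public SimpleMessageListenerContainer requestContainer(ConnectionFactory rabbitConnectionFactory) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(rabbitConnectionFactory);
        container.setQueueNames("request_queue");
        return container;
    }

    // The gateway forwards each request to the channel and sends the handler's
    // return value back to the requester, so no separate reply adapter is needed.
    @Bean
    public AmqpInboundGateway inboundGateway(SimpleMessageListenerContainer requestContainer) {
        AmqpInboundGateway gateway = new AmqpInboundGateway(requestContainer);
        gateway.setRequestChannel(processAuthRequestChannel());
        return gateway;
    }

    // No output channel: the return value is routed back through the gateway.
    @ServiceActivator(inputChannel = "processAuthRequestChannel")
    public String processRequest(String payload) {
        System.out.println("Inside process Request : " + payload);
        return "Successful";
    }
}
```

With this shape the service method only deals with payloads, and the reply address and correlation headers of the incoming request are handled by the gateway rather than by application code.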

    【Discussion】:
