【问题标题】:Spring Integration AMQPSpring 集成 AMQP
【发布时间】:2020-10-21 01:21:56
【问题描述】:

我刚刚开始学习spring-integration 我想在队列中接收消息并并行执行 2 个步骤: 第 1 步 -> 使用 bean 处理它 第 2 步 -> 转换并将其发送到另一个队列。 类似的东西:

return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
            .configureContainer(simpleMessageListenerContainerSpec -> {
                simpleMessageListenerContainerSpec.concurrentConsumers(3);
            }))
            .log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
            .handle(serviceBean, "process")
            .<String,String>transform(String::toLowerCase)
            .log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
            .handle(
                    Amqp.outboundAdapter(rabbitTemplate)
                            .exchangeName("exchange")
                            .routingKey("queue2.routing"))
            .get();

我错过了什么?第一个句柄之后的动作没有被执行。我想我没有正确理解这部分。 另外我怎样才能并行执行这两个步骤?

【问题讨论】:

    标签: spring spring-integration spring-integration-dsl spring-integration-amqp


    【解决方案1】:

    你应该从理论入手,了解Spring Integration中的很多概念和组件。

    “2 步并行” - 正是一个发布-订阅模式:https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html,Spring Integration 为其提供了一个实现:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-publishsubscribechannel。正如您根据文档看到的那样,要实现并行性,您需要使用 TaskExecutor 配置这样的通道。

    通过 Java DSL,我们为发布-订阅配置提供了高级 API:

    https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows

    因此,要使您的 .handle(serviceBean, "process").&lt;String,String&gt;transform(String::toLowerCase) 平行,您需要有这样的东西:

    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queue1")
            .configureContainer(simpleMessageListenerContainerSpec -> {
                simpleMessageListenerContainerSpec.concurrentConsumers(3);
            }))
            .log(message -> "SERVICE EVENT QUEUE : Received Message : " + message.getPayload())
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                             .handle(serviceBean, "process")))
            .<String,String>transform(String::toLowerCase)
            .log(message -> "SERVICE EVENT QUEUE : Transformed Message : " + message.getPayload())
            .handle(
                    Amqp.outboundAdapter(rabbitTemplate)
                            .exchangeName("exchange")
                            .routingKey("queue2.routing"))
            .get();
    

    【讨论】:

      猜你喜欢
      • 2016-11-23
      • 2015-12-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-26
      相关资源
      最近更新 更多