【问题标题】:Spring integration routing to subflow in different classSpring集成路由到不同类中的子流
【发布时间】:2018-10-04 12:20:02
【问题描述】:

为了完成这项工作,我苦苦挣扎了几天。我想要完成的是从主流中调用不同的子流(即集成流),基于消息内容并在子流完成后返回到主流。它就像委派责任给一个特定的类来完成某事并返回主流。该责任也可能需要一些步骤,因此它也作为流程实施。这是我的主要流程:

public IntegrationFlow processingFlow(
  MessageChannel eventIn,
  MessageChannel eventOut,
  ChangedEventsLoader changedEventsLoader,
  CalculatorRouter calculatorRouter) {

return IntegrationFlows.from(eventIn)
    .handle(changedEventsLoader)
    .route(
        CalculatorRouter::getSportId,
        CalculatorRouter::routeCalculation)
    .channel(eventOut)
    .get();

}

这是路由器的实现:

@Service
@AllArgsConstructor
public class CalculatorRouter {
  private final MessageChannel eventOut;

  public RouterSpec<Integer, MethodInvokingRouter> routeCalculation(
      RouterSpec<Integer, MethodInvokingRouter> mapping) {
    return mapping
        .channelMapping(1, "subflowCalculationChannel")
        .defaultOutputToParentFlow();
  }

  public Integer getSportId(Event event) {
    return 1;
  }

  @Bean
  public MessageChannel subflowCalculationChannel() {
    return MessageChannels.direct().get();
  }
}

这是一个子流程的示例:

@Configuration
@AllArgsConstructor
public class CalculatorExample {

  @Bean
  public IntegrationFlow calculateProbabilities(MessageChannel subflowCalculationChannel) {
    return IntegrationFlows.from(subflowCalculationChannel)
        .<Event>handle((p, m) -> p * 2)
        .get();
  }
}

问题是子流错过了与主流的一些连接。我试图通过在路由部分使用 defaultOutputToParentFlow() 来解决这个问题,但这还不够。

【问题讨论】:

    标签: java spring-integration


    【解决方案1】:

    从某个版本开始,我们决定将 Java DSL 路由器行为与带有注释或 XML 的标准配置保持一致。所以,如果我们发送到路由器,我们不能期待那里的回复。我们只能继续将通道作为子流的输出。

    在您的情况下,您在主要流程中有一个.channel(eventOut)。所以,你所有的路由子流都应该准确地回复这个频道:

        .<Event>handle((p, m) -> corners1H2HCustomBet.getCalculation(p))
        .channel(eventOut)
        .get();
    

    我认为.defaultOutputToParentFlow(); 不会为您做任何事情,因为您没有默认映射。而且它已经有点不同了:它对其他映射没有任何影响。

    还要注意这个JavaDoc:

    /**
     * Add a subflow as an alternative to a {@link #channelMapping(Object, String)}.
     * {@link #prefix(String)} and {@link #suffix(String)} cannot be used when subflow
     * mappings are used.
     * <p> If subflow should refer to the external {@link IntegrationFlow} bean and
     * there is a requirement to expect reply from there, such a reference should be
     * wrapped with a {@code .gateway()}:
     * <pre class="code">
     * {@code
     *     .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
     * }
     * </pre>
     * @param key the key.
     * @param subFlow the subFlow.
     * @return the router spec.
     */
    public RouterSpec<K, R> subFlowMapping(K key, IntegrationFlow subFlow) {
    

    与您的基于通道的路由配置无关,但将来可能有用。

    更新

    这是subFlowMapping 的示例(Kotlin)并返回主流程:

        @Bean
        fun splitRouteAggregate() =
                IntegrationFlow { f ->
                    f.split()
                            .route<Int, Boolean>({ o -> o % 2 == 0 },
                                    { m ->
                                        m.subFlowMapping(true) { sf -> sf.gateway(oddFlow()) }
                                                .subFlowMapping(false) { sf -> sf.gateway(evenFlow()) }
                                    })
                            .aggregate()
                }
    
        @Bean
        fun oddFlow() =
                IntegrationFlow { flow ->
                    flow.handle<Any> { _, _ -> "odd" }
                }
    
        @Bean
        fun evenFlow() =
                IntegrationFlow { flow ->
                    flow.handle<Any> { _, _ -> "even" }
                }
    

    【讨论】:

    • 感谢您的回复。我希望有一些方法可以在子流之外指定输出通道。
    • 好吧,您可以在主流程中使用 enrichHeaders() 将其添加到 replyChannel 标头中。
    • 您是否有使用 subFlowMapping 和集成流的示例。这看起来更好,因为我不需要指定频道
    • 我刚刚添加了一个关于子流和网关的 Kotlin 示例。
    • 非常感谢。我会努力使它适合我的情况。
    猜你喜欢
    • 1970-01-01
    • 2021-11-23
    • 2018-06-15
    • 1970-01-01
    • 1970-01-01
    • 2011-01-29
    • 2014-04-04
    • 1970-01-01
    • 2019-07-20
    相关资源
    最近更新 更多