【问题标题】:Spring Integration : Scatter-Gather, Gather not working after parallel web service callsSpring Integration:Scatter-Gather,Gather 在并行 Web 服务调用后不起作用
【发布时间】:2019-11-19 13:19:53
【问题描述】:

我正在设置 Spring 集成流以使用来自 MQ 的消息,并通过创建从 MQ 消息创建的请求来并行调用 Web 服务调用。

下面是 Spring 集成流程的样子

  • 使用来自 IBM MQ 的消息,使用 Marshaller 转换消息并将实体保存到 DB。
  • 将保存的实体发送到 Scatter-Gather 频道。
  • Scatter-Gather Channel 有两个分发渠道,每个分发渠道是一个包含以下组件的链

    1. Webservice 客户端进行 Web 服务调用(服务激活器)
    2. 将响应转换为实体对象的转换器
    3. 将数据保存到 DB 的处理程序。
  • 收集来自并行 Web 服务调用的响应,并将从两个并行 Web 服务调用创建的新对象发送到 RabbitMQ。

我能够从分散-聚集模式进行并行 Web 服务调用,但我没有看到聚集模式中发生聚合,基本上流不会进入聚集器类。

我尝试使用任务执行器作为 Scatter-Gather 模式的输入通道的 Publish-Subscribe 通道,并且根据日志,Web 服务调用与两个任务执行器并行发生,但在 Web 服务调用后它永远不会到达 Gatherer。

<si:service-activator input-channel="transformedEntity"
        ref="incidentHandler" output-channel="outputChannelFromMQ" />

<si:scatter-gather input-channel="outputChannelFromMQ" 
        requires-reply="false" output-channel="gatherResponseOutputChannel" gather-channel="gatherChannel" gather-timeout="4000">
        <si:scatterer apply-sequence="true">
            <si:recipient channel="distributionChannel1" />
            <si:recipient channel="distributionChannel2" />
        </si:scatterer>         
</si:scatter-gather>

<si:publish-subscribe-channel id="outputChannelFromMQ" apply-sequence="true" 
        task-executor="taskExecutor" />

<task:executor id="taskExecutor" queue-capacity="25"    pool-size="10-10" />

<si:chain id="planngedBagsChain" input-channel="distributionChannel1"
        output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient1" method="getResponse" />
    <si:service-activator ref="serviceHandler1"     method="saveToDB" />
</si:chain>

<si:chain id="bagHistoryChain" input-channel="distributionChannel2"
    output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient2" method="getResponse" />
    <si:transformer ref="transformer" />
    <si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>

<si:service-activator input-channel="gatherResponseOutputChannel"
    ref="responseTransformer" method="receiveResponse" output-channel="toRabbitMQ" />

【问题讨论】:

  • 您的saveToDB 是否返回任何内容?该方法调用的结果将被发送到gatherChannel
  • 是的,调试了DB保存成功后webservice客户端返回对象的代码。
  • 这不是我要问的。你的&lt;si:chain&gt; 最后都有&lt;si:service-activator ref="serviceHandler1" method="saveToDB" /&gt; 之类的东西。根据您的配置,saveToDB 在服务调用之后已经发生。所以,问题是:saveToDB 方法中发生了什么?
  • 嗯,您可以简单地为org.springframework.integration 类别启用DEBUG 日志记录级别,您将在日志中看到您的消息是如何传播的。
  • 另外,如果我删除 requires-reply='false' 属性并尝试然后我得到`org.springframework.integration.handler.ReplyRequiredException:处理程序'org.springframework.integration.scattergather没有产生回复.ScatterGatherHandler#0',并且它的'requiresReply'属性设置为true',所以scattergatherhandler基本上不会产生回复。

标签: spring-integration scatter


【解决方案1】:

启用调试日志后,能够找出它正在发送到的所有通道。使用上述配置,它不会正确收集器。更改为以下配置有效。

<si:service-activator input-channel="transformedEntity"
        ref="incidentHandler" output-channel="outputChannelFromMQ" />

<si:channel id="outputChannelFromMQ"></si:channel>

<si:scatter-gather input-channel="outputChannelFromMQ"
         requires-reply="false" scatter-channel="scatterInputChannel" output-channel="toRabbit"  
         gather-channel="gatherChannel" gather-timeout="4000">
        <si:gatherer id="responseGatherer"  ref="responseTransformer" release-strategy-expression="size() == 2"/>
    </si:scatter-gather>

<si:publish-subscribe-channel id="scatterInputChannel" apply-sequence="true" 
        task-executor="taskExecutor" />

<task:executor id="taskExecutor" queue-capacity="25"    pool-size="10-10" />

<si:chain id="planngedBagsChain" input-channel="scatterInputChannel"
        output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient1" method="getResponse" />
    <si:service-activator ref="serviceHandler1"     method="saveToDB" />
</si:chain>

<si:chain id="bagHistoryChain" input-channel="scatterInputChannel"
    output-channel="gatherChannel">
    <si:service-activator ref="webServiceClient2" method="getResponse" />
    <si:transformer ref="transformer" />
    <si:service-activator ref="serviceHandler2" method="saveToDB" />
</si:chain>

【讨论】:

    猜你喜欢
    • 2023-04-04
    • 1970-01-01
    • 2022-01-16
    • 1970-01-01
    • 2018-11-03
    • 2021-12-13
    • 2022-12-10
    • 2020-05-07
    • 2016-07-14
    相关资源
    最近更新 更多