【问题标题】:Several data sources in one spring integration pipeline?一个弹簧集成管道中有多个数据源?
【发布时间】:2012-05-19 17:10:12
【问题描述】:

我有一个配置的 spring 集成管道,其中 xml 文件被解析为各种对象。这些对象正在通过几个通道端点,它们在这些端点上略有修改 - 没什么特别的,只是添加了一些属性。

管道的最后一个端点是持久化器,对象在数据库中持久化。可能存在重复,因此在此端点中还会检查对象是已持久化还是新对象。 我使用消息驱动的架构,带有简单的直接渠道。

<int:channel id="parsedObjects1" />
<int:channel id="parsedObjects2" />
<int:channel id="processedObjects" />
<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" />
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" />
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" />

目前只有一个数据源,我从中获取xml文件,一切顺利。当我需要附加第二个数据源时,问题就开始了。这些文件同时出现,所以我希望它们并行处理。所以,我放置了两个解析器实例,每个解析器都通过管道发送消息。 我有直接通道的配置会产生并发问题,所以我尝试修改它。我已经尝试了 spring 集成文档中的几种配置,但到目前为止都没有成功。

我已经尝试过将最大池大小配置为 1 的调度程序 - 每个通道端点中的每条消息一个线程。

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="parsedObjects2" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>
<int:channel id="processedObjects" >
    <int:dispatcher task-executor="channelTaskExecutor" />
</int:channel>

我也尝试过队列轮询器配置:

<task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
<int:channel id="parsedObjects1" >
    <int:rendezvous-queue/>
</int:channel>
<int:channel id="parsedObjects2" >
    <int:rendezvous-queue/>
</int:channel>
<int:channel id="processedObjects" >
    <int:rendezvous-queue/>
</int:channel>

<int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>
<int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
</int:service-activator>

基本上,我想摆脱通道端点中的任何竞争条件 - 在我的情况下是持久化。持久化通道端点应该为每条消息阻塞,因为如果它并行运行,我会在数据库中持久保存许多重复项。

编辑:

在我进行了一些调试之后,问题似乎出在端点逻辑而不是配置上。通过管道发送到持久化器的一些对象也存储在本地缓存中,直到文件解析完成 - 它们稍后也通过管道发送,以将一些连接表作为其他域的一部分持久化实体。碰巧使用上述配置,一些对象在管道中第二次发送时还没有持久化,所以最后我在数据库中得到了重复。 我在春季集成方面相当新,所以可能在这一点上我会问更一般的问题。在具有多个数据源的设置中 - 意味着解析器的多个实例等:

  1. 是否有一种通用方法(最佳方法)来配置管道以启用并行化?
  2. 如果有需要,有没有办法序列化消息处理?

欢迎提出任何建议。提前致谢。

【问题讨论】:

    标签: java spring spring-integration


    【解决方案1】:

    首先,您能描述一下“并发问题”是什么吗?理想情况下,您不需要序列化消息处理,因此这是一个很好的起点。

    其次,您配置的线程池不会完全序列化。您将有 1 个线程可用池中,但您选择的拒绝策略会导致调用者线程在队列满载的情况下运行任务本身(基本上是节流)。这意味着您将同时获得一个调用者运行的线程和池中的线程。

    【讨论】:

    • 感谢您的回答,我会再次查看调度程序配置。我还编辑了我的问题以清除一些内容。
    【解决方案2】:

    对于您的方案,我能想到的最佳方法是:

    让你的 parsedObject1 和 parsedObject2 成为普通的队列通道,队列的容量可以适当设置(比如随时25):

    <int:channel id="parsedObjects1" >
        <int:queue />
    </int:channel>
    

    此时,您的 xml 处理器针对 2 个通道 - parsedObjects1 和 parsedObjects2,将处理 xml 并应输出到 processedObjects 通道。您可以使用与您的配置类似的配置,除了我已明确指定处理对象通道 - :

    <int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" output-channel="processedObjects">
        <int:poller task-executor="channelTaskExecutor"/>
    </int:service-activator>
    

    第三步我会偏离你的配置,此时你说你想要序列化持久化,最好的方法是通过池大小为 1 的不同任务执行器来完成,这种方式只有您的持久化器的 1 个实例在任何时间点都在运行:

    <task:executor id="persisterpool" pool-size="1"/>
    <int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
        <int:poller task-executor="persisterpool" fixed-delay="2"/>
    </int:service-activator>
    

    【讨论】:

    • 感谢您的回答,我设法让它工作了。我不确定这是否正确,我会多尝试一些并尝试您的建议。
    【解决方案3】:

    我设法让管道正常工作。我不确定我是否会保留当前配置,或者进行更多试验,但现在,这是我最终得到的配置:

    <task:executor id="channelTaskExecutor" pool-size="1-1" keep-alive="10" rejection-policy="CALLER_RUNS" queue-capacity="1" />
    <int:channel id="parsedObjects1" >
    <int:queue capacity="1000" />
    </int:channel>
    <int:channel id="parsedObjects2" >
    <int:queue capacity="1000" />
    </int:channel>
    <int:channel id="processedObjects" >
    <int:queue capacity="1000" />
    </int:channel>
    
    <int:service-activator input-channel="parsedObjects1" ref="processor1" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="100"  fixed-rate="2" />
    </int:service-activator>
    <int:service-activator input-channel="parsedObjects2" ref="processor2" method="process" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="100"  fixed-rate="2" />
    </int:service-activator>
    <int:service-activator input-channel="processedObjects" ref="persister" method="persist" >
    <int:poller task-executor="channelTaskExecutor" max-messages-per-poll="1"  fixed-rate="2" />
    </int:service-activator>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-06-16
      • 1970-01-01
      • 2018-04-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多