【问题标题】:Spring Integration: Split message again after using aggregate?Spring Integration:使用聚合后再次拆分消息?
【发布时间】:2011-10-21 00:00:42
【问题描述】:

在我的 Spring Integration 驱动项目中,我有一个拆分器和有效负载路由器,用于将我的数据发送到各种转换器。然后将新的“转换”对象传递回聚合器并进行处理。

现在我想拆分我的聚合结果,以便正确保存它们,因为我需要将一些对象路由到单独的出站通道适配器。为此,我在聚合器之后添加了第二个拆分器;但似乎只有聚合集合中的第一个元素会传递给路由器。

这是我目前的流程:

<splitter ref="articleContentExtractor" />

<!-- This router works exactly as expected -->
<payload-type-router>
    ... routing to various transformers ...
    ... results are sent to articleOutAggregateChannel ...
</payload-type-router>

<aggregator ref="articleAggregator" />

<splitter />

<!-- This is where it seems to go wrong, the second
        splitter returns only the first object in the collection -->    
<payload-type-router resolution-required="true">
    <mapping type="x.y.z.AbstractContent" channel="contentOutChannel" />
    <mapping type="x.y.z.Staff" channel="staffOutChannel" />
</payload-type-router>

<outbound-channel-adapter id="contentSaveService" ref="contentExporter" 
    method="persist" channel="contentOutChannel" />

<outbound-channel-adapter id="staffSaveService" ref="staffExporter" 
    method="persist" channel="staffOutChannel" />

还有我的聚合器代码:

@Aggregator
public List<? super BaseObject> compileArticle(List<? super BaseObject> parts) {

    // Search for the required objects for referencing
    Iterator<? super BaseObject> it = parts.iterator();
    Article article = null;
    List<Staff> authors = new ArrayList<Staff>();

    while (it.hasNext()) {
        Object part = it.next();
        if (part instanceof Article) {
            article = (Article)part;
        }
        else if (part instanceof Staff) {
            authors.add((Staff)part);
        }
    }

    // Apply references
    article.setAuthors(authors);

    return parts;
}

我做错了什么?我是否正确使用我的聚合器?

注意:如果我完全删除聚合器和第二个拆分器,则流程的其余部分可以完美运行。

【问题讨论】:

  • 它可以工作(即没有错误),但第二个拆分器只返回集合中的第一个元素
  • 我已经编辑了我的问题,以便更容易理解我的问题
  • 是的,我调试时返回的对象部分包含整个集合。例如,如果我的parts 对象中有{Article, Staff, Staff},我的payload-type-router 只用Article 对象触发一次,Staff 对象就完全丢失了

标签: spring spring-integration aggregator


【解决方案1】:

由于我没有您的所有代码,因此很难说出正在发生的一切,但我能够按照您想要的方式使此流程工作。 baseObjectTransformer 除了在通过它的对象上设置一个标志之外什么都不做:

...
<context:component-scan base-package="net.grogscave.example" />

<channel id="inputChannel" />

<splitter ref="articleContentExtractor" input-channel="inputChannel"
    output-channel="splitArticleStaff" />

<channel id="splitArticleStaff" />

<payload-type-router input-channel="splitArticleStaff">
    <mapping type="net.grogscave.example.domain.Article" channel="articleChannel" />
    <mapping type="net.grogscave.example.domain.Staff" channel="staffChannel" />
</payload-type-router>

<channel id="articleChannel" />

<transformer input-channel="articleChannel" output-channel="articleOutAggregateChannel"
    ref="baseObjectTransformer"/>

<channel id="staffChannel" />

<transformer input-channel="staffChannel" output-channel="articleOutAggregateChannel"
    ref="baseObjectTransformer"/>

<channel id="articleOutAggregateChannel" />

<aggregator ref="articleAggregator" input-channel="articleOutAggregateChannel" output-channel="splitArticleInChannel"/>

<channel id="splitArticleInChannel" />

<splitter input-channel="splitArticleInChannel" output-channel="splitArticleOutChannel" />

<channel id="splitArticleOutChannel" />

<payload-type-router resolution-required="true" input-channel="splitArticleOutChannel">
    <mapping type="net.grogscave.example.domain.Article" channel="contentOutChannel" />
    <mapping type="net.grogscave.example.domain.Staff" channel="staffOutChannel" />
</payload-type-router>

<channel id="contentOutChannel">
    <queue capacity="10" />
</channel>

<channel id="staffOutChannel">
    <queue capacity="10" />
</channel>
...

然后我在测试用例中使用以下代码来练习这个流程:

    ...
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
            "/META-INF/spring/split-agg-split.xml",
            SplitAggSplitTests.class);

    MessageChannel inputChannel = context.getBean("inputChannel",
            MessageChannel.class);

    PollableChannel contentOutChannel = context.getBean("contentOutChannel",
            PollableChannel.class);

    PollableChannel staffOutChannel = context.getBean("staffOutChannel",
            PollableChannel.class);

    inputChannel.send(new GenericMessage<String>("Dewey Wins!,A. Fool, C. Lewlis"));

    logger.info("==> Article recieved: "
            + contentOutChannel.receive(0).getPayload());

    for(int i = 0; i < 2; i++)
    {
        logger.info("==> Staff recieved: "
                + staffOutChannel.receive(0).getPayload());
    }
    ...

我的日志输出产生所有三个实体。

话虽如此,您是否考虑过简单地删除额外的拆分器/聚合器并将 setAuthors 逻辑移动到您的第一个拆分器?我不知道您的流程的所有细节,但它似乎可以简化事情。

【讨论】:

  • 感谢您的详细回答,我会将您的设置与我自己的设置进行比较,然后返回更新。你的第二个建议会很好,但我的 setAuthors 逻辑只有在我的转换器转换了拆分对象后才可能实现。
  • 我花了一段时间才回到这个问题上,但是您的建议以及对我的默认轮询器的一些调整解决了我的问题。谢谢!
猜你喜欢
  • 1970-01-01
  • 2012-12-19
  • 2014-02-17
  • 2016-01-19
  • 2023-04-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多