【问题标题】:spring-integration group messages by classifier分类器的弹簧集成组消息
【发布时间】:2019-03-28 08:46:39
【问题描述】:

使用 Java 8 流,我可以按分类器对消息进行分组:

Map<String, List<String>> grouped = Arrays.asList("a", "b", "b", "b", "c")
        .stream()
        .collect(Collectors.groupingBy(Function.identity()));

我想编写一个聚合器来对消息进行相应的分组。如上所示,在带有有效负载的五条消息中,我想生成三条消息: 第一个应该有"a" 作为有效负载,第二个应该有一个列表,其中包含三个"b" 作为有效负载,第三个应该有"c" 作为有效负载。

当达到序列大小时,应释放所有消息组。基于有效负载的分组工作正常,但消息组永远不会被释放。

在发布策略中,我可以访问序列大小,但我无法找出已处理的项目总数。如何释放我的分组消息?

public interface StringGrouper {
    List<Message<?> groupSame(List<String> toGroup);
}   

@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .split()
        .aggregate(agg -> agg
            .correlationStrategy(message -> message.getPayload())
            .releaseStrategy(group -> group.getSequenceSize() == /* what? */)) 
        .logAndReply();
}

@Test
public void shouldGroupMessages {
    List<Message<?> grouper
        .groupSame(Arrays.asList("a", "b", "b", "b", "c"));
}

解决方法是根本不使用聚合器,而是将传入列表分组到转换器中。但我希望我可以为此使用聚合器。

@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .<List<String>, Collection<List<String>>>transform(source -> source.stream()
            .collect(Collectors.collectingAndThen(
                Collectors.groupingBy(Function.identity()), 
                grouped -> grouped.values())))
        .split()
        .log() // work with messages
        .aggregate()
        .get();
}

【问题讨论】:

    标签: spring-integration spring-integration-dsl


    【解决方案1】:

    使用默认序列大小释放策略将它们聚合为一个组,并使用自定义输出处理器 (MessageGroupProcessor) 在有效负载上重新组合,返回 Collection&lt;Message&lt;List&lt;?&gt;&gt;&gt;

    【讨论】:

    • 现在我应该知道 MessageGroupProcessor :-)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多