【问题标题】:Spring integration - relase messages in a loop (batches)Spring集成-循环发布消息(批量)
【发布时间】:2017-12-12 15:33:56
【问题描述】:

我需要能够从 rabbitmq 接收消息,进行一些转换(从 1 条输入消息,我创建 1000 条消息),然后按以下方式处理这 1000 条消息:我以 10 条为一组推送消息,然后休眠 5 秒.

您可以看到下面的代码,我需要的帮助是最后一步 - 如何以这种方式进行消息批处理?

@Bean
    public IntegrationFlow refreshFlow() {
        return IntegrationFlows
                //get messages from rabbitmq
                .from(refreshInboundAdapter())
                //convert to POJO
                .transform(new JsonToObjectTransformer(RefreshRequest.class))
                //make 1 -> 1000 messages (but release in batches of 10, not all)
                .<RefreshRequest, List<ElasticMatch>>transform(m -> componentConfig.matchRefreshService().processRequest(m))
                //HERE WAIT 5 seconds and forward to rabbit in batches of 10
                .handle(refreshOutboundEndpoint())
                .get();
    }

【问题讨论】:

    标签: java spring spring-integration spring-amqp


    【解决方案1】:

    不确定“等待 5 秒”是什么意思,但“分批发布 10 秒”正是聚合器的任务。你需要有一些人工的correlationKey,配置expireGroupsUponCompletion = trueMessageCountReleaseStrategy

    更多信息请参见Reference Manual

    【讨论】:

    • matchRefreshService().processRequest 将包含基于RefreshRequest 消息从REST(10 个批次)获取一些数据的代码,然后在这10 个大小的批次中将此消息发布给refreshOutboundEndpoint。这与聚合器(逻辑上)有点不同,因为processRequest 实际上是数据源。除非你认为我让 processRequest 看起来像聚合器并以 10 个批次发布,然后休眠然后释放 10 个......直到我释放所有 1000 条消息?
    • M-m-m。好的。让我重新表述您的评论,以确定我们在同一页上。您想基于一条消息执行一些自定义逻辑并生成1000/10 消息到outputChannel。这可以通过 splitter 然后 - 一对多组件来完成。是否接近您的预期?
    • 分离器似乎没问题。但我不会一次生成 1000 条消息然后拆分它们。我需要在 10 之后生产 10 并以这种方式发布。所以,我将在该组件中有 100 个休息调用。
    • 对,您收到 1 条消息,构建 1000 个迭代它们并返回 100 x 10 并将此自定义代码用于 .split()
    • 当你说构建 1000 意味着我必须等待所有 1000 并且当我有前 10 条消息要发送时无法将消息推送到管道中?
    猜你喜欢
    • 2012-03-04
    • 1970-01-01
    • 1970-01-01
    • 2010-12-02
    • 2019-07-03
    • 1970-01-01
    • 2017-02-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多