【问题标题】:Conditional File movement in Spring Integration File SupportSpring 集成文件支持中的条件文件移动
【发布时间】:2020-07-12 11:09:36
【问题描述】:

我正在实施一个 Spring Integration 工作流程,如下所示。

IntegrationFlows.from("inputFileProcessorChannel")
                           .split(fileSplitterSpec, spec -> {})
                           .transform(lineItemTransformer)
                           .handle(httpRequestExecutingMessageHandler)
                           .transform(reportDataAggregator)
                           .aggregate(aggregatorSpec -> aggregatorSpec.requiresReply(false))
                           .channel("reportGeneratorChannel")
                           .get();

现在,一旦完成上述流程,我需要将input file 移动到存档目录。决定目标目录的决定基于消息头processingFailed,并且此头添加到流程中的.transform(reportDataAggregator) 步骤中。为了移动这些文件,我创建了另一个流程,如下面的代码

 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel"))
                           .routeToRecipients(routerSpec -> {
                               routerSpec.recipient("processedFileMoverChannel", createMessageSelector(Boolean.FALSE))
                                         .recipient("failedFileMoverChannel", createMessageSelector(Boolean.TRUE));
                           })

                           .get();  

选择器方法

 private MessageSelector createMessageSelector(Boolean ruleBoolean) {
    return message -> ruleBoolean.equals(message.getHeaders().get("processingFailed"));
}

报告下面的渠道流

IntegrationFlows.from("reportGeneratorChannel")
                           .transform(executionReportTransformer)
                           .handle(reportWritingMessageHandlerSpec)
                           .get();

但是,正如该流程所预期的那样,文件移动并未完成,因为所述标头不存在于流程执行中。

那么,如何在报表文件创建后执行file mover flow来实现这个目标呢?

【问题讨论】:

  • 您有两个来自 inputFileProcessorChannel 的流使用 - 如果它是 DirectChannel,它们将获得备用消息(默认情况下)。 reportGeneratorChannel 消耗了什么?
  • 是的,谢谢 Gary,我在 DirectChannel 中看到了这个问题。会改变它。 reportGeneratorChannel是终端操作。之后没有消费者

标签: spring spring-integration spring-integration-dsl


【解决方案1】:

FileSplitter 为我们填充此标题以生成每一行:

@Override
protected boolean willAddHeaders(Message<?> message) {
    Object payload = message.getPayload();
    return payload instanceof File || payload instanceof String;
}

@Override
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
    File file = null;
    if (message.getPayload() instanceof File) {
        file = (File) message.getPayload();
    }
    else if (message.getPayload() instanceof String) {
        file = new File((String) message.getPayload());
    }
    if (file != null) {
        if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
            headers.put(FileHeaders.ORIGINAL_FILE, file);
        }
        if (!headers.containsKey(FileHeaders.FILENAME)) {
            headers.put(FileHeaders.FILENAME, file.getName());
        }
    }
}

因此,即使您完成了聚合并准备向该 .channel("reportGeneratorChannel") 发送消息,您仍然可以访问那些与文件相关的标头。

将此reportGeneratorChannel 设为PublishSubscribeChannel 并将“文件移动器流”移到那里,对您有用。

顺便说一句:到目前为止,您所拥有的 IntegrationFlows.from(MessageChannels.direct("inputFileProcessorChannel")) 和同一通道上的第二个流将引导您进行循环调度。那不是发布-订阅分发。在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel

【讨论】:

  • 将此reportGeneratorChannel 设为PublishSubscribeChannel :这样做后The Payload 传递给文件移动器流将是来自聚合器的列表,它会尝试仅在文件中复制此数据?不是吗?
  • 您可以将该移动器流的第一个端点设置为transform(),以将您的File 从标头移动到payload
  • 太棒了。谢谢。
  • 嗨,Artem,我可以在异步模式下执行 httpRequestExecutingMessageHandler 并让其他所有事情保持原样吗?
  • 如果在split() 之后放置一个ExecutorChannel,则无需担心HTTP 的异步模式:一切都将在各自的线程上并行执行。对于异步行为,您需要查看 WebFlux 支持:docs.spring.io/spring-integration/docs/current/reference/html/…
猜你喜欢
  • 2018-08-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-04-03
相关资源
最近更新 更多