【问题标题】:Spring Integration Mail: Send email after all database insertsSpring Integration Mail:在所有数据库插入后发送电子邮件
【发布时间】:2020-01-29 01:01:51
【问题描述】:

您好,我有一个集成流程,它逐行拆分文件,将每一行转换为 POJO,然后通过 JDBC 出站网关将该 POJO 插入到数据库中。

我希望在文件处理完成后能够发送一封电子邮件。 我目前在我的 jdbcOutboundGateway 之后发送到 smtpFlow 通道,但是这是在每次插入数据库后发送一封电子邮件。

这是我当前的流程 DSL

IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .split(splitFile())
            .transform(this::transformToIndividualScore)
            .handle(jdbcOutboundGateway(null))
            .channel("smtpFlow")
            .get();

jdbcOutboundGateway 中处理完所有文件后,如何让此流程仅发送一封电子邮件?

这是我的splitFile() 方法

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(true, false);
    fs.setFirstLineAsHeader("IndividualScore");
    return fs;

这是我的transformToIndividualScore 方法

@Transformer
private IndividualScore transformToIndividualScore(String payload) {
    String[] values = payload.split(",");
    IndividualScore is = new IndividualScore();
    is.setScorecardDate(values[0]);
    is.setVnSpId(values[1]);
    is.setPrimaryCat(values[2]);
    is.setSecondaryCat(values[3]);
    is.setScore(Integer.parseInt(values[4]));
    is.setActual(values[5]);
    return is;
}

【问题讨论】:

    标签: spring-integration spring-integration-dsl spring-integration-ftp spring-integration-jdbc


    【解决方案1】:

    在句柄后添加.aggregate(),将拆分结果重新组合成一条消息。

    【讨论】:

    • 另一种方法是使用publishSubscribeChannel()split()jdbcOutboundGateway() 作为一个订阅者子流,Mail.outboundAdapter() 作为另一个订阅者子流。一次完成您要求的所有文件确实是不可能的:没有一个标志来检查所有文件是否已完成。如果在处理现有文件期间有新文件到达怎么办?.. 考虑将您的逻辑更改为基于每个文件的基础。或者正如 Gary 所说:使用一些聚合器策略在一定数量的文件或超时后发出电子邮件。
    • 哦,对了;抱歉,我误读了这个问题 - 聚合器将为每个文件发送一封邮件,而不是为每个插入发送一封邮件;您需要 @ArtemBilan 对所有文件的一条消息的建议。
    • 当我在.handle(jdbcOutboundGateway(null)) 之后删除.aggregate() 它并没有完成流程,它只是挂起。
    • @ArtemBilan 是的!现在对我来说很有意义。我对集成很陌生,它是一个很棒的框架。
    • 我将很快更新我的代码并发布一条新消息并提供解决方案。
    【解决方案2】:

    所以解决了我的问题,(有点)。

    在我的FileSplitter 上将迭代器标记为false 现在允许排序标头。

    更新后的splitFile() 如下

    @Bean
    FileSplitter splitFile() {
        FileSplitter fs = new FileSplitter(false, false);
        fs.setFirstLineAsHeader("IndividualScore");
        fs.setApplySequence(true);
        return fs;
    }
    

    我的直觉告诉我,默认的.aggregate()发布策略一定是消息头sequenceSize==消息聚合列表。

    在创建FileSplitter 并将iterator 设置为true 时,sequenceSize 设置为0,这将永远不会满足默认.aggregate() 的发布策略

    但是,这使得FileSplitter 使用List 将文件的所有行存储在内存中。聚合器还在内存中存储了另一个ArrayList 行..

    是否有更好的解决方案来创建处理 END FileMarker 的自定义聚合器以允许使用迭代器来拆分文件?

    【讨论】:

      【解决方案3】:

      在@ArtemBilan 的帮助下

      我能够依次使用publishSubscribeChannel() 和链2 subscribe() 方法,下面是新的IntegrationFlow

       @Bean
      IntegrationFlow ftpFlow() {
          return IntegrationFlows.from(
                  ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
                  .publishSubscribeChannel(channel -> channel
                          .subscribe(
                              a -> a
                                      .split(splitFile())
                                      .transform(this::transformToIndividualScore)
                                      .handle(jdbcMessageHandler(null)))
                          .subscribe(
                              b -> b
                                      .transform(this::transformToSuccessEmail)
                                      .handle(emailHandler()))
                  )
                  .get();
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-03-08
        • 2022-11-13
        • 2014-10-03
        • 2011-04-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多