【问题标题】:Spring aggregrator stops processing when message is missing消息丢失时 Spring 聚合器停止处理
【发布时间】:2019-07-30 09:16:25
【问题描述】:

您好,提前感谢您检查此问题。

我们有一个基于文件的 spring 集成 (4.3.19.RELEASE) 管道,其中提取、提取 zip 并在管道中处理每个 csv 文件。 这些被更改,然后通过一些自定义逻辑导入到数据库中。

问题: 我们有一个案例,聚合器没有收到预期的消息数量并默默地取消进程(不调用下一个通道/服务激活器)

弹簧集成管道:

  • 读取 48 个 CSV 文件
  • 丰富一些标头数据
  • 处理(使用拆分器)csv 文件。
  • 更多的标头标头丰富
  • 几个服务激活器(一个接一个)
  • header-value-router -> 向“subaggregator”(默认)和“pimAggrator”发送消息,以防特殊标头(代码示例如下) -> subaggregator 只是转发到“pimAggrator”
  • “pimAggrator”转发到服务激活器(这里描述的问题不会发生这种情况)

目前调试:

聚合器使用默认的“SequenceSizeReleaseStrategy”。

据我了解的策略,当接收到的消息计数达到序列计数时,它会释放通道。

我在“if”行中设置了一个断点

int sequenceSize = messageGroup.getSequenceSize();
// If there is no sequence then it must be incomplete....
    if (sequenceSize == size) {
    canRelease = true;
    }
  • 序列计数似乎是传入文件的数量(从解压缩通道中提取的文件的数量)。
  • 处理 CSV 文件(例如 10 个)。
  • 如果此类 csv 文件仅包含 csv 的标头(= 没有数据行),则不会将新的 csv 文件写入磁盘以进行进一步处理。
  • “pimAggregatorChannel”(前面提到过)需要 10 条消息,但只接收 9 条消息(因为 1 个文件没有数据线,因此没有写入磁盘以进行进一步处理)。
  • “pimAggregatorChannel”不会继续到以下服务激活器。
  • 我没有发现任何例外。
... file polling. csv proprozessing (e.g. substititung values).... 

<!-- 5.1 move the files (= ZIP entries) to processing and re-configure output directories -->
<file:outbound-gateway request-channel="pimFileProcessingChannel" 
    reply-channel="pimFileHeaderSetupChannel"
    directory="#{baseDirectoryPim}/processing" delete-source-files="true"/>

.... a bunch of service-activators, processing the files. (mostly data enrichment for further processing) ....


<int:header-value-router input-channel="pimAggregationRoutingChannel"
     default-output-channel="pimSubAggregatorChannel"
     resolution-required="false"
     header-name="#{T(...PimFileHeaders).FILETYPE}">
     <int:mapping value="#{T(...PimFileType).ITEM.name()}"
          channel="pimAggregatorChannel"/>
</int:header-value-router>

<int:aggregator input-channel="pimSubAggregatorChannel"
    output-channel="pimAggregatorChannel"/>

<int:aggregator input-channel="pimAggregatorChannel"
    output-channel="pimPerformCleanUpDbChannel"/>

为您的理解添加了评论

// this methods contains the logic for the splitting messages / writing the new files/message payloads. 
    public final List<Message<byte[]>> preProcessCsv(final Message<byte[]> message) throws IOException {
        final String originalFilename = (String) message.getHeaders().get(FileHeaders.FILENAME);
        // dataPerKey is an empty Map for the csv file which contains no data
        final Map<String, byte[]> dataPerKey = processPayload(message);

        // hence no splitMessages will be generated.
        final List<Message<byte[]>> splitMessages = new ArrayList<>();
        if (MapUtils.isNotEmpty(dataPerKey)) {
            createNewMessages(originalFilename, dataPerKey, splitMessages);
        }
        // returns the empty obviously empty list. 
        return splitMessages;
    }



当处理的csv文件不包含数据线时,控制台写入如下调试信息:

[MethodInvokingSplitter] handler 'pimFileNavigationTextpoolRefChain$child#0.handler' produced no reply for request Message: GenericMessage [payload=byte[59], headers={file_name_original=310_NavigationRefTextpool.csv, zip_entryPath=, sequenceNumber=21, zip_name=P_pim_import4189562333325735125.zip, file_name=NavigationRefTextpool-1564409172681.csv, sequenceSize=48, file_type=NAVIGATION_TEXTPOOL_REFERENCE, correlationId=43450967-5c51-105b-172b-9093f8e2b3e9, history=pimZipInputChannel,pimUnzipChannel,pimFileFormatChannel,pimFileFormatChain,pimFileRoutingChannel,pimFileNavigationTextpoolRefChannel,pimFileNavigationTextpoolRefChain, id=23e40731-eb8a-3411-541a-65fe56b552b4, timestamp=1564409172682}]

实际行为:

  • 当 csv 文件在“pimAggregatorChannel”之后不包含任何数据行时,管道不会继续。

预期/想要的行为:

  • 我希望“pimAggregatorChannel”仍然调用服务激活器而不是静默取消。
  • 当文件不包含任何数据时,没有什么可做的,实际上这是一个有效的情况。

再次感谢您的阅读和帮助。

【问题讨论】:

    标签: spring-integration


    【解决方案1】:

    最简单的解决方案是始终写入文件,即使它是空的。

    只要所有内容都按顺序处理(不是QueueChannels 或ExecutorChannels),那么接下来最简单的就是自定义发布策略。

    SequenceSizeReleaseStrategy 只是查看消息组的大小。

    自定义发布策略可以查看组内最后一条消息的序号头,当序号等于序列大小时释放。

    这适用于所有情况,除非最后一个文件为“空”并因此丢失。

    您可以通过组超时来处理这种情况 - 如果没有更多消息到达,则可以释放部分组。

    或者,您可以只使用组超时而不使用自定义释放策略,并始终根据超时释放。

    【讨论】:

    • 你好,Gary,非常感谢你的回答,我会仔细审查这个答案并考虑解决方案,并会很快回来接受或进一步的问题;-)。
    • 感谢 Gary 的帮助,我们将寻求“最简单的解决方案是始终写入文件,即使它是空的。”解决方案。但是我们没有发送文件,我现在明白了我们如何处理“损坏”数据。这个解决方案让我走上了正确的轨道。我们正在创建“损坏/空”数据消息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-30
    • 2019-07-07
    • 1970-01-01
    • 2011-08-06
    • 1970-01-01
    • 2017-09-02
    • 2013-02-27
    相关资源
    最近更新 更多