【发布时间】:2019-07-30 09:16:25
【问题描述】:
您好,提前感谢您检查此问题。
我们有一个基于文件的 spring 集成 (4.3.19.RELEASE) 管道,其中提取、提取 zip 并在管道中处理每个 csv 文件。 这些被更改,然后通过一些自定义逻辑导入到数据库中。
问题: 我们有一个案例,聚合器没有收到预期的消息数量并默默地取消进程(不调用下一个通道/服务激活器)
弹簧集成管道:
- 读取 48 个 CSV 文件
- 丰富一些标头数据
- 处理(使用拆分器)csv 文件。
- 更多的标头标头丰富
- 几个服务激活器(一个接一个)
- header-value-router -> 向“subaggregator”(默认)和“pimAggrator”发送消息,以防特殊标头(代码示例如下) -> subaggregator 只是转发到“pimAggrator”
- “pimAggrator”转发到服务激活器(这里描述的问题不会发生这种情况)
目前调试:
聚合器使用默认的“SequenceSizeReleaseStrategy”。
据我了解的策略,当接收到的消息计数达到序列计数时,它会释放通道。
我在“if”行中设置了一个断点
int sequenceSize = messageGroup.getSequenceSize();
// If there is no sequence then it must be incomplete....
if (sequenceSize == size) {
canRelease = true;
}
- 序列计数似乎是传入文件的数量(从解压缩通道中提取的文件的数量)。
- 处理 CSV 文件(例如 10 个)。
- 如果此类 csv 文件仅包含 csv 的标头(= 没有数据行),则不会将新的 csv 文件写入磁盘以进行进一步处理。
- “pimAggregatorChannel”(前面提到过)需要 10 条消息,但只接收 9 条消息(因为 1 个文件没有数据线,因此没有写入磁盘以进行进一步处理)。
- “pimAggregatorChannel”不会继续到以下服务激活器。
- 我没有发现任何例外。
... file polling. csv proprozessing (e.g. substititung values)....
<!-- 5.1 move the files (= ZIP entries) to processing and re-configure output directories -->
<file:outbound-gateway request-channel="pimFileProcessingChannel"
reply-channel="pimFileHeaderSetupChannel"
directory="#{baseDirectoryPim}/processing" delete-source-files="true"/>
.... a bunch of service-activators, processing the files. (mostly data enrichment for further processing) ....
<int:header-value-router input-channel="pimAggregationRoutingChannel"
default-output-channel="pimSubAggregatorChannel"
resolution-required="false"
header-name="#{T(...PimFileHeaders).FILETYPE}">
<int:mapping value="#{T(...PimFileType).ITEM.name()}"
channel="pimAggregatorChannel"/>
</int:header-value-router>
<int:aggregator input-channel="pimSubAggregatorChannel"
output-channel="pimAggregatorChannel"/>
<int:aggregator input-channel="pimAggregatorChannel"
output-channel="pimPerformCleanUpDbChannel"/>
为您的理解添加了评论
// this methods contains the logic for the splitting messages / writing the new files/message payloads.
public final List<Message<byte[]>> preProcessCsv(final Message<byte[]> message) throws IOException {
final String originalFilename = (String) message.getHeaders().get(FileHeaders.FILENAME);
// dataPerKey is an empty Map for the csv file which contains no data
final Map<String, byte[]> dataPerKey = processPayload(message);
// hence no splitMessages will be generated.
final List<Message<byte[]>> splitMessages = new ArrayList<>();
if (MapUtils.isNotEmpty(dataPerKey)) {
createNewMessages(originalFilename, dataPerKey, splitMessages);
}
// returns the empty obviously empty list.
return splitMessages;
}
当处理的csv文件不包含数据线时,控制台写入如下调试信息:
[MethodInvokingSplitter] handler 'pimFileNavigationTextpoolRefChain$child#0.handler' produced no reply for request Message: GenericMessage [payload=byte[59], headers={file_name_original=310_NavigationRefTextpool.csv, zip_entryPath=, sequenceNumber=21, zip_name=P_pim_import4189562333325735125.zip, file_name=NavigationRefTextpool-1564409172681.csv, sequenceSize=48, file_type=NAVIGATION_TEXTPOOL_REFERENCE, correlationId=43450967-5c51-105b-172b-9093f8e2b3e9, history=pimZipInputChannel,pimUnzipChannel,pimFileFormatChannel,pimFileFormatChain,pimFileRoutingChannel,pimFileNavigationTextpoolRefChannel,pimFileNavigationTextpoolRefChain, id=23e40731-eb8a-3411-541a-65fe56b552b4, timestamp=1564409172682}]
实际行为:
- 当 csv 文件在“pimAggregatorChannel”之后不包含任何数据行时,管道不会继续。
预期/想要的行为:
- 我希望“pimAggregatorChannel”仍然调用服务激活器而不是静默取消。
- 当文件不包含任何数据时,没有什么可做的,实际上这是一个有效的情况。
再次感谢您的阅读和帮助。
【问题讨论】: