【发布时间】:2020-11-22 16:13:27
【问题描述】:
上下文:将 Excel 文件作为 Flux 加载和流式传输。使用 R2DBC 处理 Flux 记录并将它们插入数据库。
implementation("org.apache.poi:poi:4.1.2") - apache lib which has excel domain of workbooks/sheets/rows/cells
implementation("org.apache.poi:poi-ooxml:4.1.2")
implementation("com.monitorjbl:xlsx-streamer:2.1.0") - streamer wrapper which avoids loading entire excel file into the memory and parses chunks of a file
将文件转换为 Flux(提取标题作为第一行,然后将其粘贴到来自 Flux 的每个后续事件/行):
override fun extract(inputStream: InputStream): Flux<Map<String, String>> {
val workbook: Workbook = StreamingReader.builder()
.rowCacheSize(10) // number of rows to keep in memory (defaults to 10)
.bufferSize(4096) // buffer size to use when reading InputStream to file (defaults to 1024)
.open(inputStream)
val xsltRows = Flux.fromIterable(workbook).flatMap { Flux.fromIterable(it) }
return xsltRows.next()
.map { rowToHeader(it) }
.flatMapMany { header -> xsltRows.map { combineToMap(header, it) } }
}
随后,我将此 Flux 处理为 Spring R2DBC 存储库的域模型,并将条目插入 MySQL 数据库。
问题:我缺少一个 Excel 行(大约 2 k)。它始终是同一行,但该行中的数据没有什么特别之处。
调用 combineToMap 方法,该方法将标题中的名称与每个单元格值相关联,它还打印文件中的行逻辑序列号:
private fun combineToMap(header: Map<Int, String>, row: Row): Map<String, String> {
val mapRow: MutableMap<String, String> = mutableMapOf()
val logicalRowNum = row.rowNum+1
logger.info("Processing row: $logicalRowNum")
for (cell in row) {
if (cell.columnIndex >= header.keys.size) {
continue
}
val headerName = header[cell.columnIndex].takeUnless { it.isNullOrBlank() }
?: throw IllegalStateException("No header name for ${cell.columnIndex} column index for header " +
"$header and cell ${cell.stringCellValue} row index ${row.rowNum}")
mapRow[headerName] = cell.stringCellValue
mapRow["row"] = logicalRowNum.toString()
}
return mapRow
}
当我添加日志行时,我注意到以下内容:
2020-11-22 15:49:56.684 INFO 20034 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 255
2020-11-22 15:49:56.687 INFO 20034 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 256
2020-11-22 15:49:56.689 INFO 20034 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 257
2020-11-22 15:50:02.458 INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor : Processing row: 259
2020-11-22 15:50:02.534 INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor : Processing row: 260
2020-11-22 15:50:02.608 INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor : Processing row: 261
请注意,调度程序在 257 行之后切换,在切换期间我丢失了 258 行。游泳池:
tor-tcp-epoll-1
理解为 Spring R2DBC 内部池。
在我的下游,如果不是执行repository.save,而是返回静态Mono.just(entity),我会返回我的 258 行,请注意调度程序也没有切换。
2020-11-22 16:01:14.000 INFO 21959 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 257
2020-11-22 16:01:14.006 INFO 21959 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 258
2020-11-22 16:01:14.009 INFO 21959 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 259
这是 Excel 库的问题还是我的实现问题?为什么我在切换 TP 时会丢失记录?
附:除了调用 Spring R2DBC 存储库之外,我没有指定任何调度程序或并行块或任何与我的流程中的任何地方的线程混淆的东西。
我将尝试使用implementation("org.apache.commons:commons-csv:1.8") 重写并观察是否会发生同样的情况,但如果有人能在其他地方发现任何明显或经历过类似的事情,我将不胜感激。
【问题讨论】:
标签: spring project-reactor reactor-netty spring-data-r2dbc r2dbc