【问题标题】:Losing event when reactor is switching scheduler反应堆切换调度程序时丢失事件
【发布时间】:2020-11-22 16:13:27
【问题描述】:

上下文:将 Excel 文件作为 Flux 加载和流式传输。使用 R2DBC 处理 Flux 记录并将它们插入数据库。

implementation("org.apache.poi:poi:4.1.2") - apache lib which has excel domain of workbooks/sheets/rows/cells
implementation("org.apache.poi:poi-ooxml:4.1.2")
implementation("com.monitorjbl:xlsx-streamer:2.1.0") - streamer wrapper which avoids loading entire excel file into the memory and parses chunks of a file

将文件转换为 Flux(提取标题作为第一行,然后将其粘贴到来自 Flux 的每个后续事件/行):

override fun extract(inputStream: InputStream): Flux<Map<String, String>> {
        val workbook: Workbook = StreamingReader.builder()
                .rowCacheSize(10) // number of rows to keep in memory (defaults to 10)
                .bufferSize(4096) // buffer size to use when reading InputStream to file (defaults to 1024)
                .open(inputStream)

        val xsltRows = Flux.fromIterable(workbook).flatMap { Flux.fromIterable(it) }

        return xsltRows.next()
                .map { rowToHeader(it) }
                .flatMapMany { header -> xsltRows.map { combineToMap(header, it) } }
    }

随后,我将此 Flux 处理为 Spring R2DBC 存储库的域模型,并将条目插入 MySQL 数据库。

问题:我缺少一个 Excel 行(大约 2 k)。它始终是同一行,但该行中的数据没有什么特别之处。

调用 combineToMap 方法,该方法将标题中的名称与每个单元格值相关联,它还打印文件中的行逻辑序列号

private fun combineToMap(header: Map<Int, String>, row: Row): Map<String, String> {

        val mapRow: MutableMap<String, String> = mutableMapOf()
        val logicalRowNum = row.rowNum+1

        logger.info("Processing row: $logicalRowNum")

        for (cell in row) {
            if (cell.columnIndex >= header.keys.size) {
                continue
            }

            val headerName = header[cell.columnIndex].takeUnless { it.isNullOrBlank() }
                             ?: throw IllegalStateException("No header name for ${cell.columnIndex} column index for header " +
                                                            "$header and cell ${cell.stringCellValue} row index ${row.rowNum}")

            mapRow[headerName] = cell.stringCellValue
            mapRow["row"] = logicalRowNum.toString()

        }


        return mapRow
    }

当我添加日志行时,我注意到以下内容:

2020-11-22 15:49:56.684  INFO 20034 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 255
2020-11-22 15:49:56.687  INFO 20034 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 256
2020-11-22 15:49:56.689  INFO 20034 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 257
2020-11-22 15:50:02.458  INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor    : Processing row: 259
2020-11-22 15:50:02.534  INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor    : Processing row: 260
2020-11-22 15:50:02.608  INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor    : Processing row: 261

请注意,调度程序在 257 行之后切换,在切换期间我丢失了 258 行。游泳池:

tor-tcp-epoll-1

理解为 Spring R2DBC 内部池。

在我的下游,如果不是执行repository.save,而是返回静态Mono.just(entity),我会返回我的 258 行,请注意调度程序也没有切换。

2020-11-22 16:01:14.000  INFO 21959 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 257
2020-11-22 16:01:14.006  INFO 21959 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 258
2020-11-22 16:01:14.009  INFO 21959 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 259

这是 Excel 库的问题还是我的实现问题?为什么我在切换 TP 时会丢失记录?

附:除了调用 Spring R2DBC 存储库之外,我没有指定任何调度程序或并行块或任何与我的流程中的任何地方的线程混淆的东西。

我将尝试使用implementation("org.apache.commons:commons-csv:1.8") 重写并观察是否会发生同样的情况,但如果有人能在其他地方发现任何明显或经历过类似的事情,我将不胜感激。

【问题讨论】:

    标签: spring project-reactor reactor-netty spring-data-r2dbc r2dbc


    【解决方案1】:

    最后我切换到commons-csv 没有同样的问题:

    2020-11-22 18:34:03.719  INFO 15733 --- [    Test worker] c.b.CSVFileRecordsExtractor     : Processing row: 256
    2020-11-22 18:34:09.062  INFO 15733 --- [tor-tcp-epoll-1] c.b.CSVFileRecordsExtractor     : Processing row: 257
    2020-11-22 18:34:09.088  INFO 15733 --- [tor-tcp-epoll-1] c.b.CSVFileRecordsExtractor     : Processing row: 258
    

    对于原始方法,我尝试将所有 xlsx-streamerpoi 发布在一个调度程序上,甚至强制 Spring R2DBC 在同一个单线程调度程序上发布,但它仍然跳过了记录。

    我可以观察到,当数据库回调开始出现时,无论哪个线程池,这正是记录丢失的确切时刻,似乎迭代器上下文被破坏了。

    我的意思是 xslx 库从来没有声称是响应式的,所以没有期望。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-27
      • 2021-03-08
      • 1970-01-01
      • 2010-11-28
      • 2022-10-13
      • 2013-08-02
      相关资源
      最近更新 更多