【发布时间】:2021-11-10 22:31:07
【问题描述】:
我试图了解doAfterTerminate 如何与delaySequence 一起使用。我有以下测试:
@Test
fun testDoAfterTerminate() {
logger.info("Starting test")
val sch = Schedulers.single()
val testFlux = Flux.fromArray(intArrayOf(1, 2, 3).toTypedArray())
.doAfterTerminate { logger.info("Finished processing batch!") }
.delaySequence(Duration.ofSeconds(1), sch)
.doOnNext { logger.info("Done $it")}
.doAfterTerminate { logger.info("Finished v2")}
StepVerifier.create(testFlux).expectNextCount(3).verifyComplete()
}
这个测试的输出是:
22:27:54.547 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Finished processing batch!
22:27:55.561 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Done 1
22:27:55.561 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Done 2
22:27:55.561 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Done 3
22:27:55.562 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Finished v2
有谁明白为什么第一个 doAfterTerminate 在通量完成之前被调用?
如果我删除 .delaySequence(Duration.ofSeconds(1), sch) 行,终止会按预期发生:
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Done 1
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Done 2
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Done 3
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Finished v2
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Finished processing batch!
谢谢!
【问题讨论】:
标签: java kotlin project-reactor