【Question title】: doAfterTerminate happening before Flux completes
【Posted】: 2021-11-10 22:31:07
【Question】:

I am trying to understand how doAfterTerminate works together with delaySequence. I have the following test:

@Test
fun testDoAfterTerminate() {
    logger.info("Starting test")
    val sch = Schedulers.single()
    val testFlux = Flux.fromArray(intArrayOf(1, 2, 3).toTypedArray())
        .doAfterTerminate { logger.info("Finished processing batch!") }
        .delaySequence(Duration.ofSeconds(1), sch)
        .doOnNext { logger.info("Done $it")}
        .doAfterTerminate { logger.info("Finished v2")}
    StepVerifier.create(testFlux).expectNextCount(3).verifyComplete()
}

The output of this test is:

22:27:54.547 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Finished processing batch!
22:27:55.561 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Done 1
22:27:55.561 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Done 2
22:27:55.561 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Done 3
22:27:55.562 [single-1] INFO leon.patmore.kafkareactive.TestReactor - Finished v2

Does anyone understand why the first doAfterTerminate is called before the Flux completes?

If I remove the .delaySequence(Duration.ofSeconds(1), sch) line, termination happens as expected:

22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Done 1
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Done 2
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Done 3
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Finished v2
22:29:37.588 [Test worker] INFO leon.patmore.kafkareactive.TestReactor - Finished processing batch!

Thanks!

【Comments】:

    Tags: java kotlin project-reactor


    【Solution 1】:

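    The first doAfterTerminate fires on the main thread with no delay, because it sits upstream of delaySequence. The source emits all three items and onComplete synchronously, so from that operator's point of view the sequence has already terminated. Only afterwards are the signals delayed and continued on the single() scheduler, which is where doOnNext and the second doAfterTerminate run.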
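To make the ordering concrete, here is a minimal sketch that records events into a list instead of logging them. The function name `recordEvents`, the event strings, and the shortened 200 ms delay are assumptions made for illustration; they are not part of the original test. A latch is used rather than `blockLast()` because a downstream doAfterTerminate callback runs after the completion signal has already been passed on to the subscriber.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the events in the order they were observed.
fun recordEvents(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(1)

    Flux.just(1, 2, 3)
        // Upstream of the delay: sees onComplete as soon as the source finishes.
        .doAfterTerminate { events.add("upstream terminated") }
        // Shifts every signal (including onComplete) onto the single() scheduler.
        .delaySequence(Duration.ofMillis(200), Schedulers.single())
        .doOnNext { events.add("next $it") }
        // Downstream of the delay: sees onComplete only after the delayed items.
        .doAfterTerminate {
            events.add("downstream terminated")
            done.countDown()
        }
        .subscribe()

    // Wait for the downstream callback before reading the list.
    check(done.await(5, TimeUnit.SECONDS)) { "timed out waiting for completion" }
    return events.toList()
}

fun main() {
    recordEvents().forEach(::println)
}
```

Under these assumptions the list should start with "upstream terminated", followed by the three "next" entries and then "downstream terminated". If the intent is to run a callback once the delayed items have actually been delivered, placing doAfterTerminate after delaySequence (as the second call does) is the position that observes the delayed completion.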

    Adding a few log() operators makes this clearer:

    INFO main r.F.P.1 - | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
    INFO main r.Flux.Peek.2 - onSubscribe(FluxPeek.PeekSubscriber)
    INFO main r.Flux.Peek.2 - request(unbounded)
    INFO main r.F.P.1 - | request(unbounded)
    INFO main r.F.P.1 - | onNext(1)
    INFO main r.F.P.1 - | onNext(2)
    INFO main r.F.P.1 - | onNext(3)
    INFO main r.F.P.1 - | onComplete()
    Finished processing batch!
    Done 1
    Done 2
    INFO single-1 r.Flux.Peek.2 - onNext(1)
    Done 3
    INFO single-1 r.Flux.Peek.2 - onNext(2)
    INFO single-1 r.Flux.Peek.2 - onNext(3)
    INFO single-1 r.Flux.Peek.2 - onComplete()
    Finished v2
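The answer does not show where the log() calls were placed. The sketch below is one placement that appears consistent with the trace above, assuming the category r.F.P.1 refers to the fuseable peek operator created by the first doAfterTerminate and r.Flux.Peek.2 to the peek operator created by doOnNext. The function name `tracedFlux` and the use of `println` are assumptions for illustration.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import java.time.Duration

// Hypothetical reconstruction of the traced pipeline.
fun tracedFlux(): Flux<Int> =
    Flux.just(1, 2, 3)
        .doAfterTerminate { println("Finished processing batch!") }
        .log() // observes signals before the delay, on the subscribing thread
        .delaySequence(Duration.ofSeconds(1), Schedulers.single())
        .doOnNext { println("Done $it") }
        .log() // observes signals after the delay, on the single() scheduler thread
        .doAfterTerminate { println("Finished v2") }

fun main() {
    // blockLast() subscribes and waits for the delayed onComplete.
    tracedFlux().blockLast()
}
```

The first log() reports onNext and onComplete before delaySequence has shifted anything, while the second reports the same signals after the one-second delay and on a different thread.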
    
