【问题标题】:Java reactor `suscribe` is sometime blocking, sometime notJava 反应器 `subscribe` 有时阻塞,有时不阻塞
【发布时间】:2023-01-10 23:42:58
【问题描述】:

我已经在 reactor 上玩了一段时间了,但我仍然需要得到一些东西。

这段代码

    Flux.range(1, 1000)
        .delayElements(Duration.ofNanos(1))
        .map(integer -> integer + 1)
        .subscribe(System.out::println);
    System.out.println("after");

退货:

after
2
3
4

预期作为订阅状态的文档:this will immediately return control to the calling thread.

那么,为什么是这段代码:

    Flux.range(1, 1000)
        .map(integer -> integer + 1)
        .subscribe(System.out::println);

回报

1
2
...
1000
1001
after

我永远无法弄清楚subscribe 何时会阻塞,这在编写批处理时非常烦人。

如果有人有答案,那将是惊人的

【问题讨论】:

    标签: java asynchronous reactive-programming project-reactor


    【解决方案1】:

    您的 sn-p 中没有阻止代码。

    在第一个示例中,您使用 .delayElements() 并将执行切换到另一个线程并释放您的主线程。所以你可以看到你的 System.out.println("after"); 立即在主线程中执行,而反应链正在 parallel-n 线程上执行。

    你的第一个例子:

    18:49:29.195 [main] INFO com.example.demo.FluxTest - AFTER
    18:49:29.199 [parallel-1] INFO com.example.demo.FluxTest - v: 2
    18:49:29.201 [parallel-2] INFO com.example.demo.FluxTest - v: 3
    18:49:29.202 [parallel-3] INFO com.example.demo.FluxTest - v: 4
    18:49:29.203 [parallel-4] INFO com.example.demo.FluxTest - v: 5
    18:49:29.205 [parallel-5] INFO com.example.demo.FluxTest - v: 6
    

    但是你的第二个例子没有切换执行线程,所以你的反应链在主线程上执行。完成后它会继续执行你的System.out.println("after");

    18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 995
    18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 996
    18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 997
    18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 998
    18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 999
    18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1000
    18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1001
    18:51:28.491 [main] INFO com.example.demo.FluxTest - AFTER
    

    编辑:如果你想在你的第二个 sn-p 中切换线程,基本上你有两个选择:

    1. 在你的反应链的任何地方添加subscribeOn(<Scheduler>)。然后整个订阅过程将在您提供的调度程序的线程上发生。

    2. 添加publishOn(<Scheduler>),例如,在Flux.range()之后,发射本身将发生在您的调用线程上,但下游将在您提供的调度程序的线程上执行

    【讨论】:

    • 谢谢你的回答!你知道我如何让第一个 sn-p 在主线程中执行吗?用 suscribeOn() 也许吧?
    • 你不能在第一个 sn-p 中回到主线程,这就是 delayElements 的工作原理。更新了第二个 sn-p 中关于线程切换的答案
    猜你喜欢
    • 2012-12-11
    • 1970-01-01
    • 1970-01-01
    • 2016-07-04
    • 2016-01-15
    • 2014-05-25
    • 2015-08-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多