【问题标题】:Using both publishOn and subscribeOn on a flux results in nothing happening在通量上同时使用 publishOn 和 subscribeOn 不会导致任何事情发生
【发布时间】:2017-06-15 19:30:16
【问题描述】:

每当我同时使用 subscribeOn 和 publishOn 时,都不会打印任何内容。 如果我只使用一个,它将打印。 如果我使用 subscribeOn(Schedulers.immediate()) 或弹性它可以工作。 知道这是为什么吗?

据我了解,publishOn 会影响它发布的线程以及订阅订阅者运行的线程。你能指出我正确的方向吗?

fun test() {
        val testPublisher = EmitterProcessor.create<String>().connect()
        testPublisher
                .publishOn(Schedulers.elastic())
                .map { it ->
                    println("map on ${Thread.currentThread().name}")
                    it
                }
                .subscribeOn(Schedulers.parallel())  
                .subscribe { println("subscribe on ${Thread.currentThread().name}") }
        testPublisher.onNext("a")
        testPublisher.onNext("b")
        testPublisher.onNext("c")
        Thread.sleep(5000)
        println("---")
    }

【问题讨论】:

标签: spring kotlin project-reactor


【解决方案1】:

subscribeOn 会影响 订阅 发生的位置。即触发源发射元素的初始事件。另一方面,SubscriberonNext 钩子受到链中最近的 publishOn 的影响(很像您的 map)。

但是EmitterProcessor 和大多数Processors 一样,更先进,可以做一些偷窃工作。我不确定为什么您的案例中没有打印任何内容(您的示例转换为 Java 可以在我的机器上运行),但我敢打赌它与那个处理器有关。

这段代码可以更好地演示subscribeOn vs publishOn

Flux.just("a", "b", "c") //this is where subscription triggers data production
        //this is influenced by subscribeOn
        .doOnNext(v -> System.out.println("before publishOn: " + Thread.currentThread().getName()))
        .publishOn(Schedulers.elastic())
        //the rest is influenced by publishOn
        .doOnNext(v -> System.out.println("after publishOn: " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.parallel())
        .subscribe(v -> System.out.println("received " + v + " on " + Thread.currentThread().getName()));
    Thread.sleep(5000);

打印出来:

before publishOn: parallel-1
before publishOn: parallel-1
before publishOn: parallel-1
after publishOn: elastic-2
received a on elastic-2
after publishOn: elastic-2
received b on elastic-2
after publishOn: elastic-2
received c on elastic-2

【讨论】:

  • 感谢您为我澄清这方面的问题!
  • 但是我很难理解为什么我们需要两个?发布和订阅。我们试图实现的只是在单独的线程池中运行链的阻塞/耗时任务。
  • 在这种情况下你不一定需要这两个,subscribeOn 应该足以改变初始线程
  • publishOn 通常用于快速发布者、慢消费者场景。 subscribeOn 通常用于慢发布者、快消费者场景。正如@SimonBaslé 在这里解释的那样,对于subscribeOn,事件在同一线程上发布和接收,而publishOn 则不同。
猜你喜欢
  • 2019-07-24
  • 2020-08-06
  • 1970-01-01
  • 2018-06-12
  • 1970-01-01
  • 1970-01-01
  • 2011-08-12
  • 2018-03-18
  • 2018-11-13
相关资源
最近更新 更多