【问题标题】:SubscribeOn or PublishOn not working with ReactiveCassandraCrudRepositorySubscribeOn 或 PublishOn 不适用于 ReactiveCassandraCrudRepository
【发布时间】:2020-08-06 15:42:20
【问题描述】:

我们将 Reactive Spring Data Repository 与 Spring WebFlux 一起使用,我对 SubscribeOn 的理解是它决定了在 SubscribeOn 之前的操作员将在哪个线程池中执行,而 PublishOn 决定订阅将在其上执行的实际线程池。然而,在下面的代码中,即使使用 PublishOn 和 SubscribeOn,代码也不会在主线程上执行,而是回退到 Cluster-nio-worker-1。

    System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
    personRepository.findAll().log()
            .map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
            .subscribeOn(Schedulers.immediate())
            .publishOn(Schedulers.immediate())
            .subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
                    excp -> excp.printStackTrace(),
                    () -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1

还有什么 Thread[cluster-nio-worker-1,5,main] 是什么意思?为什么这些方法调用不使用主线程执行。

【问题讨论】:

    标签: cassandra reactive-programming spring-webflux project-reactor spring-data-cassandra


    【解决方案1】:

    subscribeOn 方法使发布者使用给定的线程池来发布值。管道中可能有 N 个 subscribeOn 方法。最接近的将生效。 personRepository.findAll().log() 是一个包装器并返回一个通量。因此,如果它在内部使用任何调度程序,那么您不能使用 subscribeOn 更改它。例如,interval 方法使用并行,我无法将其更改为 boundedElastic,如下所示。

        Flux.interval(Duration.ofSeconds(1))
                .subscribeOn(Schedulers.boundedElastic())
    

    Schedulers.immediate 只是将管道执行保持在同一个线程中。它不是主要的,在您的情况下它将是 cluster-nio-worker 线程池。

    我们可以从主线程池切换到任何调度程序线程池。但是我们不能将执行切换回主线程。这不是项目反应堆的限制。应该是Java本身的限制吧。

    【讨论】:

    • 非常感谢,由于数据库池耗尽,我的应用程序经常出现 503 服务不可用。现在,大多数事情看起来都还不错:)
    猜你喜欢
    • 2018-06-12
    • 2019-07-24
    • 1970-01-01
    • 1970-01-01
    • 2017-06-15
    • 1970-01-01
    • 1970-01-01
    • 2012-06-29
    • 1970-01-01
    相关资源
    最近更新 更多