【问题标题】:How to use Project Reactor's Scheduler with Executor based libraries?如何将 Project Reactor 的调度程序与基于 Executor 的库一起使用?
【发布时间】:2020-07-02 03:11:18
【问题描述】:

Project Reactor 通过定义Scheduler 提供了一种很好的方法来定义代码在哪个线程池上运行。它还为使用 CompletableFutureMono.fromFuture(..) 的库提供了一座桥梁。

AWS 的 async client for DyanmoDB 执行它从对 java.util.concurrent.Executor 的 API 调用返回的 CompletableFuture。默认情况下,它创建一个由它也创建的线程池支持的Executor。结果是,即使是定义了Scheduler(如Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic()))的流也会在库创建的池中的线程上执行,而不是在Schedulers.boundedElastic() 中的线程上执行。所以我们看到像 sdk-async-response-0-2 这样的线程名称,而不是像 boundedElastic-1 这样的名称。

幸运的是,图书馆允许我们提供自己的Executorshown here,所以我的问题是:

你如何构建一个Executor,它在运行时使用来自Scheduler的线程在流的那部分定义

用例

我们有一个存储库类,它有一个 findById 方法,我们需要调用者能够控制在哪个 Scheduler 上运行,因为它用于这些截然不同的上下文中:

  1. Schedulers.boundedElastic() 调度程序上运行的 API 响应。
  2. 按照Reactor Kafka docs 中所示的已定义调度程序,在每个分区的线程上处理按顺序执行的 Kafka 消息。

尝试

我们尝试使用Schedulers.immediate()Runnable::run 定义Executor,如下所示,但两者都导致在 Netty 事件循环线程(示例名称:aws-java-sdk-NettyEventLoop-0-2)上执行,而不是来自定义Scheduler

DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        runnable -> Schedulers.immediate().schedule(runnable)
    ))
    .build();
DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        Runnable::run
    ))
    .build();

【问题讨论】:

  • 为什么要尝试使用即时调度程序?这不会切换线程。尝试使用其他调度程序,例如并行或弹性。
  • 我尝试了立即调度程序正是因为我不希望它从流中定义的内容切换线程。关于使用并行或弹性调度程序的建议,利用 Executor 中的其中一个将导致它始终被使用并忽略在流上设置的调度程序。在我们的用例中,我们需要它根据流中定义的内容在不同的线程上运行。
  • 所以如果我理解正确,你想切换事件循环线程。使用这种机制是不可能的。我怀疑这是可能的。如果您在项目的 GitHub 存储库中打开问题,您可能有更好的机会获得答案。
  • 不,我不想切换事件循环线程,也不想在它们上运行应用程序逻辑。我希望 DynamoDB 逻辑在运行时定义为 active 调度程序的一部分的线程上运行。因此,如果活动线程来自弹性线程池,或者为流的该部分定义的单个线程,我希望在其上执行 DynamoDB 逻辑。

标签: java apache-kafka amazon-dynamodb project-reactor aws-sdk-java-2.0


【解决方案1】:

第 1 部分。观察与订阅

查看问题,我发现在特定线程上执行后需要观察元素。 准确地说,observe 在这种情况下意味着*能够在某个特定线程上处理流中的值。在 RxJava 中,我们有一个适当的操作符,就像这样调用,但在 Project Reactor 中,我们将相同的操作称为 publishOn

因此, * 如果你想处理 Schedulers.boundedElastic() 上的数据 * 那么你应该使用以下结构

Mono.fromFuture(..)
    .publishOn(Schedulers.boundedElastic())

但是等等,.subscribeOn 也能正常工作???

阅读前面的构造,你可能会开始担心,因为你百分百确定

Mono.fromRunnable(..)
    .subscribeOn(Schedulers.boundedElastic())

在线程boundedElastic-1上发送onNext,那么同样的fromFuture有什么问题。

这里有一个技巧:

切勿将subscribeOnFutures / CompletableFuture 或任何可以在下面使用自己的异步机制的东西一起使用

如果我们查看subscribeOn 背后发生的事情,您会发现如下内容:

//  Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Scheduler scheduler;
    Publisher<T> parent;
    scheduler.schedule(() -> parent.subscribe(actual));
}

这基本上意味着父级的subscribe 方法将在单独的线程上调用。

这种技术适用于fromRunnablefromSupplierfromCallable,因为它们的逻辑发生在subscribe 方法中:

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Operators.MonoSubscriber<T, T>
    sds = new Operators.MonoSubscriber<>(actual);

    actual.onSubscribe(sds);
    // skiped some parts 
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
}

这意味着它几乎等于

scheduler.schedule(() -> {
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
})

相比之下,fromFuture 的工作要复杂得多。 一个小测验。

我们可以在哪个线程上观察到一个值? (假设在 Main 线程上执行,任务在 ForkJoinPool 上执行)

var future = CompletableFuture
.supplyAsync(() -> {
  return value;
})
... // some code here, does not metter just code

future.thenAccept(value -> {
  System.out.println(Thread.currentThread())
});

还有正确答案....??????

可能是主线程
或者它可能是来自 ForkJoinPool 的线程
...
因为它是活泼的......而此时,我们消费价值,价值可能已经交付,所以我们只是在阅读器线程(线程Main)上读取volatile字段,否则,线程Main只是要设置一个@ 987654344@ 所以稍后将在ForkJoinPool 线程上调用接受器。

对,这就是为什么当你使用fromFuturesubscribeOn 时,不能保证subscribeOn 线程会观察到给定CompletableFuture 的值。

这就是为什么publishOn 是确保值处理发生在所需线程上的唯一方法。

好的,我应该一直使用publishOn吗???

是和不是。视情况而定。

如果您使用 Mono - 在 99% 的情况下,如果您想确保数据处理发生在特定线程上,则可以使用 publishOn - 始终使用 publishOn

不用担心潜在的开销,即使您不小心使用了 Project Reactor,它也会照顾您。 Project Reactor 有几个优化,可以在运行时用subscribeOn 替换你的publishOn(如果它在不破坏行为的情况下是安全的),所以你会得到最好的。

Part 2. 掉进Scheduelrs的兔子洞

永远不要使用Schedulers.immediate()

它几乎是无操作调度器,基本上可以

Schedulers.immediate().scheduler(runnable) {
   runnable.run()
}

是的,它对 reactor 用户没有任何用处,我们仅将其用于内部需求。

好的,那么我如何使用调度器在命令式世界中作为执行器使用它

有两种选择:

快速路径:分步指南

1.a) 创建有界Executor。 (例如Executors.fixed...
1.b) 如果您想获得周期性任务和延迟任务的力量,请创建您的有界ScheduledExecutorService
2) 使用Schedulers.fromExecutorXXX API 从您的执行程序创建一个Scheduler
3) 在命令式世界中使用您的有界Executor,使用您的Scheduler,它是对反应式世界的有界的包装器

漫长的道路

即将推出...

第 3 部分。如何序列化执行。

即将推出

【讨论】:

  • 嘿@Oleh,很好的答案!老实说,对于初学者来说超级不明显的东西,这应该在文档的某个地方更清楚
猜你喜欢
  • 2021-02-04
  • 1970-01-01
  • 2021-07-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-12-10
相关资源
最近更新 更多