【发布时间】:2020-07-02 03:11:18
【问题描述】:
Project Reactor 通过定义Scheduler 提供了一种很好的方法来定义代码在哪个线程池上运行。它还为使用 CompletableFuture 和 Mono.fromFuture(..) 的库提供了一座桥梁。
AWS 的 async client for DyanmoDB 执行它从对 java.util.concurrent.Executor 的 API 调用返回的 CompletableFuture。默认情况下,它创建一个由它也创建的线程池支持的Executor。结果是,即使是定义了Scheduler(如Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic()))的流也会在库创建的池中的线程上执行,而不是在Schedulers.boundedElastic() 中的线程上执行。所以我们看到像 sdk-async-response-0-2 这样的线程名称,而不是像 boundedElastic-1 这样的名称。
幸运的是,图书馆允许我们提供自己的Executor 为shown here,所以我的问题是:
你如何构建一个
Executor,它在运行时使用来自Scheduler的线程在流的那部分定义?
用例
我们有一个存储库类,它有一个 findById 方法,我们需要调用者能够控制在哪个 Scheduler 上运行,因为它用于这些截然不同的上下文中:
- 在
Schedulers.boundedElastic()调度程序上运行的 API 响应。 - 按照Reactor Kafka docs 中所示的已定义调度程序,在每个分区的线程上处理按顺序执行的 Kafka 消息。
尝试
我们尝试使用Schedulers.immediate() 和Runnable::run 定义Executor,如下所示,但两者都导致在 Netty 事件循环线程(示例名称:aws-java-sdk-NettyEventLoop-0-2)上执行,而不是来自定义Scheduler。
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
runnable -> Schedulers.immediate().schedule(runnable)
))
.build();
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
Runnable::run
))
.build();
【问题讨论】:
-
为什么要尝试使用即时调度程序?这不会切换线程。尝试使用其他调度程序,例如并行或弹性。
-
我尝试了立即调度程序正是因为我不希望它从流中定义的内容切换线程。关于使用并行或弹性调度程序的建议,利用 Executor 中的其中一个将导致它始终被使用并忽略在流上设置的调度程序。在我们的用例中,我们需要它根据流中定义的内容在不同的线程上运行。
-
所以如果我理解正确,你想切换事件循环线程。使用这种机制是不可能的。我怀疑这是可能的。如果您在项目的 GitHub 存储库中打开问题,您可能有更好的机会获得答案。
-
不,我不想切换事件循环线程,也不想在它们上运行应用程序逻辑。我希望 DynamoDB 逻辑在运行时定义为 active 调度程序的一部分的线程上运行。因此,如果活动线程来自弹性线程池,或者为流的该部分定义的单个线程,我希望在其上执行 DynamoDB 逻辑。
标签: java apache-kafka amazon-dynamodb project-reactor aws-sdk-java-2.0