【发布时间】:2019-12-02 08:29:55
【问题描述】:
Project Reactor 是否可以在单声道中等待事件/条件,而无需为每个单声道使用阻塞线程?有了CompletableFuture,我可以完成这样的事情,但我不知道如何使用 Project Reactor 来做到这一点。
我的问题是我需要将请求与响应关联起来。响应时间变化很大,有些甚至永远不会得到回复和超时。在客户端时,每个请求的阻塞线程不是问题,但由于这是一个服务器应用程序,我不希望最终生成一个阻塞等待响应的请求的线程。
API 看起来像这样:
Mono<Response> doRequest(Mono<Request> request);
由于我不知道如何使用 Reactor 进行操作,我将解释如何使用 CompletableFuture 进行操作,以阐明我在寻找什么。 API 如下所示:
CompletableFuture<Response> doRequest(Request request);
当调用者调用时,会向服务器发出请求,其中包含由该方法生成的相关 ID。调用者返回一个CompletableFuture,该方法将对此CompletableFuture 的引用存储在映射中,并将相关ID 作为键。
还有一个线程(池)接收服务器的所有响应。当它收到响应时,它会从响应中获取相关 ID,并使用它在映射中查找原始请求(即 CompletableFuture)并在其上调用 complete(response);。
在这个实现中,每个请求不需要阻塞线程。这基本上更像是一种 Vert.X / Netty 的思维方式?我想知道如何使用 Project Reactor 来实现这样的事情(如果可能的话)。
编辑 25-07-2019:
根据 cmets 中的要求,澄清我在下面得到的内容是我将如何使用 CompleteableFuture's 实现这一点的示例。
我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture 示例中,我传递了Mono 作为参数。那应该只是一个“正常”的论点。很抱歉,我希望我没有把人们弄糊涂。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
class NonBlockingCorrelatingExample {
/**
* This example shows how to implement correlating requests with responses without needing a (sleeping)
* thread per request to wait for the response with the use of {@link CompletableFuture}'s.
*
* So the main feat of this example is that there is always a fixed (small) number of threads used even if one
* would fire a thousands requests.
*/
public static void main(String[] args) throws Exception {
RequestResponseService requestResponseService = new RequestResponseService();
Request request = new Request();
request.correlationId = 1;
request.question = "Do you speak Spanish?";
CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));
// The blocking call here is just so the application doesn't exit until the demo is completed.
responseFuture.get();
}
static class RequestResponseService {
/** The key in this map is the correlation ID. */
private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();
CompletableFuture<Response> doRequest(Request request) {
Response response = new Response();
response.correlationId = request.correlationId;
CompletableFuture<Response> reponseFuture = new CompletableFuture<>();
responses.put(response.correlationId, reponseFuture);
doNonBlockingFireAndForgetRequest(request);
return reponseFuture;
}
private void doNonBlockingFireAndForgetRequest(Request request) {
// In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
// Right now we will just make a call which will simulate a response message coming in after a while.
simulateResponses();
}
private void processResponse(Response response) {
// There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
// in a response topic and calls this method to handle those messages.
CompletableFuture<Response> responseFuture = responses.get(response.correlationId);
responseFuture.complete(response);
}
void simulateResponses() {
// This is just to make the example work. Not part of the example.
new Thread(() -> {
try {
// Simulate a delay.
Thread.sleep(10_000);
Response response = new Response();
response.correlationId = 1;
response.answer = "Si!";
processResponse(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
static class Request {
long correlationId;
String question;
}
static class Response {
long correlationId;
String answer;
}
}
【问题讨论】:
-
您能否与 CompletableFuture 分享一个虚拟示例代码,以显示它现在是如何工作的?
-
虽然这是一个有趣的问题 (+1),但我想知道这是否属于过早优化的幌子——您是否测试过/基准测试过所有这些阻塞线程在实践中是否真的存在问题?我怀疑他们不会,最糟糕的情况是他们可能会坐在那里使用一些内存,但我从未尝试过......
-
@MichaelBerry 我同意过早的优化是值得厌倦的。我没有做过任何基准测试,但像 Netty 和 Vert.x 这样的框架之所以能够很好地扩展,是因为它们以事件驱动的方式工作,而不是拥有数千个休眠线程。如果我们要断定每个(例如)连接的线程不是问题,那么我可以让生活变得更加简单,只需为每个新连接使用一个线程,并简单地在该线程中按顺序进行一堆阻塞调用。这将比使用 Reactor 简单得多。
-
您能否扩展 API 的
Mono<Request>部分?MonovsCompletableFuture通常暗示Mono的懒惰:在Mono实际订阅/请求之前什么都不应该发生。它在这个 API 中是如何发挥作用的? -
@SimonBaslé @Yossarian 我添加了一个工作(单类)示例,说明如何使用
CompletableFuture实现它。我怀疑您可能也被我在描述CompletableFutureAPI 时所犯的错误所迷惑;它不应该将Mono作为参数。这个错误可能暗示我正在尝试将两者结合起来,但事实并非如此。对此感到抱歉。
标签: java multithreading project-reactor