【问题标题】:Is it possible to wait for a event without blocking a thread with Project Reactor?是否可以在不阻塞 Project Reactor 线程的情况下等待事件?
【发布时间】:2019-12-02 08:29:55
【问题描述】:

Project Reactor 是否可以在单声道中等待事件/条件,而无需为每个单声道使用阻塞线程?有了CompletableFuture,我可以完成这样的事情,但我不知道如何使用 Project Reactor 来做到这一点。

我的问题是我需要将请求与响应关联起来。响应时间变化很大,有些甚至永远不会得到回复和超时。在客户端时,每个请求的阻塞线程不是问题,但由于这是一个服务器应用程序,我不希望最终生成一个阻塞等待响应的请求的线程。

API 看起来像这样:

Mono<Response> doRequest(Mono<Request> request);

由于我不知道如何使用 Reactor 进行操作,我将解释如何使用 CompletableFuture 进行操作,以阐明我在寻找什么。 API 如下所示:

CompletableFuture<Response> doRequest(Request request);

当调用者调用时,会向服务器发出请求,其中包含由该方法生成的相关 ID。调用者返回一个CompletableFuture,该方法将对此CompletableFuture 的引用存储在映射中,并将相关ID 作为键。

还有一个线程(池)接收服务器的所有响应。当它收到响应时,它会从响应中获取相关 ID,并使用它在映射中查找原始请求(即 CompletableFuture)并在其上调用 complete(response);

在这个实现中,每个请求不需要阻塞线程。这基本上更像是一种 Vert.X / Netty 的思维方式?我想知道如何使用 Project Reactor 来实现这样的事情(如果可能的话)。

编辑 25-07-2019:

根据 cmets 中的要求,澄清我在下面得到的内容是我将如何使用 CompleteableFuture's 实现这一点的示例。

我还注意到我犯了一个可能相当令人困惑的错误:在CompletableFuture 示例中,我传递了Mono 作为参数。那应该只是一个“正常”的论点。很抱歉,我希望我没有把人们弄糊涂。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

    /**
     * This example shows how to implement correlating requests with responses without needing a (sleeping)
     * thread per request to wait for the response with the use of {@link CompletableFuture}'s.
     *
     * So the main feat of this example is that there is always a fixed (small) number of threads used even if one
     * would fire a thousands requests.
     */
    public static void main(String[] args) throws Exception {

        RequestResponseService requestResponseService = new RequestResponseService();

        Request request = new Request();
        request.correlationId = 1;
        request.question = "Do you speak Spanish?";

        CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
        responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));

        // The blocking call here is just so the application doesn't exit until the demo is completed.
        responseFuture.get();
    }

    static class RequestResponseService {

        /** The key in this map is the correlation ID. */
        private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses =  new ConcurrentHashMap<>();

        CompletableFuture<Response> doRequest(Request request) {
            Response response = new Response();
            response.correlationId = request.correlationId;
            CompletableFuture<Response> reponseFuture = new CompletableFuture<>();
            responses.put(response.correlationId, reponseFuture);

            doNonBlockingFireAndForgetRequest(request);

            return reponseFuture;
        }

        private void doNonBlockingFireAndForgetRequest(Request request) {
            // In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
            // Right now we will just make a call which will simulate a response message coming in after a while.
            simulateResponses();
        }

        private void processResponse(Response response) {
            // There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
            // in a response topic and calls this method to handle those messages.
            CompletableFuture<Response> responseFuture = responses.get(response.correlationId);
            responseFuture.complete(response);
        }

        void simulateResponses() {
            // This is just to make the example work. Not part of the example.
            new Thread(() -> {
                try {
                    // Simulate a delay.
                    Thread.sleep(10_000);

                    Response response = new Response();
                    response.correlationId = 1;
                    response.answer = "Si!";

                    processResponse(response);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    static class Request {
        long correlationId;
        String question;
    }

    static class Response {
        long correlationId;
        String answer;
    }

}

【问题讨论】:

  • 您能否与 CompletableFuture 分享一个虚拟示例代码,以显示它现在是如何工作的?
  • 虽然这是一个有趣的问题 (+1),但我想知道这是否属于过早优化的幌子——您是否测试过/基准测试过所有这些阻塞线程在实践中是否真的存在问题?我怀疑他们不会,最糟糕的情况是他们可能会坐在那里使用一些内存,但我从未尝试过......
  • @MichaelBerry 我同意过早的优化是值得厌倦的。我没有做过任何基准测试,但像 Netty 和 Vert.x 这样的框架之所以能够很好地扩展,是因为它们以事件驱动的方式工作,而不是拥有数千个休眠线程。如果我们要断定每个(例如)连接的线程不是问题,那么我可以让生活变得更加简单,只需为每个新连接使用一个线程,并简单地在该线程中按顺序进行一堆阻塞调用。这将比使用 Reactor 简单得多。
  • 您能否扩展 API 的 Mono&lt;Request&gt; 部分? Mono vs CompletableFuture 通常暗示 Mono 的懒惰:在 Mono 实际订阅/请求之前什么都不应该发生。它在这个 API 中是如何发挥作用的?
  • @SimonBaslé @Yossarian 我添加了一个工作(单类)示例,说明如何使用CompletableFuture 实现它。我怀疑您可能也被我在描述CompletableFuture API 时所犯的错误所迷惑;它不应该将Mono 作为参数。这个错误可能暗示我正在尝试将两者结合起来,但事实并非如此。对此感到抱歉。

标签: java multithreading project-reactor


【解决方案1】:

是的,这是可能的。你可以使用reactor.core.publisher.Mono#create方法来实现它

你的例子:

public static void main(String[] args) throws Exception {
    RequestResponseService requestResponseService = new RequestResponseService();

    Request request = new Request();
    request.correlationId = 1;
    request.question = "Do you speak Spanish?";


    Mono<Request> requestMono = Mono.just(request)
            .doOnNext(rq -> System.out.println(rq.question));
    requestResponseService.doRequest(requestMono)
            .doOnNext(response -> System.out.println(response.answer))
            // The blocking call here is just so the application doesn't exit until the demo is completed.
            .block();
}

static class RequestResponseService {
    private final ConcurrentHashMap<Long, Consumer<Response>> responses =
            new ConcurrentHashMap<>();

    Mono<Response> doRequest(Mono<Request> request) {
        return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
                .then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
    }

    private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
        return Mono.fromRunnable(this::simulateResponses);
    }

    private void processResponse(Response response) {
        responses.get(response.correlationId).accept(response);
    }

    void simulateResponses() {
        // This is just to make the example work. Not part of the example.
        new Thread(() -> {
            try {
                // Simulate a delay.
                Thread.sleep(10_000);

                Response response = new Response();
                response.correlationId = 1;
                response.answer = "Si!";

                processResponse(response);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

【讨论】:

  • Mono.fromRunnable(this::simulateResponses) 调用不会简单地导致 Mono 使用该线程以使其处于睡眠状态吗? VisualVM 似乎是这样建议的。
  • 我用 Mono 包装它只是为了方便。您可以让方法返回 void 并将其称为示例中的语句。关于线程,subscribeOn 运算符有很多选项。
猜你喜欢
  • 2015-09-18
  • 1970-01-01
  • 2011-06-17
  • 2020-12-01
  • 2011-08-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-04-05
相关资源
最近更新 更多