【问题标题】:Sending queries in parallel with ExecutorService in Java 8在 Java 8 中与 ExecutorService 并行发送查询
【发布时间】:2017-02-04 20:28:38
【问题描述】:

我正在用 Java 编写一个 Web 应用程序(即使用 JavaLite)。在这个 Web 应用程序中,我有一个端点,它应该在调用时向服务器发送一堆其他请求。由于这些请求的数量可能会增加,因此我决定使用 Java 8 中引入的 Java Concurrency API 并行发送这些请求。我的并行发送多个请求的代码如下:

public List<String> searchAll(List<String> keywords) {
    ExecutorService executor = Executors.newWorkStealingPool();
    List<Callable<List<String>>> tasks = new ArrayList<>();
    for (String key : keywords) {
        tasks.add(() -> {
            LOGGER.info("Sending query for key: " + key);
            return sendSearchQuery(key);
        });
    }
    List<String> all = new ArrayList<>();
    try {
        executor.invokeAll(tasks)
                .stream()
                .map(future -> {
                    try {
                        return future.get();
                    }
                    catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                })
                .forEach((list) ->
                {
                    LOGGER.info("Received list: " + list);
                    all.addAll(list);
                });
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return all;
}

private List<String> sendSearchQuery(String query) throws UnirestException {
    long startTime = System.nanoTime();
    HttpResponse<JsonNode> response = Unirest.get(SEARCH_URL)
            .queryString("q", query).asString();
    Map<String, Object> result = JsonHelper.toMap(response.getBody());
//    Get get = Http.get(SEARCH_URL + "?q=" + query);
//    Map<String, Object> result = JsonHelper.toMap(get.text());
    LOGGER.info("Query received in " + (System.nanoTime() - startTime) / 1000000 + " ms for key: " + query);
    return (List<String>) result.get("result");
}

这段代码的输出如下:

[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Sending query for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Sending query for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Sending query for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 1331 ms for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: building
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Query received in 1332 ms for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Query received in 1332 ms for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Query received in 1332 ms for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 302 ms for key: building
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [16973, 4564, 12392, 1195, 1207, 682, 10518, 10532, 10545, 19328, 10524, 10537, 10551, 19334, 10522, 10535, 10548, 19332, 10521, 10534]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: []
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [4303, 2844, 4366]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [9490, 1638, 20006, 17715, 17758, 18788, 6071, 11230, 13384, 4940, 18039, 17871, 16629, 6148, 19172, 4263, 4569, 8396, 18643, 4904]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [17306, 17303, 17305, 17304, 16062, 16156, 16153, 16154, 16061, 9098, 2491, 4368, 22134, 1008, 16152, 16151, 16148, 16155, 16147, 16149]

如您所见,我使用了两个不同的 Http 库(JavaLite HttpUnirest)来查看问题是否出在我使用的库上,但似乎并非如此,因为它们都产生了同样的问题。

这里的问题是,第一个n(机器上的处理器数量)查询同时开始和结束。这是正常的,但是它们花费的时间也比应有的要长。假设一个请求在正常情况下需要t 时间。在这种情况下,第一个 n 查询每个都需要大约 n * t 时间,其余查询每个都需要 t 时间。我是否错误地使用了并发 API?

编辑:SEARCH_URL 上运行的服务器部署在 Azure 上,它可以处理多个请求。

我还尝试了以下方法:

  • 使用ExecutorService.newFixedThreadPool(),但我使用的Executor 似乎不是问题的原因。
  • 调用 invokeAny() 而不是 invokeAll(),但是前者会阻塞主线程,直到其中一项任务完成,并且只返回该任务的结果。

编辑 2: 所以我玩弄了服务器和我目前正在开发的应用程序。奇怪的是,服务器在不同的时间响应 n 个请求,但是应用程序在从第一个请求到达服务器的时间开始到第 n'th 时间结束的时间框架之后收到这些响应响应到达应用程序。我无法解释这种行为。

【问题讨论】:

  • 如果你只是设置一个固定的睡眠而不是 http 调用会发生什么?这样我们就可以确定是执行器服务的问题还是服务器的问题。另外,请将日期时间添加到您的记录器并打印输出。将您解释的问题可视化会更容易。
  • @MadPiranha 当我不发送请求时它按预期工作。问题确实似乎与服务器本身有关。我在 localhost 上运行服务器,第一个 n 响应确实在更长的时间内发送。
  • 根据“前 n 个请求”的含义,您的问题可能是由于“java.util.concurrent”包中相关类的加载和初始化。我记得在某处读过它,但遗憾的是我记不起具体在哪里。
  • 如果我们知道延迟来自服务器端,那么我们需要看看那部分!请更新问题。
  • @MadPiranha 我更多地使用了代码,似乎服务器在合理的时间内(不是同时)发送回响应,但是客户端收集结果的时间比每个时间都长的各个请求,都彼此相似。我会更新这个问题。

标签: java http concurrency parallel-processing java-8


【解决方案1】:

您是否查看过为 java 8 引入的 completableFuture 框架?我可以帮助你尝试异步发送所有内容。

List<CompletableFuture<List<String>>> futures = keywords.parallelStream()
            .map(key -> CompletableFuture.supplyAsync(() -> sendSearchQuery(key), executor))
            .collect(toList());

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

try {
    allOf.join();
} catch (Exception e){
    e.printStackTrace();
}

List<String> all = futures.stream().filter(CompletableFuture::isCompletedExceptionally)
            .flatMap(future -> future.join().stream())
            .collect(toList());

return all;

这将发送你所有的搜索异步,然后你调用 allOf.join(),你等待一切返回。

最后的流然后将每个结果映射回一个列表并返回

【讨论】:

    【解决方案2】:
    invokeAll(Collection<? extends Callable<T>> tasks)
    
    • 执行给定的任务,返回一个持有其状态的期货列表和全部完成后的结果。

    您的来源是否也有能力处理多个请求?

    【讨论】:

    • 你说得对,文档确实说“全部完成后”,但是其余查询正常发送,Future 对象及时返回,没有任何问题。此外,似乎没有任何替代 invokeAll() 的替代品,它执行彼此独立的任务。服务器部署在 Azure 上,可以处理多个请求。
    • invokeAny() 在那里。如果你使用Executors.newFixedThreadPool(),你可以一个一个提交多个任务,它们将并行执行。
    • 另外,除了等待所有四个请求完成之外,这个时间没有告诉我们什么。
    • invokeAny() 运行一堆任务并且只返回 Future 对象以完成第一个任务。甚至它们的返回类型也不同。在这个例子中它不是很明显,但是如果我运行了 20 个任务,那么剩下的任务确实并行运行没有问题会更明显。
    • 固定线程池或者一个个提交任务你怎么看,但是它们会并行执行。
    猜你喜欢
    • 1970-01-01
    • 2016-08-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-07-24
    相关资源
    最近更新 更多