【发布时间】:2017-02-04 20:28:38
【问题描述】:
我正在用 Java 编写一个 Web 应用程序(即使用 JavaLite)。在这个 Web 应用程序中,我有一个端点,它应该在调用时向服务器发送一堆其他请求。由于这些请求的数量可能会增加,因此我决定使用 Java 8 中引入的 Java Concurrency API 并行发送这些请求。我的并行发送多个请求的代码如下:
public List<String> searchAll(List<String> keywords) {
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<List<String>>> tasks = new ArrayList<>();
for (String key : keywords) {
tasks.add(() -> {
LOGGER.info("Sending query for key: " + key);
return sendSearchQuery(key);
});
}
List<String> all = new ArrayList<>();
try {
executor.invokeAll(tasks)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach((list) ->
{
LOGGER.info("Received list: " + list);
all.addAll(list);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
return all;
}
private List<String> sendSearchQuery(String query) throws UnirestException {
long startTime = System.nanoTime();
HttpResponse<JsonNode> response = Unirest.get(SEARCH_URL)
.queryString("q", query).asString();
Map<String, Object> result = JsonHelper.toMap(response.getBody());
// Get get = Http.get(SEARCH_URL + "?q=" + query);
// Map<String, Object> result = JsonHelper.toMap(get.text());
LOGGER.info("Query received in " + (System.nanoTime() - startTime) / 1000000 + " ms for key: " + query);
return (List<String>) result.get("result");
}
这段代码的输出如下:
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Sending query for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Sending query for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Sending query for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 1331 ms for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: building
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Query received in 1332 ms for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Query received in 1332 ms for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Query received in 1332 ms for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 302 ms for key: building
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [16973, 4564, 12392, 1195, 1207, 682, 10518, 10532, 10545, 19328, 10524, 10537, 10551, 19334, 10522, 10535, 10548, 19332, 10521, 10534]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: []
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [4303, 2844, 4366]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [9490, 1638, 20006, 17715, 17758, 18788, 6071, 11230, 13384, 4940, 18039, 17871, 16629, 6148, 19172, 4263, 4569, 8396, 18643, 4904]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [17306, 17303, 17305, 17304, 16062, 16156, 16153, 16154, 16061, 9098, 2491, 4368, 22134, 1008, 16152, 16151, 16148, 16155, 16147, 16149]
如您所见,我使用了两个不同的 Http 库(JavaLite Http 和 Unirest)来查看问题是否出在我使用的库上,但似乎并非如此,因为它们都产生了同样的问题。
这里的问题是,第一个n(机器上的处理器数量)查询同时开始和结束。这是正常的,但是它们花费的时间也比应有的要长。假设一个请求在正常情况下需要t 时间。在这种情况下,第一个 n 查询每个都需要大约 n * t 时间,其余查询每个都需要 t 时间。我是否错误地使用了并发 API?
编辑:SEARCH_URL 上运行的服务器部署在 Azure 上,它可以处理多个请求。
我还尝试了以下方法:
- 使用
ExecutorService.newFixedThreadPool(),但我使用的Executor似乎不是问题的原因。 - 调用
invokeAny()而不是invokeAll(),但是前者会阻塞主线程,直到其中一项任务完成,并且只返回该任务的结果。
编辑 2: 所以我玩弄了服务器和我目前正在开发的应用程序。奇怪的是,服务器在不同的时间响应 n 个请求,但是应用程序在从第一个请求到达服务器的时间开始到第 n'th 时间结束的时间框架之后收到这些响应响应到达应用程序。我无法解释这种行为。
【问题讨论】:
-
如果你只是设置一个固定的睡眠而不是 http 调用会发生什么?这样我们就可以确定是执行器服务的问题还是服务器的问题。另外,请将日期时间添加到您的记录器并打印输出。将您解释的问题可视化会更容易。
-
@MadPiranha 当我不发送请求时它按预期工作。问题确实似乎与服务器本身有关。我在 localhost 上运行服务器,第一个
n响应确实在更长的时间内发送。 -
根据“前 n 个请求”的含义,您的问题可能是由于“java.util.concurrent”包中相关类的加载和初始化。我记得在某处读过它,但遗憾的是我记不起具体在哪里。
-
如果我们知道延迟来自服务器端,那么我们需要看看那部分!请更新问题。
-
@MadPiranha 我更多地使用了代码,似乎服务器在合理的时间内(不是同时)发送回响应,但是客户端收集结果的时间比每个时间都长的各个请求,都彼此相似。我会更新这个问题。
标签: java http concurrency parallel-processing java-8