【发布时间】:2018-11-08 21:41:49
【问题描述】:
我有一个 Web 服务,它可以对另一个服务进行 http 调用。 Web 服务分解一对多请求并尝试发出并行的一对一请求。为了测试性能,我保持后端的吞吐量不变。例如,我能够以 100 毫秒的 99% 延迟实现 1000 个请求/秒的吞吐量。因此,为了测试每个对 Web 服务的请求分解为对后端的 2 个请求的并行请求,我发送了 500 个请求/秒,但仅实现了 150 毫秒的第 99 个百分位延迟。我是否使用以下代码创建线程争用和/或阻塞 http 调用?
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class Foo {
private HTTPClient myHTTPClient = new HTTPClient("http://my_host.com"); //java ws rs http client
private interface Handler<REQ, RES> {
RES work(REQ req);
}
private <REQ, RES> CompletableFuture<RES> getAsync(REQ req, Handler<REQ, RES> handler) {
CompletableFuture<RES> future = CompletableFuture.supplyAsync(() -> {
return handler.work(req);
});
return future;
}
public RouteCostResponse getRouteCost(Point sources, List<Point> destinations) {
Map<String, Request> requests = new HashMap<>();
// create request bodies and keep track of request id's
for (Point destination : destinations) {
requests.put(destination.getId(), new RouteCostRequest(source, destination))
}
//create futures
ConcurrentMap<String, CompletableFuture<RouteCost>> futures = requests.entrySet().parallelStream()
.collect(Collectors.toConcurrentMap(
entry -> entry.getKey(),
entry -> getAsync(entry.getValue(), route -> myHTTPClient.getRoute(route)))
));
//retrieve results
ConcurrentMap<String, RouteCost> result = futures.entrySet().parallelStream()
.collect(Collectors.toConcurrentMap(
entry -> entry.getKey(),
entry -> entry.getValue().join()
));
RouteCostResponse response = new RouteCostResponse(result);
return response;
}
}
【问题讨论】:
-
你的future有线程池支持吗?如果没有,那么创建线程将会有很多开销。
-
@Steve11235 我假设 supplyAsync() 默认使用 ForkJoinPool.commonPool(),这可能不是最佳实践。我应该手动管理线程池吗?
-
@Chadderall 当然,在调用 supplyAsync 时明确添加 ForkJoinPool.commonPool()
-
@Chadderall parallelStream() 这里没有意义。
-
谢谢@AlexeiKaigorodov。我已使用您建议的修复更新了问题的答案
标签: java http asynchronous stream