【问题标题】:Java Parallel HTTP requests with CompleteableFuture not very performant具有 CompleteableFuture 的 Java 并行 HTTP 请求性能不是很好
【发布时间】:2018-11-08 21:41:49
【问题描述】:

我有一个 Web 服务,它可以对另一个服务进行 http 调用。 Web 服务分解一对多请求并尝试发出并行的一对一请求。为了测试性能,我保持后端的吞吐量不变。例如,我能够以 100 毫秒的 99% 延迟实现 1000 个请求/秒的吞吐量。因此,为了测试每个对 Web 服务的请求分解为对后端的 2 个请求的并行请求,我发送了 500 个请求/秒,但仅实现了 150 毫秒的第 99 个百分位延迟。我是否使用以下代码创建线程争用和/或阻塞 http 调用?

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class Foo {
  private HTTPClient myHTTPClient = new HTTPClient("http://my_host.com");  //java ws rs http client

  private interface Handler<REQ, RES> {
    RES work(REQ req);
  }

  private <REQ, RES> CompletableFuture<RES> getAsync(REQ req, Handler<REQ, RES> handler) {
    CompletableFuture<RES> future = CompletableFuture.supplyAsync(() -> {
      return handler.work(req);
    });

    return future;
  }

  public RouteCostResponse getRouteCost(Point sources, List<Point> destinations) {
    Map<String, Request> requests = new HashMap<>();

    // create request bodies and keep track of request id's
    for (Point destination : destinations) {
      requests.put(destination.getId(), new RouteCostRequest(source, destination))
    }

    //create futures
    ConcurrentMap<String, CompletableFuture<RouteCost>> futures = requests.entrySet().parallelStream()
        .collect(Collectors.toConcurrentMap(
            entry -> entry.getKey(),
            entry -> getAsync(entry.getValue(), route -> myHTTPClient.getRoute(route)))
        ));

    //retrieve results
    ConcurrentMap<String, RouteCost> result = futures.entrySet().parallelStream()
        .collect(Collectors.toConcurrentMap(
            entry -> entry.getKey(),
            entry -> entry.getValue().join()
        ));

    RouteCostResponse response = new RouteCostResponse(result);

    return response;
  }
}

【问题讨论】:

  • 你的future有线程池支持吗?如果没有,那么创建线程将会有很多开销。
  • @Steve11235 我假设 supplyAsync() 默认使用 ForkJoinPool.commonPool(),这可能不是最佳实践。我应该手动管理线程池吗?
  • @Chadderall 当然,在调用 supplyAsync 时明确添加 ForkJoinPool.commonPool()
  • @Chadderall parallelStream() 这里没有意义。
  • 谢谢@AlexeiKaigorodov。我已使用您建议的修复更新了问题的答案

标签: java http asynchronous stream


【解决方案1】:

以下代码没有线程争用,尽管我似乎遇到了 I/O 问题。关键是使用显式线程池。 ForkJoinPoolExecutors.fixedThreadPool

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class Foo {
  private HTTPClient myHTTPClient = new HTTPClient("http://my_host.com");  //java ws rs http client
  private static final ForkJoinPool pool = new ForkJoinPool(1000);

  private interface Handler<REQ, RES> {
    RES work(REQ req);
  }

  private <REQ, RES> CompletableFuture<RES> getAsync(REQ req, Handler<REQ, RES> handler) {
    CompletableFuture<RES> future = CompletableFuture.supplyAsync(() -> {
      return handler.work(req);
    });

    return future;
  }

  public RouteCostResponse getRouteCost(Point sources, List<Point> destinations) {
    Map<String, Request> requests = new HashMap<>();

// create request bodies and keep track of request id's
    for (Point destination : destinations) {
      requests.put(destination.getId(), new RouteCostRequest(source, destination))
    }

    //create futures
    ConcurrentMap<String, CompletableFuture<RouteCost>> futures = requests.entrySet().stream()
    .collect(Collectors.toConcurrentMap(
        entry -> entry.getKey(),
        entry -> getAsync(entry.getValue(), route -> myHTTPClient.getRoute(route)))
    ));

    //retrieve results
    ConcurrentMap<String, RouteCost> result = futures.entrySet().stream()
    .collect(Collectors.toConcurrentMap(
        entry -> entry.getKey(),
        entry -> entry.getValue().join()
    ));

    RouteCostResponse response = new RouteCostResponse(result);

    return response;
  }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-09
    • 2010-12-06
    • 2020-10-17
    • 2020-03-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多