【问题标题】:parallelize do while loop depending on how many pages we need to call for?根据我们需要调用多少页并行执行while循环?
【发布时间】:2018-07-05 14:32:26
【问题描述】:

我必须通过传递标头和正文来发出 HTTP POST 请求。在正文中,我需要在发布数据之前提供pageNumber,所以我最初从“1”开始。之后我会发布数据,我会收到如下所示的 JSON 响应。

{
    "response": {
        "pageNumber": 1,
        "entries": 200,
        "numberOfPages": 3
    },
    "list": [
        {
            // some stuff here
        }
    ],
    "total": 1000
}

现在根据pageNumber 1 的回复,我将决定我还需要拨打多少电话。现在在上面的响应中numberOfPages 是 3,所以我需要对同一个 URL 进行总共三个调用。由于我们已经拨打了 1 个电话,我将再拨打 2 个电话,其中 pageNumber 正文中包含“2”和“3”。

以下是我的工作代码。我只需要通过更改正文来调用相同的 URL,直到 numberOfPages 次。对于每个呼叫,都应该使用相应的pageNumber,所以如果numberOfPages 是 3,那么我将总共拨打 3 个电话。从每个页面收集数据后,我正在填充两张地图。

public class AppParser {
  private static final ObjectMapper objectMapper = new ObjectMapper();
  private static final String lastParentIdJsonPath = "......";    
  private final Map<String, String> processToTaskIdHolder = new HashMap<>();
  private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();
  private final int entries;
  private final String siteId;

  public AppParser(int entries, String id) {
    this.entries = entries;
    this.id = id;
    collect();
  }

  // this is only called from above constructor
  private void collect() {
    String endpoint = "url_endpoint";
    int number = 1;
    int expectedNumber;
    do {
      HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(number), getHeader());
      ResponseEntity<String> responseEntity =
          HttpClient.getInstance().getClient()
              .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
      String jsonInput = responseEntity.getBody();
      Stuff response = objectMapper.readValue(jsonInput, Stuff.class);
      expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
      if (expectedNumber <= 0) {
        break;
      }
      List<Postings> postings = response.getPostings();
      for (Postings posting : postings) {
        if (posting.getClientIds().isEmpty()) {
          continue;
        }
        List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
        String clientId = posting.getClientIds().get(0).getId();
        Category category = getCategory(posting);
        // populate two maps now
        itemsByCategory.put(clientId, category);
        processToTaskIdHolder.put(clientId, lastParent.get(0));
      }
      number++;
    } while (number <= expectedNumber);
  }

  private String getBody(final int number) {
    Input input = new Input(entries, number, 0);
    Body body = new Body("Stuff", input);
    return gson.toJson(body);
  }

  // getters for those two above maps
}

现在我上面的代码正在为每一页按顺序收集数据,所以如果我有很高的numberOfPages,那么收集所有这些页码的所有数据需要一些时间。假设numberOfPages 是 500,那么我的代码将为每个 pageNumber 一个接一个地依次运行。有什么方法可以并行化我的上述代码,以便我们可以同时收集 5 页的数据?这可能吗?而且我想我需要确保我的代码是线程安全的。

注意:HttpClient 是线程安全的单例类。

【问题讨论】:

    标签: java multithreading performance thread-safety guava


    【解决方案1】:

    Java 8 解决方案(按顺序执行):

    并行流可以成为你的朋友:

    IntStream.range(1,numberOfPages)
            .parallel()
            .forEachOrdered(page -> {
                // ...
                postings.parallelStream()
                        .forEachOrdered(posting -> {
                            // ...
                });                    
            });
    

    在 lambda 中使用的任何变量都需要声明为 final。

    如果输出的顺序不重要,foreachOrdered 可以用 foreach 代替。

    请参阅此主题以控制并发运行的线程数: How many threads are spawned in parallelStream in Java 8?

    Java 7 解决方案(无序执行):

    灵感来自:wait until all threads finish their work in java

    ExecutorService es = Executors.newFixedThreadPool(4);
    for(int page=1 ; page < numberOfPages ; ++page) {
        es.execute(new Runnable() {
            @Override
            public void run() {
                /*  your task */  
            }});
    }
    es.shutdown();
    boolean finished = false;
    try {
        finished = es.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {}
    

    这 4 个可以随时换成您想要的最大线程数。 awaitTermination 需要一些适当的超时时间。

    【讨论】:

    • 我仍在使用 Java 7,所以还不能使用 Jav 8。
    • 有流的反向移植,例如:github.com/streamsupport/streamsupport 我自己没有尝试过这些,但我猜他们可以帮助你。
    • hmm 以前从未使用过反向移植。有没有其他方法可以通过普通的java来做到这一点?
    • 添加了一个 java 7 的方法,它会无序执行,但你正在填充地图,所以希望没问题。您可能需要检查您正在使用的那些映射是否也能容忍多个线程写入它们。
    【解决方案2】:

    我尝试使用多线程修改您的代码,但这并不容易,因为您没有提供包含所有导入的完整类源。此外,您的代码也不够干净。 您的任务是异步请求的常见情况。我将您的收集代码包装到java.util.concurrent.Callable。它通过 ExecutorService 异步为我提供使用任务,并在需要时将结果作为ParseResult 对象获取。在下面的代码中,我发出了 1 个请求来填充 expectedNumber 变量,并且循环内应用程序创建任务并将它们提交到 executorService 并使用它们正在运行的专用线程池。 代码:

    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final String URL_ENDPOINT = "url_endpoint";
    private final Map<String, String> processToTaskIdHolder = new HashMap<>();
    private final Multimap<String, Category> itemsByCategory = LinkedListMultimap.create();
    private static final String lastParentIdJsonPath = "......";
    
    class ParseResult {
        private String clientId;
        private Category category;
        private String lastParent;
        private int expectedNumber;
    }
    
    class ParseTask implements Callable<ParseResult> {
    
        private int pageNumber;
    
        public ParseTask(int pageNumber) {
            this.pageNumber = pageNumber;
        }
    
        @Override
        public ParseResult call() throws Exception {
            HttpEntity<String> requestEntity = new HttpEntity<String>(getBody(pageNumber), getHeader());
            ResponseEntity<String> responseEntity =
                    HttpClient.getInstance().getClient()
                            .exchange(URI.create(URL_ENDPOINT), HttpMethod.POST, requestEntity, String.class);
            String jsonInput = responseEntity.getBody();
            Stuff response = objectMapper.readValue(jsonInput, Stuff.class);
            int expectedNumber = (int) response.getPaginationResponse().getNumberOfPages();
            if (expectedNumber <= 0) {
                return null; // or throw exception
            }
            List<Postings> postings = response.getPostings();
            for (Postings posting : postings) {
                if (posting.getClientIds().isEmpty()) {
                    continue;
                }
                List<String> lastParent = JsonPath.read(jsonInput, lastParentIdJsonPath);
                String clientId = posting.getClientIds().get(0).getId();
                Category category = getCategory(posting);
    
                //collecting the result
                ParseResult parseResult = new ParseResult();
                parseResult.clientId = clientId;
                parseResult.category = category;
                parseResult.expectedNumber = expectedNumber;
                parseResult.lastParent = lastParent.get(0);
                writeResult(parseResult); // writing the result
                return parseResult;
            }
        }
    }
    
    public AppParser(int entries, String id) {
        // .....
        collect();
    }
    
    // this is only called from above constructor
    private void collect() {
        int number = 1;
        int expectedNumber = 0;
        ParseTask parseTask = new ParseTask(number);
        try {
            ParseResult firstResult = parseTask.call();
            expectedNumber = firstResult.expectedNumber; // fill the pages amount
        } catch (Exception e) {
            e.printStackTrace();
        }
    
        ExecutorService executorService = Executors.newCachedThreadPool();
        while (number <= expectedNumber) {
            executorService.submit(new ParseTask(number));
        }
    }
    
    private String getBody(final int number) {
        Input input = new Input(entries, number, 0);
        Body body = new Body("Stuff", input);
        return gson.toJson(body);
    }
    
    private void writeResult(ParseResult result) {
        // populate two maps now
        itemsByCategory.put(result.clientId, result.category);
        processToTaskIdHolder.put(result.clientId, result.lastParent);
    }
    

    我们可以花费大量时间来升级您的代码,但这是一个带有多线程的原始版本。我不确定它是否会起作用,因为正如我之前所说,您没有提供完整版本。也许它需要一些语法修复。

    【讨论】:

    • 有必要使用Executors.newCachedThreadPool() or there is some benefit using that吗?我们不能在这里使用FixedThreadPoolSize 吗?
    • 这只是一个例子。如果您知道要使用固定线程池的并行线程数。您可以阅读《Thinking in Java》一书获得更多信息
    • 好的。我现在在问题中提供了完整的课程。你能检查一下吗?而且你的代码线程安全吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-05-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-18
    • 1970-01-01
    • 2020-07-22
    相关资源
    最近更新 更多