【问题标题】:parallelize a for loop and populate multiple data structures并行化 for 循环并填充多个数据结构
【发布时间】:2018-02-22 00:28:25
【问题描述】:

我有一个要并行化的 for 循环。在下面的代码中,我迭代了最外层的 for 循环并将条目放入各种数据结构中,它工作正常。所有这些数据结构在同一个类中都有一个 getter,我稍后会使用它来获取所有细节,一旦在其他类的这个 for 循环中完成所有操作。我正在填充infoitemToNumberMappingcatToValueHoldertasksByCategorycatHolderitemIds 数据结构,它们也有吸气剂。

  // want to parallelize this for loop
  for (Task task : tasks) {
    if (task.getCategories().isEmpty() || task.getEventList() == null
        || task.getMetaInfo() == null) {
      continue;
    }
    String itemId = task.getEventList().getId();
    String categoryId = task.getCategories().get(0).getId();
    Processor fp = new Processor(siteId, itemId, categoryId, poolType);
    Map<String, Integer> holder = fp.getDataHolder();
    if (!holder.isEmpty()) {
      for (Map.Entry<String, Integer> entry : holder.entrySet()) {
        info.putIfAbsent(entry.getKey(), entry.getValue());
      }
      List<Integer> values = new ArrayList<>();
      for (String key : holder.keySet()) {
        values.add(info.get(key));
      }
      itemToNumberMapping.put(itemId, StringUtils.join(values, ","));
      catToValueHolder.put(categoryId, StringUtils.join(values, ","));
    }
    Category cat = getCategory(task, holder.isEmpty());
    tasksByCategory.add(cat);
    LinkedList<String> ids = getCategoryIds(task);
    catHolder.put(categoryId, ids.getLast());
    itemIds.add(itemId);
  }

现在我知道如何并行化一个 for 循环,如下例所示,但令人困惑的是 - 就我而言,在下面的示例中,我没有像 output 这样的对象。就我而言,我有多个数据结构,我通过迭代 for 循环来填充这些数据结构,所以我很困惑如何并行化最外层的 for 循环并仍然填充所有这些数据结构?

private final ExecutorService service = Executors.newFixedThreadPool(10);

List<Future<Output>> futures = new ArrayList<Future<Output>>();
for (final Input input : inputs) {
  Callable<Output> callable = new Callable<Output>() {
    public Output call() throws Exception {
      Output output = new Output();
      // process your input here and compute the output
      return output;
    }
  };
  futures.add(service.submit(callable));
}

service.shutdown();

List<Output> outputs = new ArrayList<Output>();
for (Future<Output> future : futures) {
  outputs.add(future.get());
}

更新:-

我正在并行化一个在 do while 循环内的 for 循环,并且我的 do while 循环运行直到 number 小于或等于 pages。所以也许我做得不对。因为我的 do while 循环将一直运行,直到所有页面都完成并且对于每个页面,我都有一个我正在尝试并行化的 for 循环以及我设置它的方式,它给出了 rejectedexecutionexception

  private void check() {
    String endpoint = "some_url";
    int number = 1;
    int pages = 0;
    do {
      ExecutorService executorService = Executors.newFixedThreadPool(10);
      for (int i = 1; i <= retryCount; i++) {
        try {
          HttpEntity<String> requestEntity =
              new HttpEntity<String>(getBody(number), getHeader());
          ResponseEntity<String> responseEntity =
              HttpClient.getInstance().getClient()
                  .exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
          String jsonInput = responseEntity.getBody();
          Process response = objectMapper.readValue(jsonInput, Process.class);
          pages = (int) response.getPaginationResponse().getTotalPages();
          List<Task> tasks = response.getTasks();
          if (pages <= 0 || tasks.isEmpty()) {
            continue;
          }
          // want to parallelize this for loop
          for (Task task : tasks) {
            Callable<Void> c = new Callable<>() {
              public void call() {
                if (!task.getCategories().isEmpty() && task.getEventList() != null
                    && task.getMetaInfo() != null) {
                    // my code here
                }
              }
            };
            executorService.submit(c);
          }
          // is this at right place? because I am getting rejectedexecutionexception
          executorService.shutdown();
          number++;
          break;
        } catch (Exception ex) {
          // log exception
        }
      }
    } while (number <= pages);
  }

【问题讨论】:

    标签: java multithreading thread-safety executorservice


    【解决方案1】:

    您不必从并行代码中输出某些内容。您只需获取外循环的主体并为每个项目创建一个任务,如下所示:

    for (Task task : tasks) {
       Callable<Void> c = new Callable<>() {
          public void call() {
             if (task.getCategories().isEmpty() || task.getEventList() == null || task.getMetaInfo() == null) {
                   // ... rest of code here
              }
           }
        };
        executorService.submit(c);
     }
    
    // wait for executor service, check for exceptions or whatever else you want to do here
    

    【讨论】:

    • 好吧有道理.. 只是为了确保您拥有的 if 检查将适用于 isNotEmptyisNotNull 对吗?在里面如果检查我可以拥有一切吗?我已经用新代码更新了我的问题,你能告诉我这看起来是否正确吗?
    • 我在按照您建议的方式运行时收到RejectedExecutionException 异常。我再次更新了我的问题,并提供了一些关于我在做什么的细节。如果我需要做任何更改,您可以看看并告诉我吗?
    • 在完成提交所有任务之前是否调用了shutdown?如果在 for 循环中错误地关闭了,就会发生这种情况。
    • 我不这么认为它被调用了。我已经用更多细节更新了这个问题。你能检查一下我是对的还是有什么不对?
    猜你喜欢
    • 2022-01-01
    • 1970-01-01
    • 2018-09-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-15
    • 2021-08-15
    相关资源
    最近更新 更多