【发布时间】:2018-02-22 00:28:25
【问题描述】:
我有一个要并行化的 for 循环。在下面的代码中,我迭代了最外层的 for 循环并将条目放入各种数据结构中,它工作正常。所有这些数据结构在同一个类中都有一个 getter,我稍后会使用它来获取所有细节,一旦在其他类的这个 for 循环中完成所有操作。我正在填充info、itemToNumberMapping、catToValueHolder、tasksByCategory、catHolder、itemIds 数据结构,它们也有吸气剂。
// want to parallelize this for loop
for (Task task : tasks) {
if (task.getCategories().isEmpty() || task.getEventList() == null
|| task.getMetaInfo() == null) {
continue;
}
String itemId = task.getEventList().getId();
String categoryId = task.getCategories().get(0).getId();
Processor fp = new Processor(siteId, itemId, categoryId, poolType);
Map<String, Integer> holder = fp.getDataHolder();
if (!holder.isEmpty()) {
for (Map.Entry<String, Integer> entry : holder.entrySet()) {
info.putIfAbsent(entry.getKey(), entry.getValue());
}
List<Integer> values = new ArrayList<>();
for (String key : holder.keySet()) {
values.add(info.get(key));
}
itemToNumberMapping.put(itemId, StringUtils.join(values, ","));
catToValueHolder.put(categoryId, StringUtils.join(values, ","));
}
Category cat = getCategory(task, holder.isEmpty());
tasksByCategory.add(cat);
LinkedList<String> ids = getCategoryIds(task);
catHolder.put(categoryId, ids.getLast());
itemIds.add(itemId);
}
现在我知道如何并行化一个 for 循环,如下例所示,但令人困惑的是 - 就我而言,在下面的示例中,我没有像 output 这样的对象。就我而言,我有多个数据结构,我通过迭代 for 循环来填充这些数据结构,所以我很困惑如何并行化最外层的 for 循环并仍然填充所有这些数据结构?
private final ExecutorService service = Executors.newFixedThreadPool(10);
List<Future<Output>> futures = new ArrayList<Future<Output>>();
for (final Input input : inputs) {
Callable<Output> callable = new Callable<Output>() {
public Output call() throws Exception {
Output output = new Output();
// process your input here and compute the output
return output;
}
};
futures.add(service.submit(callable));
}
service.shutdown();
List<Output> outputs = new ArrayList<Output>();
for (Future<Output> future : futures) {
outputs.add(future.get());
}
更新:-
我正在并行化一个在 do while 循环内的 for 循环,并且我的 do while 循环运行直到 number 小于或等于 pages。所以也许我做得不对。因为我的 do while 循环将一直运行,直到所有页面都完成并且对于每个页面,我都有一个我正在尝试并行化的 for 循环以及我设置它的方式,它给出了 rejectedexecutionexception。
private void check() {
String endpoint = "some_url";
int number = 1;
int pages = 0;
do {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 1; i <= retryCount; i++) {
try {
HttpEntity<String> requestEntity =
new HttpEntity<String>(getBody(number), getHeader());
ResponseEntity<String> responseEntity =
HttpClient.getInstance().getClient()
.exchange(URI.create(endpoint), HttpMethod.POST, requestEntity, String.class);
String jsonInput = responseEntity.getBody();
Process response = objectMapper.readValue(jsonInput, Process.class);
pages = (int) response.getPaginationResponse().getTotalPages();
List<Task> tasks = response.getTasks();
if (pages <= 0 || tasks.isEmpty()) {
continue;
}
// want to parallelize this for loop
for (Task task : tasks) {
Callable<Void> c = new Callable<>() {
public void call() {
if (!task.getCategories().isEmpty() && task.getEventList() != null
&& task.getMetaInfo() != null) {
// my code here
}
}
};
executorService.submit(c);
}
// is this at right place? because I am getting rejectedexecutionexception
executorService.shutdown();
number++;
break;
} catch (Exception ex) {
// log exception
}
}
} while (number <= pages);
}
【问题讨论】:
标签: java multithreading thread-safety executorservice