【发布时间】:2021-03-31 18:02:00
【问题描述】:
我有一个 HashMap<String, CompletableFuture<HashMap<String, String>>> 将一个项目映射到它的属性,例如{ "pizza" -> { "calories" -> "120", "fat" -> "12" } },其中的属性是从不同的数据源中检索出来的。
例如,我们从数据库中获取"fat" 属性,而我们从Solr 中获取"calories" 属性。
当我最初从数据库中检索"fat" 时,我使用supplyAsync 以不阻塞主线程,例如:
public CompletableFuture<HashMap<String, String>> getFat(String name, boolean) {
return CompletableFuture.supplyAsync(new Supplier<HashMap<String, String>>() {
@Override
public HashMap<String, String> get() {
HashMap<String, String> attributes = new HashMap<>();
... do work ...
attributes.put(name, attributes);
return attributes;
}
})
}
然后我将它与对 Solr 的异步调用链接起来,这样我最终会有一个异步 Hashmap 将项目映射到它们的属性,即 HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping;(所以我循环遍历 hashmap 的键并使用新属性更新值,使用thenApply访问旧的)。
我通过将数据映射到 csv 来完成,这就是问题出现的地方:
File file = new File(home + "/Downloads/rmsSkuValidationResults.csv");
try{
FileWriter outputfile = new FileWriter(file);
CSVWriter writer = new CSVWriter(outputfile);
for(String itemKey : itemsToAttributesMapping.keySet()) {
itemsToAttributesMapping.get(itemKey).thenAccept(attributes -> {
String[] row = { attributes.get("calories"),
attributes.get("fat")
... more attributes ...
};
writer.writeNext(row);
});
}
writer.close();
}
catch(Exception e){
e.printStackTrace();
}
按原样,打印到 CSV 文件对于大约 800-1100 项工作正常,但之后停止写入并且程序终止。
我已经尝试了上述的变体,包括使用get而不是thenAccept,或者在thenAccept之后添加join导致程序挂起(异步计算很快,不应该挂起)。
我还尝试存储我运行的 thenAccepts 的结果,然后在它们上调用 allOf,但这会导致奇怪的行为,即 Solr 的属性在几百个项目后停止显示。数据库中的属性确实会出现在每个项目中,这让我认为第一个 supplyAsync 始终有效,但随后的 thenApplys 将属性添加到 supplyAsnc 提供的原始 HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping; 停止工作。
任何关于可能是什么问题的见解将不胜感激。也许我对 CompletableFuture 的理解是不正确的,尤其是在链接和解析期货方面?也许这是一个超时问题,或者线程正在丢失?我扩展的最后一种方法表明问题可能是thenApplys?
【问题讨论】:
-
一旦你克服了这个问题并完成了你的程序,我建议转换为使用 CompletionStage 而不是 CompletableFuture。 stackoverflow.com/a/49158702/925913
-
@slackwing 你有一个很好的评论,我会选择它作为接受的答案,因为它提供了很好的洞察力,但我想在这里澄清一下,我能够在重构 @ 后让它工作987654343@s!该代码并非微不足道,因此我无法在此处共享所有代码(尤其是在重构之前/之后),但是我有一个 thenApply ,在那里我检索了项目名称并进行了 Solr 查找,而不是添加属性。就在那里,我为这些新属性返回了一个新的
CompletableFuture。以及后来的双链现有属性和新属性。在两个thenApplys 中。现在我只是添加新的属性。立即。 -
我还要补充一点,我使用这种方法写入 csv 文件,这需要相当多的时间,但它不像
get那样挂起:我首先添加了 @ 的结果987654347@ 到ArrayList<CompletableFuture<Void>> async = new ArrayList<CompletableFuture<Void>>();然后运行以下命令:CompletableFuture[] asyncArray = async.toArray(CompletableFuture[]::new); var all = CompletableFuture.allOf(asyncArray); all.whenComplete((aVoid, throwable) -> { System.out.println("Completed all"); }); all.join();抱歉格式化 -
嗯。 Tbh,很难想象你描述的代码,但很高兴它成功了。是的,使用
allOf有效。但通常情况下(如果您可以控制服务器),我们会公开批处理 API(或更高级的流式 API)。如果您使用的是外部 API,您可能希望批处理您的请求以不达到速率限制,例如一次 20 个。即使没有速率限制,您也应该这样做,因为您可能会阻塞线程/不允许其他任务获得间歇性优先级。 (不过,我实际上并不完全确定最后的说法。) -
@slackwing 我们确实有一个流 api,这是我最初打算使用的,但是有一些复杂情况迫使我不使用它,必须相应地使代码异步,以便它可以在合理的时间范围。
标签: java asynchronous solr hashmap completable-future