【问题标题】:Writing [chained] CompletableFutures to CSV in Java用 Java 将 [chained] CompletableFutures 写入 CSV
【发布时间】:2021-03-31 18:02:00
【问题描述】:

我有一个 HashMap<String, CompletableFuture<HashMap<String, String>>> 将一个项目映射到它的属性,例如{ "pizza" -> { "calories" -> "120", "fat" -> "12" } },其中的属性是从不同的数据源中检索出来的。

例如,我们从数据库中获取"fat" 属性,而我们从Solr 中获取"calories" 属性。

当我最初从数据库中检索"fat" 时,我使用supplyAsync 以不阻塞主线程,例如:

  public CompletableFuture<HashMap<String, String>> getFat(String name, boolean) {
    return CompletableFuture.supplyAsync(new Supplier<HashMap<String, String>>() {
      @Override
      public HashMap<String, String> get() {
        HashMap<String, String> attributes = new HashMap<>();
        
        ... do work ...
      
        attributes.put(name, attributes);
        return attributes;
      }
   })
 }

然后我将它与对 Solr 的异步调用链接起来,这样我最终会有一个异步 Hashmap 将项目映射到它们的属性,即 HashMap&lt;String, CompletableFuture&lt;HashMap&lt;String, String&gt;&gt;&gt; itemsToAttributesMapping;(所以我循环遍历 hashmap 的键并使用新属性更新值,使用thenApply访问旧的)。

我通过将数据映射到 csv 来完成,这就是问题出现的地方:

       File file = new File(home + "/Downloads/rmsSkuValidationResults.csv");

       try{
          FileWriter outputfile = new FileWriter(file);
          CSVWriter writer = new CSVWriter(outputfile);

            for(String itemKey : itemsToAttributesMapping.keySet()) {
                itemsToAttributesMapping.get(itemKey).thenAccept(attributes -> {

                String[] row = { attributes.get("calories"),
                            attributes.get("fat")
                        
                        ... more attributes ...

                        };
                writer.writeNext(row);
                });
            }

         writer.close();
      }
      catch(Exception e){
        e.printStackTrace();
      }

按原样,打印到 CSV 文件对于大约 800-1100 项工作正常,但之后停止写入并且程序终止。

我已经尝试了上述的变体,包括使用get而不是thenAccept,或者在thenAccept之后添加join导致程序挂起(异步计算很快,不应该挂起)。

我还尝试存储我运行的 thenAccepts 的结果,然后在它们上调用 allOf,但这会导致奇怪的行为,即 Solr 的属性在几百个项目后停止显示。数据库中的属性确实会出现在每个项目中,这让我认为第一个 supplyAsync 始终有效,但随后的 thenApplys 将属性添加到 supplyAsnc 提供的原始 HashMap&lt;String, CompletableFuture&lt;HashMap&lt;String, String&gt;&gt;&gt; itemsToAttributesMapping; 停止工作。

任何关于可能是什么问题的见解将不胜感激。也许我对 CompletableFuture 的理解是不正确的,尤其是在链接和解析期货方面?也许这是一个超时问题,或者线程正在丢失?我扩展的最后一种方法表明问题可能是thenApplys?

【问题讨论】:

  • 一旦你克服了这个问题并完成了你的程序,我建议转换为使用 CompletionStage 而不是 CompletableFuture。 stackoverflow.com/a/49158702/925913
  • @slackwing 你有一个很好的评论,我会选择它作为接受的答案,因为它提供了很好的洞察力,但我想在这里澄清一下,我能够在重构 @ 后让它工作987654343@s!该代码并非微不足道,因此我无法在此处共享所有代码(尤其是在重构之前/之后),但是我有一个 thenApply ,在那里我检索了项目名称并进行了 Solr 查找,而不是添加属性。就在那里,我为这些新属性返回了一个新的CompletableFuture。以及后来的双链现有属性和新属性。在两个thenApplys 中。现在我只是添加新的属性。立即。
  • 我还要补充一点,我使用这种方法写入 csv 文件,这需要相当多的时间,但它不像 get 那样挂起:我首先添加了 @ 的结果987654347@ 到 ArrayList&lt;CompletableFuture&lt;Void&gt;&gt; async = new ArrayList&lt;CompletableFuture&lt;Void&gt;&gt;(); 然后运行以下命令:CompletableFuture[] asyncArray = async.toArray(CompletableFuture[]::new); var all = CompletableFuture.allOf(asyncArray); all.whenComplete((aVoid, throwable) -&gt; { System.out.println("Completed all"); }); all.join(); 抱歉格式化
  • 嗯。 Tbh,很难想象你描述的代码,但很高兴它成功了。是的,使用allOf 有效。但通常情况下(如果您可以控制服务器),我们会公开批处理 API(或更高级的流式 API)。如果您使用的是外部 API,您可能希望批处理您的请求以不达到速率限制,例如一次 20 个。即使没有速率限制,您也应该这样做,因为您可能会阻塞线程/不允许其他任务获得间歇性优先级。 (不过,我实际上并不完全确定最后的说法。)
  • @slackwing 我们确实有一个流 api,这是我最初打算使用的,但是有一些复杂情况迫使我不使用它,必须相应地使代码异步,以便它可以在合理的时间范围。

标签: java asynchronous solr hashmap completable-future


【解决方案1】:

这是您上面的代码的粗略说明,正如您所拥有的那样:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
get(itemKey4) then at some unspecified time in the future writeNext(attr4)
get(itemKey5) then at some unspecified time in the future writeNext(attr5)
get(itemKey6) then at some unspecified time in the future writeNext(attr6)
get(itemKey7) then at some unspecified time in the future writeNext(attr7)
attr1 finally delivered; writeNext(attr1)
get(itemKey8) then at some unspecified time in the future writeNext(attr8)
attr2 finally delivered; writeNext(attr2)
attr3 finally delivered; writeNext(attr3)
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
no more items; writer.close()
attr4 finally delivered; oops, writer closed
attr5 finally delivered; oops, writer closed
attr6 finally delivered; oops, writer closed
attr7 finally delivered; oops, writer closed
attr8 finally delivered; oops, writer closed
attr9 finally delivered; oops, writer closed

您提到您尝试过.get().join()。这基本上会使程序同步,但这是一个很好的调试步骤。它将执行更改为:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
attr1 finally delivered; writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
attr2 finally delivered; writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
attr3 finally delivered; writeNext(attr3)
...
...
...
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
attr9 finally delivered; writeNext(attr9)
no more items; writer.close()

这应该有效。将输出添加到您的每个阶段(您未显示的thenApplys 以​​及thenAccept)揭示了什么?真的有你说的那么快吗?

请显示更多代码。尤其是链接部分,如果你认为问题可能出在那个地方。

【讨论】:

  • 谢谢,这很有见地:) 请看我在原帖下留下的 cmets,它现在似乎可以工作了!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-05-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-09-19
  • 2015-07-16
  • 1970-01-01
相关资源
最近更新 更多