用Java将[chained] CompletableFutures写入CSV

artin2

HashMap<String, CompletableFuture<HashMap<String, String>>>将一个项目映射到它的属性,例如{ "pizza" -> { "calories" -> "120", "fat" -> "12" } },从不同的数据源检索属性。

例如,我们"fat"从数据库中获取"calories"属性,而我们从Solr中获取属性。

最初"fat"从数据库检索时supplyAsync,为了不阻塞主线程,我使用了类似以下内容:

  public CompletableFuture<HashMap<String, String>> getFat(String name, boolean) {
    return CompletableFuture.supplyAsync(new Supplier<HashMap<String, String>>() {
      @Override
      public HashMap<String, String> get() {
        HashMap<String, String> attributes = new HashMap<>();
        
        ... do work ...
      
        attributes.put(name, attributes);
        return attributes;
      }
   })
 }

然后,我将其链接到对Solr的异步调用,以便最终得到一个异步Hashmap映射项到它们的属性,即HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping;(因此,我遍历了hashmap的键并用新属性更新值,使用来访问旧属性thenApply)。

我首先将数据映射到csv,这就是问题所在:

       File file = new File(home + "/Downloads/rmsSkuValidationResults.csv");

       try{
          FileWriter outputfile = new FileWriter(file);
          CSVWriter writer = new CSVWriter(outputfile);

            for(String itemKey : itemsToAttributesMapping.keySet()) {
                itemsToAttributesMapping.get(itemKey).thenAccept(attributes -> {

                String[] row = { attributes.get("calories"),
                            attributes.get("fat")
                        
                        ... more attributes ...

                        };
                writer.writeNext(row);
                });
            }

         writer.close();
      }
      catch(Exception e){
        e.printStackTrace();
      }

照原样,打印到CSV文件对于某些800-1100项可以正常工作,但是在此之后停止写入,程序终止。

我已经尝试了以上方法的变体,包括使用get代替thenAcceptjointhenAccept导致程序挂起之后添加(异步计算速度很快,不应挂起)。

我也尝试过存储thenAcceptsI运行的结果,然后调用allOf它们,但这会导致奇怪的行为,Solr的属性在几百个项目后就会停止显示。数据库中的属性确实会出现在每个项目中,这使我认为第一个supplyAsync始终有效,但随后thenApply的会将属性添加到HashMap<String, CompletableFuture<HashMap<String, String>>> itemsToAttributesMapping;supplyAsnc停止工作提供的原始属性中

对于可能是问题的任何见解将不胜感激。也许我对CompletableFuture的理解是不正确的,尤其是在链接和解决期货方面?也许这是超时问题,或者线程丢失了?我扩展的最后一种方法表明问题可能是thenApplys?

松弛翼

这是上面的代码在做的粗略说明:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
get(itemKey4) then at some unspecified time in the future writeNext(attr4)
get(itemKey5) then at some unspecified time in the future writeNext(attr5)
get(itemKey6) then at some unspecified time in the future writeNext(attr6)
get(itemKey7) then at some unspecified time in the future writeNext(attr7)
attr1 finally delivered; writeNext(attr1)
get(itemKey8) then at some unspecified time in the future writeNext(attr8)
attr2 finally delivered; writeNext(attr2)
attr3 finally delivered; writeNext(attr3)
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
no more items; writer.close()
attr4 finally delivered; oops, writer closed
attr5 finally delivered; oops, writer closed
attr6 finally delivered; oops, writer closed
attr7 finally delivered; oops, writer closed
attr8 finally delivered; oops, writer closed
attr9 finally delivered; oops, writer closed

您提到您尝试过.get().join()这基本上可以使程序同步,但这是一个很好的调试步骤。它将执行更改为:

get(itemKey1) then at some unspecified time in the future writeNext(attr1)
attr1 finally delivered; writeNext(attr1)
get(itemKey2) then at some unspecified time in the future writeNext(attr2)
attr2 finally delivered; writeNext(attr2)
get(itemKey3) then at some unspecified time in the future writeNext(attr3)
attr3 finally delivered; writeNext(attr3)
...
...
...
get(itemKey9) then at some unspecified time in the future writeNext(attr9)
attr9 finally delivered; writeNext(attr9)
no more items; writer.close()

这应该起作用了。向每个阶段(thenApply未显示的和thenAccept添加输出会显示什么?确实如您所说的那样进行吗?

请显示更多代码。尤其是在链接部分,如果您认为这是个问题,可能是这样的地方。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章