【问题标题】:in Java get the results from two blocks executing in parallel在 Java 中从并行执行的两个块中获取结果
【发布时间】:2018-03-19 01:52:17
【问题描述】:

研究这个有点困难,因为我不确定这个问题应该如何措辞。这是一些总结我的目标的伪代码。

public class TestService {
    Object someBigMehtod(String A, Integer I) {
        {  //block A 
            //do some long database read
        }
        { //block B
            //do another long database read at the same time as block B
        }
        {  //block C
            //get in this block when both A & B are complete
            //and access result returned or pushed from A & B
            //to build up some data object to push out to a class that called
            //this service or has subscribed to it
            return null;
        }
    }
}

我想我可以使用 RxJava 或 Spring Integration 来完成此任务,或者只是实例化多个线程并运行它们。只是它的布局让我认为 Rx 有解决方案,因为我认为数据被推送到块 C。提前感谢您可能有的任何建议。

【问题讨论】:

  • @Ankur 如果您有解决方案,请将其发布为答案。

标签: java multithreading spring-integration rx-java


【解决方案1】:

您可以使用CompletableFuture 执行此操作。特别是它的thenCombine 方法,它等待两个任务完成。

CompletableFuture<A> fa = CompletableFuture.supplyAsync(() -> {
    // do some long database read
    return a;
});

CompletableFuture<B> fb = CompletableFuture.supplyAsync(() -> {
    // do another long database read
    return b;
});

CompletableFuture<C> fc = fa.thenCombine(fb, (a, b) -> {
    // use a and b to build object c
    return c;
});

return fc.join();

这些方法都将在ForkJoinPool.commonPool() 上执行。如果你传入可选的Executors,你可以控制它们的运行位置。

【讨论】:

    【解决方案2】:

    您可以使用 Rxjava 中的 Zip 运算符。该运算符可以并行运行多个进程,然后压缩结果。

    一些文档http://reactivex.io/documentation/operators/zip.html

    这里有一个例子说明https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/combining/ObservableZip.java

    【讨论】:

      【解决方案3】:

      现在我只是接受约翰的建议。这正在获得预期的效果。我混合了 RxJava1 和 RxJava2 语法,这可能是不好的做法。看起来我有一些关于 java.util.concurrent package 的阅读内容。如果时间允许,我想做 zip 解决方案。

      @Test
      public void myBigFunction(){
          System.out.println("starting ");
          CompletableFuture<List<String>> fa =  CompletableFuture.supplyAsync( () ->
              {  //block A
                  //do some long database read
                  try {
      
                      Thread.sleep(3000);
                      System.out.println("part A");
                      return asList(new String[] {"abc","def"});
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  return null;
              }
          );
      
          CompletableFuture<List<Integer>> fb =  CompletableFuture.supplyAsync( () ->
                  {  //block B
                      //do some long database read
                      try {
                          Thread.sleep(6000);
                          System.out.println("Part B");
                          return asList(new Integer[] {123,456});
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      return null;
                  }
          );
      
          CompletableFuture<List<String>> fc = fa.thenCombine(fb,(a,b) ->{
              //block C
              //get in this block when both A & B are complete
              int sum = b.stream().mapToInt(i -> i.intValue()).sum();
              return a.stream().map(new Function<String, String>() {
                  @Override
                  public String apply(String s) {
                      return s+sum;
                  }
              }).collect(Collectors.toList());
          });
          System.out.println(fc.join());
      }
      

      运行只需 6 秒。

      【讨论】:

        猜你喜欢
        • 2011-07-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-03-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多