【问题标题】:Executing Dependent tasks in parallel in Java在 Java 中并行执行依赖任务
【发布时间】:2012-06-01 17:50:49
【问题描述】:

我需要找到一种在java中并行执行任务(依赖和独立)的方法。

  1. 任务 A 和任务 C 可以独立运行。
  2. 任务 B 依赖于任务 A 的输出。

我检查了 java.util.concurrent Future 和 Fork/Join,但看起来我们无法向任务添加依赖项。

谁能指点我纠正Java API。

【问题讨论】:

  • 您是否考虑过让任务 A 在完成时通知任务 B?在启动任务 A 之前,实例化任务 B 并将其作为观察者添加到任务 A(参见观察者模式)。
  • Guava 的 ListenableFuture 在这些方面比普通的 Futures 更友好。

标签: java multithreading


【解决方案1】:

在 Scala 中,这很容易做到,而且我认为您最好使用 Scala。这是我从http://danielwestheide.com/(Scala 新手指南第 16 部分:从这里开始)的一个示例,这个人有一个很棒的博客(我不是那个人)

让我们请一位咖啡师煮咖啡。要做的任务是:

  1. 研磨所需的咖啡豆(无先前任务)
  2. 加热一些水(没有之前的任务)
  3. 使用研磨咖啡和热水冲泡浓缩咖啡(取决于 1 和 2)
  4. 起泡一些牛奶(没有之前的任务)
  5. 混合泡沫牛奶和浓缩咖啡(取决于 3,4)

或作为一棵树:

Grind   _
Coffe    \
          \   
Heat    ___\_Brew____ 
Water                \_____Combine
                     /
Foam    ____________/
Milk

在使用并发 api 的 java 中,这将是:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Barrista {

    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }

    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }

    static class Brew implements Callable<String> {

        final Future<String> grindedBeans;
        final Future<String> hotWater;

        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }

        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }

    static class FrothMilk implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }

    static class Combine implements Callable<String> {

        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }

        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }

    }

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));

        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);


        try {

            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
            e.printStackTrace();
        } finally{
                executor.shutdown();
            }
        }
    }

请确保添加超时,以确保您的代码不会永远等待某事完成,这是通过使用 Future.get(long, TimeUnit) 完成的,然后相应地处理失败。

但在 scala 中要好得多,就像在博客上一样: 准备咖啡的代码如下所示:

def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

所有方法都返回一个未来(类型化的未来),例如研磨会是这样的:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}

对于所有实现,请查看博客,但仅此而已。您也可以轻松集成 Scala 和 Java。我真的建议在 Scala 而不是 Java 中做这种事情。 Scala 需要更少的代码、更简洁和事件驱动。

【讨论】:

    【解决方案2】:

    具有依赖关系的任务的通用编程模型是Dataflow。简化模型,其中每个任务只有一个,虽然重复,依赖是Actor model。 Java 有很多 Actor 库,但用于数据流的却很少。 另见:which-actor-model-library-framework-for-javajava-pattern-for-nested-callbacks

    【讨论】:

      【解决方案3】:

      使用阻塞队列。将任务A的输出放入队列,任务B阻塞,直到队列中有可用的东西。

      文档包含实现此目的的示例代码:http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html

      【讨论】:

        【解决方案4】:

        Java 定义了一个 CompletableFuture 类。

        https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

        这就是你要找的。 它有助于构建执行流程。

        【讨论】:

          【解决方案5】:

          您需要的是CountDownLatch

          final CountDownLatch gate = new CountDownLatch(2);
          // thread a
          new Thread() {
              public void run() {
                  // process
                  gate.countDown();
              }
          }.start();
          
          // thread c
          new Thread() {
              public void run() {
                  // process
                  gate.countDown();
              }
          }.start();
          
          new Thread() {
              public void run() {
                  try {
                      gate.await();
                      // both thread a and thread c have completed
                      // process thread b
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
          
              }
          }.start();
          

          作为替代方案,根据您的方案,您也可以使用BlockingQueue 来实现生产者-消费者模式。请参阅文档页面上的示例。

          【讨论】:

          • A CountDownLatch 在这里是多余的,根据 OP,任务 B 仅依赖于任务 A,而不是任务 A 和 C。也就是说,-1 只是为了不处理InterruptedException 在 sn-p 中正确。
          • 谢谢,编写代码 sn-p 的想法是向他展示 CountDownLatch 是如何工作的,而不是向他展示如何正确处理异常。
          【解决方案6】:

          如果任务 B 依赖于任务 A 的输出,我首先要质疑任务 B 是否真的是一个单独的任务。如果存在以下情况,则将任务分开是有意义的:

          • 在需要任务 A 的结果之前,任务 B 可以完成的一些重要工作
          • 任务 B 是一个长期持续的过程,它处理来自任务 A 的许多不同实例的输出
          • 还有一些其他任务(例如 D)也使用任务 A 的结果

          假设它是一个单独的任务,那么您可以允许任务 A 和 B 共享一个BlockingQueue,以便任务 A 可以传递任务 B 数据。

          【讨论】:

            【解决方案7】:

            使用这个库https://github.com/familysyan/TaskOrchestration。它为您管理任务依赖关系。

            【讨论】:

              【解决方案8】:

              有一个专门用于此目的的 java 库(免责声明:我是该库的所有者),名为 Dexecutor

              这里是你如何达到预期的结果,你可以阅读更多关于它here

              @Test
              public void testDependentTaskExecution() {
              
                  DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();
              
                  executor.addDependency("A", "B");
                  executor.addIndependent("C");
              
                  executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
              
              }
              
              private DefaultDependentTasksExecutor<String, String> newTaskExecutor() {
                  return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider());
              }
              
              private ExecutorService newExecutor() {
                  return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
              }
              
              private static class SleepyTaskProvider implements TaskProvider<String, String> {
              
                  public Task<String, String> provid(final String id) {
              
                      return new Task<String, String>() {
              
                          @Override
                          public String execute() {
                              try {
                                  //Perform some task
                                  Thread.sleep(500);
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }
                              String result = id + "processed";
                              return result;
                          }
              
                          @Override
                          public boolean shouldExecute(ExecutionResults<String, String> parentResults) {
                              ExecutionResult<String, String> firstParentResult = parentResults.getFirst();
                              //Do some logic with parent result
                              if ("B".equals(id) && firstParentResult.isSkipped()) {
                                  return false;
                              }
                              return true;
                          }
                      };          
                  }
              
              }
              

              【讨论】:

                猜你喜欢
                • 2023-03-30
                • 1970-01-01
                • 1970-01-01
                • 2011-09-28
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 2019-09-08
                • 2016-04-22
                相关资源
                最近更新 更多