【问题标题】:How to design an execution engine for a sequence of tasks如何为一系列任务设计执行引擎
【发布时间】:2014-12-08 13:27:58
【问题描述】:

我正在尝试用 Java 编写一个我必须执行一堆任务的问题。

问题

执行一个由多个任务组成的作业,这些任务之间存在依赖关系。

一个作业会有一个任务列表,每个这样的任务还会有一个后续任务列表(每个后续任务都有自己的后续任务 - 您可以在此处查看递归性质)。如果满足以下条件,每个后续任务都可以开始执行 -

  1. 它被配置为在其前任任务的部分执行时执行。在这种情况下,前任任务将通知它已部分完成,我的后继任务可以开始

  2. 成功完成其前置任务。

示例

作业有 2 个初始任务 A 和 B。A 有 2 个后续任务 M 和 N。B 有 1 个后续任务 P。P 有 2 个后续任务 Y 和 Z。

M 可以在其前置任务 A 部分完成时开始。 Z 可以在其前任任务 P 部分完成时开始。 N、P 和 Y 只能在分别完成其前置任务 A、B 和 P 后开始。

我必须设计这样一个工作流/作业的执行。在设计中,我们必须确认前导任务发送的部分完成事件,以便可以启动其后继任务。我应该怎么做?有没有适合这个并发问题的设计模式?

【问题讨论】:

  • 一个任务能否创建和启动另一个任务(可能通过父作业)?如果是这样,则不需要“部分完成事件”,作业和子任务可以使用 ForkJoinPool 之类的东西来启动(子)任务并等待完成。
  • 没有父/子任务..任务不会分叉成子任务...从这个意义上说,任务是独立的(但它们之间有顺序)
  • 如果任务是独立的,那么究竟什么是“部分”完成而不是“完成”? “后续”任务是线性的还是并行的?当你说“A 有 2 个后继任务 M 和 N”时,你的意思是“A 有一个后继任务 M,而后者有一个后继任务 N”吗?
  • 排序是依赖关系,您给出的示例是具有父/子关系的任务树。只是树中一个分支的执行依赖于树中另一个分支的任务完成状态。由于作业是任务树的根,因此在我看来,作业需要跟踪整体进度并在接收到(部分)任务完成事件后触发分支继续执行。
  • @SteveK M 可以在 A 部分完成时开始,而 N 只能在 A 完成后开始。因此,N 不是 M 的后续任务

标签: java multithreading concurrency


【解决方案1】:

看看akka - http://akka.io

使用 akka 您可以创建参与者(事件驱动、异步处理消息的并发实体)

每个任务都可以表示为一个演员(您可以选择何时启动它)

您可以在部分完成或完全完成时触发其他参与者(任务)(实际上您可以随时触发它们)

【讨论】:

  • 感谢您的回复..我不想使用第三方工具..我想自己实现..
  • 我建议坚持逻辑,将执行引擎的实现留给其他人
【解决方案2】:

您的问题看起来像是 Java 的 ForkJoin Framework 的一个很好的用例。您可以将您的任务实现为RecursiveActions 或RecursiveTasks(取决于您是否需要返回值),这将在您需要的任何条件下启动它们的子任务。您还可以控制子任务是按顺序运行还是并行运行。

例子:

public class TaskA extends RecursiveAction {
  // ...

  protected void compute() {
    if (conditionForTaskM) {
      TaskM m = new TaskM();
      // Run task M asynchronously or use m.invoke() to run it synchronously.
      invokeAll(m);
    }

    // Run task N at the end of A
    invokeAll(new TaskN());
  }

}

您需要一个 ForkJoinPool 的实例来运行您的任务:

public static void main(String[] args) {
  ForkJoinPool pool = new ForkJoinPool();
  pool.submit(new TaskA());

  // Properly shutdown your pool...
}

此示例在实现示例问题的一部分时非常简单。但一般来说,ForkJoin 框架允许您创建任务的树状结构,其中每个父任务(例如 A、B 和 P)都允许您控制其直接子任务的执行。

【讨论】:

    【解决方案3】:

    在模式使用方面有 2 个选项,但本质上它们非常相似。在任何一种情况下,循环依赖情况都需要作为任务依赖配置中的错误来处理。例如A -> B -> A

    1. 中介模式

    每个task[i]完成后,会通知Mediator,Mediator会通知task[i]的所有后续任务。当执行引擎启动时,将读取任务依赖关系图作为 Mediator 使用的数据结构。

    1. 通过消息总线发布/订阅模式。

    任务依赖关系图将在引擎启动时被读取,每个任务将订阅其前任任务的 (task[i]) 在 topic[i] 上的完成消息的 MessageBus

    当每个task[i]完成时,它会向topic[i]中的MessageBus发送完成消息。每个订阅 topic[i] 的任务都会收到通知并开始工作。

    【讨论】:

      【解决方案4】:

      您的问题很有趣,因为有人可以使用这种设计模拟一个简单的神经网络。就答案而言,我想将问题视为任务排序而不是多线程/并发问题,因为只需执行排序任务即可实现并发。现在让我们尝试使用event driven programming 来实现这一点,因为它允许很好的松散耦合组件。所以,现在我们的设计本质上是响应式的,所以我们会担心一旦我们完成了依赖任务的信号,让我们使用observer pattern here

      您的任务既是可观察的又是观察者,因为它们等待来自前任的通知并通知后继者,这为我们提供了以下构造。

      // Task.java
      public abstract class Task extends Observable implements Runnable, Observer {
          private final Mutex lock = new Mutex();
          private final String taskId;
      
          public String getTaskId() {
              return this.taskId;
          }
      
          private final Set<String> completedTasks;
          private final Set<String> shouldCompletedTasksBeforeStart;
      
          public Task(final String taskId) {
              this.taskId = taskId;
              this.completedTasks = new HashSet<>();
              this.shouldCompletedTasksBeforeStart = new HashSet<>();
          }
      
          @Override
          public void run() {
              while (true) {
                  this.lock.getLock();
                  if (this.completedTasks.equals(this.shouldCompletedTasksBeforeStart)) {
                      doWork();
                      setChanged();
                      notifyObservers(this.taskId);
                      // reset
                      this.completedTasks.clear();
                  }
                  this.lock.freeLock();
                  try {
                      // just some sleep, you change to how it fits you
                      Thread.sleep(1000);
                  } catch (final InterruptedException e) {
                      // TODO Auto-generated catch block
                  }
              }
          }
      
          @Override
          public void update(final Observable observable, final Object arg) {
              this.lock.getLock();
              this.completedTasks.add((String) arg);
              this.lock.freeLock();
          }
      
          public void addPredecessorTask(final Task task) {
              if (this.taskId.equals(task.taskId)) {
                  return;
              }
              this.lock.getLock();
              // Notice here, it is a little logic make your predecessor/successor work
              task.addObserver(this);
              this.shouldCompletedTasksBeforeStart.add(task.taskId);
              this.lock.freeLock();
          }
      
          protected abstract void doWork();
      
      }
      
      //HelloTask.java
      public static class HelloTask extends Task {
          public HelloTask(final String taskId) {
              super(taskId);
          }
      
          @Override
          protected void doWork() {
              System.out.println("Hello from " + getTaskId() + "!");
          }
      }
      
      //Main.java
      public class Main {
          public static void main(final String[] args) {
              final HelloTask helloTaskA = new HelloTask("A");
              final HelloTask helloTaskB = new HelloTask("B");
              final HelloTask helloTaskC = new HelloTask("C");
      
              helloTaskA.addPredecessorTask(helloTaskB);
              helloTaskC.addPredecessorTask(helloTaskB);
      
              final ExecutorService pool = Executors.newFixedThreadPool(10);
              pool.execute(helloTaskC);
              pool.execute(helloTaskA);
              pool.execute(helloTaskB);
      
          }
      }
      

      实现是非常基本的实现,您可以对其进行改进,但它为您提供了基础结构。知道你在哪里应用它会很有趣?

      【讨论】:

        【解决方案5】:

        如果我能很好地理解您的需求,您可以使用像 Activity 这样的工作流引擎来解决它。 我认为这比根据您的特定需求重新发明工作流引擎要容易。

        【讨论】:

        • 感谢您的回复。我不想使用任何工作流引擎。我认为它不够复杂,不足以保证使用工作流引擎。
        • 在这种情况下,您将不得不实现一个线程定序器。启动一个主线程,其中包含一个线程队列以供您执行所有任务。流程将由主线程(您首先创建的主线程,持有队列)控制。
        【解决方案6】:

        如果您想重新发明轮子并自己开发解决方案,那很好 - 这是您的选择。然而,正确地做到这一点相当困难,尤其是螺纹部分。但是,如果您至少可以在构建模块方面考虑一些外部帮助,则可以是:

        • guava's ListenableFuture - 使用这个库,您可以创建 Callables 并将它们提供给特殊的线程池执行器,然后允许在 ListenableFuture 完成时自定义回调。
        • 您可以查看RX Java Observable,它允许混合和匹配不同的任务。这使用了非命令式编码风格,所以要小心!

        【讨论】:

          【解决方案7】:

          您的问题似乎是观察者模式的修改版本。 下面的解决方案比它允许依赖任务列表来进行更通用。

          创建一个类Task如下:

          class Task{
          List<Task> partialCompletionSuccessors;//List of tasks dependent on the partial completeion of this Task
          List<Task> fullCompletetionSuccessors;//List of tasks dependent on the full completeion of this Task
          
          List<Task> partialCompletionPredeccessor;//List of tasks that this task depends on their partial completion to start
          List<Task> fullCompletetionPredeccessor;//List of tasks that this task depends on their full completion to start
          
          
          private void notifySuccessorsOfPartialCompletion(){
              for(Task task: partialCompletionSuccessors){
                     task.notifyOfPartialCompletion(this);
              }
          
          }
          
          private void notifySuccessorsOfFullCompletion(){
              for(Task task: fullCompletetionSuccessors){
                     task.notifyOfPartialCompletion(this);
              }
          
          }
          
          private tryToProcceed(){
           if(partialCompletionPredeccessor.size() == 0 && fullCompletetionPredeccessor.size()==0 ){
               //Start the following task...
                ....
               //When this task partially completes
              notifySuccessorsOfPartialCompletion();
          
                //When this task fully completes
                notifySuccessorsOfFullCompletion();
          
            }
          
          }
          
          
          public void notifyOfPartialCompletion(Task task){// A method to notify the following task that a predeccessor task has partially completed
                partialCompletionPredeccessor.remove(task);
                tryToProcceed();
          
          }
          
          public void notifyOfFullCompletion(Task task){// A method to notify the following task that a predeccessor task has partially completed
                fullCompletetionPredeccessor.remove(task);
                tryToProcceed();
          
          }
          
          }
          

          【讨论】:

            【解决方案8】:

            这个问题很重。我可以在您的设计中看到至少三个不同的子系统:

            • task DAG(对我来说部分完成只是意味着您实际上有 2 个节点而不是 1 个),例如可以将其持久化在数据库中;
            • 工作队列,必须将要执行的下一个任务排入队列(这可能是某种消息队列,同样用于持久性/事务性);
            • 实际执行框架,可能涉及访问服务/数据库等外部资源,并且可能是分布式的。

            您的描述非常高级,因此您可以开始设计这三个部分所需的抽象以及它们如何相互交互(在接口方面)。

            完成所有这些后,您就可以开始提供一些简单的实现了。我将从“本地模式”开始,使用简单的内存 DAG、阻塞队列和某种类型的 java 执行器。

            您的问题没有提供有关 SLA、作业时长、失败/重试策略、事务等的详细信息,因此很难说您的模块应该如何实施。但我建议从高级抽象的角度思考并迭代实现。优秀的代码永远无法修复糟糕的设计。

            您可以就此打住,如果需要,也可以开始用第三方产品替换每个实施。

            【讨论】:

              【解决方案9】:

              有一个专门用于此目的的框架,称为Dexecutor,使用 Dexecutor,您可以根据图形对您的需求进行建模,在执行时,dexecutor 将负责以可靠的方式执行它。

              例如:

              @Test
              public void testDependentTaskExecution() {
              
                  ExecutorService executorService = newExecutor();
                  ExecutionEngine<Integer, Integer> executionEngine = new DefaultExecutionEngine<>(executorService);
              
                  try {
                      DefaultDependentTasksExecutor<Integer, Integer> executor = new DefaultDependentTasksExecutor<Integer, Integer>(
                              executionEngine, new SleepyTaskProvider());
              
                      executor.addDependency(1, 2);
                      executor.addDependency(1, 2);
                      executor.addDependency(1, 3);
                      executor.addDependency(3, 4);
                      executor.addDependency(3, 5);
                      executor.addDependency(3, 6);
                      executor.addDependency(2, 7);
                      executor.addDependency(2, 9);
                      executor.addDependency(2, 8);
                      executor.addDependency(9, 10);
                      executor.addDependency(12, 13);
                      executor.addDependency(13, 4);
                      executor.addDependency(13, 14);
                      executor.addIndependent(11);
              
                      executor.execute(ExecutionConfig.NON_TERMINATING);
              
                      Collection<Node<Integer, Integer>> processedNodesOrder = Deencapsulation.getField(executor, "processedNodes");
                      assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
                      assertThat(processedNodesOrder).size().isEqualTo(14);
              
                  } finally {
                      try {
                          executorService.shutdownNow();
                          executorService.awaitTermination(1, TimeUnit.SECONDS);
                      } catch (InterruptedException e) {
              
                      }
                  }
              }
              
              private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
                  List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
                  result.add(new Node<Integer, Integer>(1));
                  result.add(new Node<Integer, Integer>(2));
                  result.add(new Node<Integer, Integer>(7));
                  result.add(new Node<Integer, Integer>(9));
                  result.add(new Node<Integer, Integer>(10));
                  result.add(new Node<Integer, Integer>(8));
                  result.add(new Node<Integer, Integer>(11));
                  result.add(new Node<Integer, Integer>(12));
                  result.add(new Node<Integer, Integer>(3));
                  result.add(new Node<Integer, Integer>(13));
                  result.add(new Node<Integer, Integer>(5));
                  result.add(new Node<Integer, Integer>(6));
                  result.add(new Node<Integer, Integer>(4));
                  result.add(new Node<Integer, Integer>(14));
                  return result;
              }
              
              private ExecutorService newExecutor() {
                  return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
              }
              
              private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {
              
                  public Task<Integer, Integer> provideTask(final Integer id) {
              
                      return new Task<Integer, Integer>() {
              
                          private static final long serialVersionUID = 1L;
              
                          public Integer execute() {
                              if (id == 2) {
                                  throw new IllegalArgumentException("Invalid task");
                              }
                              return id;
                          }
                      };
                  }
              }
              

              这是建模图

              这意味着任务 #1 、12 和 11 将并行运行,一旦其中一个任务完成,它的依赖任务就会启动,例如,一旦任务#1 完成,它的依赖任务 #2 和 #3 就会启动

              【讨论】:

                【解决方案10】:

                从多线程中抽象出来

                class TaskA{
                    SimpleTask Y;
                    SimpleTask Z;
                
                    SimpleTask PJ;
                    SimpleTask RJ;
                
                    Run(){
                        // do the partial job
                        PJ.Run();
                        Y.Run();
                        // do the remaining job
                        RJ.Run();
                        Z.Run();
                        // return;
                    }
                } 
                
                class TaskB{
                    TaskA P;
                    SimpleTask J;
                
                    Run(){
                        // do the job
                        J.Run();
                        P.Run();
                        // return;
                    }
                }
                

                【讨论】:

                  猜你喜欢
                  • 1970-01-01
                  • 1970-01-01
                  • 2015-02-18
                  • 2012-10-07
                  • 1970-01-01
                  • 2015-09-06
                  • 2010-09-10
                  • 2018-03-28
                  • 2013-03-05
                  相关资源
                  最近更新 更多