【问题标题】:How to synchronize the recurrent execution of three tasks that depend on each other?如何同步三个相互依赖的任务的循环执行?
【发布时间】:2021-11-12 01:21:44
【问题描述】:

我想请教 C# 方面的专家开发人员。我的程序需要执行三项经常性任务。任务 2 依赖于任务 1,任务 3 依赖于任务 2,但任务 1 无需等待其他两个任务完成即可重新启动(程序一直在运行)。由于每个任务都需要一些时间,我想在一个线程或 C#Task 中运行每个任务。任务 1 完成后,任务 2 启动,任务 1 再次启动……等等。

我不确定实现这一点的最佳方法是什么。我希望有人可以指导我。

【问题讨论】:

  • 你可以结帐等待异步
  • 在 SO 上有很多与 c# 多线程概念相关的好问答,其中一个与您的 Q 相关的问答有一个 here

标签: c# multithreading parallel-processing task-parallel-library threadpool


【解决方案1】:

实现此目的的一种方法是使用称为Task Parallel Library 的东西。这提供了一组类,允许您将任务安排到“块”中。您创建一个按顺序执行 A、B 和 C 的方法,然后 TPL 将负责同时运行该方法的多个调用。这是一个小例子:

async Task Main()
{
    var actionBlock = new ActionBlock<int>(DoTasksAsync, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2 // This is the number of simultaneous executions of DoTasksAsync that will be run
    };
    
    await actionBlock.SendAsync(1);
    await actionBlock.SendAsync(2);
    
    actionBlock.Complete();
    await actionBlock.Completion;
}

async Task DoTasksAsync(int input)
{
    await DoTaskAAsync();
    await DoTaskBAsync();
    await DoTaskCAsync();
}

【讨论】:

    【解决方案2】:

    我可能会使用某种队列模式。

    我不确定任务 1 是否是线程安全的要求,所以我会保持简单:

    • 任务 1 始终在执行。完成后,它会在某个队列上发布一条消息并重新开始。
    • 任务 2 正在侦听队列。只要有消息可用,它就会开始处理它。
    • 只要任务 2 完成工作,它就会调用任务 3,以便它可以完成它的工作。

    作为提到的 cmets 之一,您应该能够在代码中成功使用 async/await。尤其是在任务 2 和 3 之间。请注意,任务 1 可以与任务 2 和 3 并行运行,因为它不依赖于任何其他任务。

    【讨论】:

      【解决方案3】:

      您可以使用下面的ParallelLoop 方法。此方法启动一个异步工作流,其中三个任务彼此并行调用,但它们本身按顺序调用。因此,您无需在每个任务中添加同步,除非某些任务会产生对其他任务可见的全局副作用。

      使用Task.Run 方法在ThreadPool 上调用任务。

      /// <summary>
      /// Invokes three actions repeatedly in parallel on the ThreadPool, with the
      /// action2 depending on the action1, and the action3 depending on the action2.
      /// Each action is invoked sequentially to itself.
      /// </summary>
      public static async Task ParallelLoop<TResult1, TResult2>(
          Func<TResult1> action1,
          Func<TResult1, TResult2> action2,
          Action<TResult2> action3,
          CancellationToken cancellationToken = default)
      {
          // Arguments validation omitted
          var task1 = Task.FromResult<TResult1>(default);
          var task2 = Task.FromResult<TResult2>(default);
          var task3 = Task.CompletedTask;
          try
          {
              int counter = 0;
              while (true)
              {
                  counter++;
      
                  var result1 = await task1.ConfigureAwait(false);
                  cancellationToken.ThrowIfCancellationRequested();
                  task1 = Task.Run(action1); // Restart the task1
                  if (counter <= 1) continue; // In the first loop result1 is undefined
      
                  var result2 = await task2.ConfigureAwait(false);
                  cancellationToken.ThrowIfCancellationRequested();
                  task2 = Task.Run(() => action2(result1)); // Restart the task2
                  if (counter <= 2) continue; // In the second loop result2 is undefined
      
                  await task3.ConfigureAwait(false);
                  cancellationToken.ThrowIfCancellationRequested();
                  task3 = Task.Run(() => action3(result2)); // Restart the task3
              }
          }
          finally
          {
              // Prevent fire-and-forget
              Task allTasks = Task.WhenAll(task1, task2, task3);
              try { await allTasks.ConfigureAwait(false); } catch { allTasks.Wait(); }
              // Propagate all errors in an AggregateException
          }
      }
      

      在实现中有一个明显的模式,这使得添加具有三个以上动作的重载变得微不足道。每个添加的操作都需要自己的泛型类型参数(TResult3TResult4 等)。

      使用示例:

      var cts = new CancellationTokenSource();
      Task loopTask = ParallelLoop(() =>
      {
          // First task
          Thread.Sleep(1000); // Simulates synchronous work
          return "OK"; // The result that is passed to the second task
      }, result =>
      {
          // Second task
          Thread.Sleep(1000); // Simulates synchronous work
          return result + "!"; // The result that is passed to the third task
      }, result =>
      {
          // Third task
          Thread.Sleep(1000); // Simulates synchronous work
      }, cts.Token);
      

      如果任何任务失败,整个循环将停止(loopTask.Exception 包含错误)。由于任务相互依赖,不可能从单个失败的任务中恢复¹。您可以做的是通过 Polly Retry 策略执行整个循环,以确保在失败的情况下循环将被重生。如果您对Polly library 不熟悉,可以使用下面简单且无特色的RetryUntilCanceled 方法:

      public static async Task RetryUntilCanceled(Func<Task> action,
          CancellationToken cancellationToken)
      {
          while (true)
          {
              cancellationToken.ThrowIfCancellationRequested();
              try { await action().ConfigureAwait(false); }
              catch { if (cancellationToken.IsCancellationRequested) throw; }
          }
      }
      

      用法:

      Task loopTask = RetryUntilCanceled(() => ParallelLoop(() =>
      {
         //...
      }, cts.Token), cts.Token);
      

      在退出进程之前,建议您Cancel()CancellationTokenSourceWait()(或awaitloopTask,以便循环正常终止。否则,某些任务可能会在工作过程中中止。

      ¹ 通过 Polly Retry 策略执行每个单独的任务实际上是可能的,并且可能更可取。并行循环将暂停,直到成功重试失败的任务。

      【讨论】:

      • 注意:取消cancellationToken 会取消并行循环,而所有操作都没有执行相同的次数。 action1action2 多执行一次,action2action3 多执行一次。
      • 我已经在this GitHub 存储库上上传了上述想法的完善实现。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-09-18
      • 2013-10-21
      • 1970-01-01
      • 1970-01-01
      • 2012-11-13
      • 1970-01-01
      • 2023-03-16
      相关资源
      最近更新 更多