您可以使用下面的ParallelLoop 方法。此方法启动一个异步工作流,其中三个任务彼此并行调用,但它们本身按顺序调用。因此,您无需在每个任务中添加同步,除非某些任务会产生对其他任务可见的全局副作用。
使用Task.Run 方法在ThreadPool 上调用任务。
/// <summary>
/// Invokes three actions repeatedly in parallel on the ThreadPool, with the
/// action2 depending on the action1, and the action3 depending on the action2.
/// Each action is invoked sequentially to itself.
/// </summary>
public static async Task ParallelLoop<TResult1, TResult2>(
Func<TResult1> action1,
Func<TResult1, TResult2> action2,
Action<TResult2> action3,
CancellationToken cancellationToken = default)
{
// Arguments validation omitted
var task1 = Task.FromResult<TResult1>(default);
var task2 = Task.FromResult<TResult2>(default);
var task3 = Task.CompletedTask;
try
{
int counter = 0;
while (true)
{
counter++;
var result1 = await task1.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
task1 = Task.Run(action1); // Restart the task1
if (counter <= 1) continue; // In the first loop result1 is undefined
var result2 = await task2.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
task2 = Task.Run(() => action2(result1)); // Restart the task2
if (counter <= 2) continue; // In the second loop result2 is undefined
await task3.ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
task3 = Task.Run(() => action3(result2)); // Restart the task3
}
}
finally
{
// Prevent fire-and-forget
Task allTasks = Task.WhenAll(task1, task2, task3);
try { await allTasks.ConfigureAwait(false); } catch { allTasks.Wait(); }
// Propagate all errors in an AggregateException
}
}
在实现中有一个明显的模式,这使得添加具有三个以上动作的重载变得微不足道。每个添加的操作都需要自己的泛型类型参数(TResult3、TResult4 等)。
使用示例:
var cts = new CancellationTokenSource();
Task loopTask = ParallelLoop(() =>
{
// First task
Thread.Sleep(1000); // Simulates synchronous work
return "OK"; // The result that is passed to the second task
}, result =>
{
// Second task
Thread.Sleep(1000); // Simulates synchronous work
return result + "!"; // The result that is passed to the third task
}, result =>
{
// Third task
Thread.Sleep(1000); // Simulates synchronous work
}, cts.Token);
如果任何任务失败,整个循环将停止(loopTask.Exception 包含错误)。由于任务相互依赖,不可能从单个失败的任务中恢复¹。您可以做的是通过 Polly Retry 策略执行整个循环,以确保在失败的情况下循环将被重生。如果您对Polly library 不熟悉,可以使用下面简单且无特色的RetryUntilCanceled 方法:
public static async Task RetryUntilCanceled(Func<Task> action,
CancellationToken cancellationToken)
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
try { await action().ConfigureAwait(false); }
catch { if (cancellationToken.IsCancellationRequested) throw; }
}
}
用法:
Task loopTask = RetryUntilCanceled(() => ParallelLoop(() =>
{
//...
}, cts.Token), cts.Token);
在退出进程之前,建议您Cancel()CancellationTokenSource 和Wait()(或await)loopTask,以便循环正常终止。否则,某些任务可能会在工作过程中中止。
¹ 通过 Polly Retry 策略执行每个单独的任务实际上是可能的,并且可能更可取。并行循环将暂停,直到成功重试失败的任务。