【问题标题】:C# Aggregate of TasksC# 任务聚合
【发布时间】:2018-12-05 16:54:50
【问题描述】:

假设我有许多任务,我想一个一个地执行它们,中间有延迟。我的想法是通过与ContinueWith 组合并在每对之间插入Task.Delay() 将它们与Aggregate 折叠成一个任务:

var tasks = new[] {1, 2, 3, 4, 5}.Select(async x => Console.WriteLine(x));
var superTask =
    tasks.Aggregate(Task.CompletedTask, async (task1, task2) =>
        await (await task1.ContinueWith(_ => Task.Delay(1000))).ContinueWith(_ => task2));

await superTask;

问题是为什么它不起作用?

【问题讨论】:

  • “我想一个一个地执行它们,中间有延迟”——听起来你根本不应该使用任务。或者,如果它们都需要从您的主线程异步运行,则将它们全部放入一个任务中。
  • 在同步方法中添加async 并没有做任何有用的事情,您可能会期待神奇的“延迟异步执行代码”......请确认您已将“任务”用作“Task”而不是“作品”?
  • 您可以使用 Microsoft 的响应式扩展来执行此操作:await Observable.Range(1, 5).Delay(n => Observable.Timer(TimeSpan.FromSeconds(n * 1.0))).ToArray()

标签: c# linq async-await fold


【解决方案1】:

有几个问题。首先,您不是在等待延迟,因此它甚至可能不存在。其次,当以这种方式调用时,Tasks 会立即自动启动——您需要改用构造方法(参见this question)。

这段代码会做你想做的事。

var tasks = new[] { 1, 2, 3, 4, 5 }
    .Select
    (
        x => new Task( () => Console.WriteLine(x) )
    );
var superTask =
    tasks.Aggregate
    (
        Task.CompletedTask,
        async (task1, task2) =>
        {
            await task1;
            await Task.Delay(1000);
            task2.Start();
        }
    );
await superTask;

说了这么多,我非常怀疑这是正确的方法。只需编写一个简单的循环来遍历数组并一次处理一个;甚至不需要异步。

【讨论】:

    【解决方案2】:

    你写道:

    我想一个一个地执行它们,中间有一个延迟。

    您的代码有几个问题。

    在您的示例中,您创建了一个可枚举但没有对其进行枚举。因此,您的任务尚未开始。一旦您开始枚举,它们就会启动。因此,只要您使用foreach,或ToList(),或低级枚举:GetEnumerator()MoveNext()

    var tasks = new[] {1, 2, 3, 4, 5}.Select(async x => Console.WriteLine(x))
         .ToList();
    

    每个任务一创建就开始运行,所以现在它们同时运行。

    您的聚合函数还逐个枚举每个元素,并执行您的ContinueWith。枚举已经启动了任务。

    看看source code of Enumerable.Aggregate

    public static TSource Aggregate<TSource>(
        this IEnumerable<TSource> source,
        Func<TSource, TSource, TSource> func)
        {
            ... // some test for null
            using (IEnumerator<TSource> e = source.GetEnumerator())
            {
                if (!e.MoveNext()) throw Error.NoElements(); // there must be at least one element
    
                TSource result = e.Current;
                while (e.MoveNext()) result = func(result, e.Current);
                return result;
            }
        }
    

    那么它的作用:

    • 第一个MoveNext() 执行一次Select 语句。第一个任务已创建并计划尽快运行,可能立即运行。
    • e.Current用来保存这个正在运行的任务变量Result
    • 第二个MoveNext() 再次执行Select 语句。将创建第二个任务并安排其尽快运行。
    • e.Current 包含您的第二个任务。用于执行func(result, e.Current)

    此函数将执行以下操作:

    <first task>.ContinueWith(delay task).ContinueWith(<2ndTask>).
    

    结果放入变量result中,MoveNext完成等

    记住:第一个任务和第二个任务已经在运行了!

    所以实际上你的聚合是这样的:

    Task result = empty task
    for every task
    {
         Create it and schedule it to start running as soon as possible
         Result = Result.ContinueWith(delayTask).ContinueWith(already running created task)
    }
    

    现在如果你接受一个正在运行的任务,而ContinueWith另一个已经在运行的任务会发生什么?

    如果你看一下Task.ContinueWith,它会说ContinueWithcontinuationAction参数意味着:

    任务完成时运行的操作。

    如果您运行一个已经在运行的任务会发生什么?我不知道,一些测试代码会给你答案。我最好的猜测是它不会做任何事情。它当然不会停止已经在运行的任务。

    这不是你想要的!

    在我看来,这不是你想要的。您想指定一些操作以按顺序执行,中间有延迟时间。像这样的:

    Create a Task for Action 1, and wait until finished
    Delay
    Create a Task for Action 2, and wait until finished
    Delay
    ...
    

    所以你想要的是一个输入一系列动作和延迟时间的函数。每个 Action 都会启动,等待直到完成,然后等待 DelayTime。

    您可以使用聚合来执行此操作,其中输入是一系列操作。结果将是可怕的。难以阅读、测试和维护。一个过程会更容易理解。

    为了使它与 LINQ 兼容,我将创建一个扩展方法。见extension methods demystified

    static Task PerformWithDelay(this IEnumerable<Action> actionsToPerform, TimeSpan delayTime)
    {
        var actionEnumerator = actionsToPerform.GetEnumerator();
    
        // do nothing if no actions to perform
        if (!actionEnumerator.MoveNext())
            return Task.CompletedTask;
        else
        {    // Current points to the first action
             Task result = actionEnumerator.Current;
    
             // enumerate over all other actions:
             while (actionEnumerator.MoveNext())
             {
                 // there is a next action. ContinueWith delay and next task
                 result.ContinueWith(Task.Delay(delayTime)
                       .ContinueWith(Task.Run( () => actionEnumerator.Current);
             } 
         }
    }
    

    好吧,如果你真的非常想用聚合来打动你的同事:

    Task resultTask = actionsToPerform.Aggregate<Action, Task> (
        action =>
        {
             previousResultTask.ContinueWith(Task.Delay(delayTime))
                               .ContinueWith(Task.Run( () => action));
         });
    

    因此,对于您的操作序列中的每个操作,我们使用您序列的第一项为聚合播种:ContinueWith 一个新的 Task.Delay 和 ContinueWith 一个执行该操作的新 Task。

    问题:如果您的输入为空,则会出现异常。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-12-28
      • 2017-07-23
      • 1970-01-01
      • 2013-07-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多