【问题标题】:Speculative execution using the TPL使用 TPL 的推测执行
【发布时间】:2013-02-06 01:42:39
【问题描述】:

我有一个List<Task<bool>>,我想并行枚举它,找到第一个要完成的任务,结果为true,而不是等待或观察任何其他仍待处理的任务的异常。

var tasks = new List<Task<bool>>
{ 
    Task.Delay(2000).ContinueWith(x => false), 
    Task.Delay(0).ContinueWith(x => true), 
};

我曾尝试使用 PLINQ 执行以下操作:

var task = tasks.AsParallel().FirstOrDefault(t => t.Result);

并行执行,但一旦找到令人满意的结果就不会返回。因为访问 Result 属性是阻塞的。为了使用 PLINQ 来实现这一点,我必须写下这个令人敬畏的声明:

var cts = new CancellationTokenSource();
var task = tasks.AsParallel()
    .FirstOrDefault(t =>
    {
        try 
        { 
            t.Wait(cts.Token);
            if (t.Result)
            {
                cts.Cancel();
            }

            return t.Result;
        } 
        catch (OperationCanceledException) 
        { 
            return false;
        }
    } );

我编写了一个扩展方法,可以在任务完成时生成任务。

public static class Exts
{
    public static IEnumerable<Task<T>> InCompletionOrder<T>(this IEnumerable<Task<T>> source)
    {
        var tasks = source.ToList();
        while (tasks.Any())
        {
            var t = Task.WhenAny(tasks);
            yield return t.Result;
            tasks.Remove(t.Result);
        }
    }
}

// and run like so
var task = tasks.InCompletionOrder().FirstOrDefault(t => t.Result);

但感觉这很常见,有更好的方法。有什么建议吗?

【问题讨论】:

    标签: c# task-parallel-library plinq


    【解决方案1】:

    也许是这样的?

    var tcs = new TaskCompletionSource<Task<bool>>();
    
    foreach (var task in tasks)
    {
        task.ContinueWith((t, state) =>
        {
            if (t.Result)
            {
                ((TaskCompletionSource<Task<bool>>)state).TrySetResult(t);
            }
        },
            tcs,
            TaskContinuationOptions.OnlyOnRanToCompletion |
            TaskContinuationOptions.ExecuteSynchronously);
    }
    
    var firstTaskToComplete = tcs.Task;
    

    【讨论】:

    • 我认为您应该使用TrySetResult(),以避免未捕获的异常(尽管它们实际上不会影响.Net 4.5 上的程序)。并且可能在设置结果后取消所有未完成的延续。
    • 不知道OP的要求,所以不知道要不要取消任务。但是关于 TrySetResult 你有一点,谢谢!固定。
    • 我的意思不是取消集合中的Tasks,我的意思是继续。设置结果后,无需执行其余的延续(但也不会对性能造成很大影响)。
    • 我喜欢这里的概念,但是,我还需要知道是否所有任务都未成功完成。我通过Task.WaitAny(Task.WhenAll(tasks), tcs.Task) == 1; 找到了一种方法,感谢您的回答。
    【解决方案2】:

    也许你可以试试 Rx.Net 库。它实际上非常适合提供 Linq to Work。

    在引用 Microsoft Rx.Net 程序集后,在 LinqPad 中尝试这个 sn-p。

    using System
    using System.Linq
    using System.Reactive.Concurrency
    using System.Reactive.Linq
    using System.Reactive.Threading.Tasks
    using System.Threading.Tasks
    
    void Main()
    {
        var tasks = new List<Task<bool>>
        { 
            Task.Delay(2000).ContinueWith(x => false), 
            Task.Delay(0).ContinueWith(x => true), 
        };
    
        var observable = (from t in tasks.ToObservable()
                          //Convert task to an observable
                          let o = t.ToObservable()
                          //SelectMany
                          from x in o
                          select x);
    
    
        var foo = observable
                    .SubscribeOn(Scheduler.Default) //Run the tasks on the threadpool
                    .ToList()
                    .First();
    
        Console.WriteLine(foo);
    }
    

    【讨论】:

    • 这段代码实际上会执行这两个任务...如果您只想获得第一个返回的结果...远程.ToList()
    • 我过去也曾使用过它,但我并没有真正考虑将 rx 添加到我的项目中。我运行您的代码只是因为我很好奇,在删除 ToList 并将 First(已过时)更改为 FirstOrDefaultAsync 之后,我得到了我想要的正确行为。感谢您提供完整的复制/粘贴 sn-p 虽然 +1
    【解决方案3】:

    首先,我不明白你为什么要在这里使用 PLINQ。枚举Tasks 的列表应该不会花很长时间,所以我认为你不会从并行化中获得任何好处。

    现在,要获得已经用true 完成的第一个Task,您可以使用the (non-blocking) IsCompleted property

    var task = tasks.FirstOrDefault(t => t.IsCompleted && t.Result);
    

    如果您想获得Tasks 的集合,按完成排序,请查看 Stephen Toub 的文章Processing tasks as they complete。如果要首先列出返回 true 的那些,则需要修改该代码。如果不想修改,可以使用a version of this approach from Stephen Cleary's AsyncEx library


    此外,在您的问题的特定情况下,您可以通过将 .WithMergeOptions(ParallelMergeOptions.NotBuffered) 添加到 PLINQ 查询来“修复”您的代码。但是这样做仍然大部分时间都行不通,即使这样做也会浪费很多线程。这是因为 PLINQ 使用固定数量的线程和分区,并且使用 Result 会在大多数情况下阻塞这些线程。

    【讨论】:

    • 如果您只是枚举任务列表并检查 IsCompleted,那么如果任务尚未完成,FirstOrDefault 将返回 null,而不是等到第一个任务完成。
    • @dtb 是的,我假设至少有一个 Task 已经完成,但实际上没有办法知道这一点。我想它在某些情况下可能仍然有用。
    • @dtb 在这里是正确的,在我的情况下,我正在检查的所有任务都有时间完成。然而,Toub 的那篇文章是正确的。 +1 仅此而已。我在发布之前浏览了网页,但那个宝石让我为我的谷歌搜索技巧感到尴尬。干杯
    • 你说得对,并行处理任务没有意义
    猜你喜欢
    • 2014-07-19
    • 2017-12-29
    • 2011-01-13
    • 1970-01-01
    • 2013-02-16
    • 1970-01-01
    • 1970-01-01
    • 2018-09-27
    • 1970-01-01
    相关资源
    最近更新 更多