【问题标题】:How can I report progress from a PLINQ query?如何报告 PLINQ 查询的进度?
【发布时间】:2019-04-09 02:02:16
【问题描述】:

我想报告长期运行的 PLINQ 查询的进度。

我真的找不到任何允许我这样做的本机 LINQ 方法(就像 implemented 用于 cancellation 一样)。

我读过this article,它显示了一个用于常规序列化查询的简洁扩展函数。

我一直在使用以下代码测试行为。

var progress = new BehaviorSubject<int>(0);
DateTime start = DateTime.Now;
progress.Subscribe(x => { Console.WriteLine(x); });
Enumerable.Range(1,1000000)
    //.WithProgressReporting(i => progress.OnNext(i)) //Beginning Progress
    .AsParallel()
    .AsOrdered()
    //.WithProgressReporting(i => progress.OnNext(i)) //Middle Progress reporting
    .Select(v => { Thread.Sleep(1); return v * v; })
    //.WithProgressReporting(i => progress.OnNext(i)) //End Progress Reporting
    .ToList();
Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");

编辑:
使用IEnumerable&lt;T&gt; 扩展从中间报告进度会删除并行性。

结束报告在计算并行计算时不报告任何进度,然后在最后快速报告所有进度。我认为这是将并行计算的结果编译成列表的过程。

我最初认为 开始 的进度报告导致 LINQ 无法并行运行。在对此进行睡眠并从Peter Duniho 读取cmets 之后,我发现它实际上是并行工作的,但是我收到了很多进度报告,处理这么多导致我的测试/应用程序显着变慢。

是否有一种并行/线程安全的方法可以从 PLINQ 以增量方式报告进度,从而允许用户知道正在取得的进度,而不会对方法运行时产生重大影响?

【问题讨论】:

  • 你的问题不是很清楚。为什么非并行 WithProgressReporting() 方法不能充分满足您的目的?通常,无论如何,您都将从IEnumerable&lt;T&gt; 开始......只需将您的源代码IEnumerable&lt;T&gt;WithProgressReporting() 的调用一起调用,然后再调用AsParallel(),就像您在测试中所做的那样。最终吞吐量将是相同的,无论您报告源还是结果的进度。您需要更具体:发布minimal reproducible example准确地解释您期望的输出,以及您得到的结果。

标签: c# .net linq parallel-processing


【解决方案1】:

这个答案可能不那么优雅,但它完成了工作。

使用 PLINQ 时,有多个线程处理您的集合,因此使用这些线程报告进度会导致多个(且无序的)进度报告,例如 0% 1% 5% 4 % 3% 等等...

相反,我们可以使用这些多个线程来更新一个存储进度的共享变量。在我的示例中,它是一个局部变量completed。然后,我们使用 Task.Run() 生成另一个线程,以 0.5 秒的间隔报告该进度变量。

扩展类:

static class Extensions
    public static ParallelQuery<T> WithProgressReporting<T>(this ParallelQuery<T> sequence, Action increment)
    {
        return sequence.Select(x =>
        {
            increment?.Invoke();
            return x;
        });
    }
}

代码:

static void Main(string[] args)
    {
        long completed = 0;
        Task.Run(() =>
        {
            while (completed < 100000)
            {
                Console.WriteLine((completed * 100 / 100000) + "%");
                Thread.Sleep(500);
            }
        });
        DateTime start = DateTime.Now;
        var output = Enumerable.Range(1, 100000)
            .AsParallel()
            .WithProgressReporting(()=>Interlocked.Increment(ref completed))
            .Select(v => { Thread.Sleep(1); return v * v; })
            .ToList();
        Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
        Console.ReadKey();
    }

【讨论】:

    【解决方案2】:

    此扩展可以位于 LINQ 查询的开头或结尾。如果定位在开始将立即开始报告进度,但会在工作完成之前错误地报告 100%。如果定位在末尾将准确报告查询的结束,但会延迟报告进度,直到源的第一项完成。

    public static ParallelQuery<TSource> WithProgressReporting<TSource>(
        this ParallelQuery<TSource> source,
        long itemsCount, IProgress<double> progress)
    {
        int countShared = 0;
        return source.Select(item =>
        {
            int countLocal = Interlocked.Increment(ref countShared);
            progress.Report(countLocal / (double)itemsCount);
            return item;
        });
    }
    

    使用示例:

    // The Progress captures the current SynchronizationContext at construction.
    var progress = new Progress<double>(); 
    progress.ProgressChanged += (object sender, double e) =>
    {
        Console.WriteLine($"Progress: {e:0%}");
    };
    
    var numbers = Enumerable.Range(1, 10);
    
    var sum = numbers
        .AsParallel()
        .WithDegreeOfParallelism(3)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .Select(n => { Thread.Sleep(500); return n; }) // Simulate some heavy computation
        .WithProgressReporting(10, progress) // <--- the extension method
        .Sum();
    
    Console.WriteLine($"Sum: {sum}");
    

    输出:

    有些跳跃是因为有时工作线程会互相抢占。

    System.Progress&lt;T&gt; 类具有在捕获的上下文(通常是 UI 线程)调用 ProgressChanged 事件的好特性,因此可以安全地更新 UI 控件。另一方面,在控制台应用程序中,在 ThreadPool 上调用事件,这可能会被并行查询完全利用,因此事件将在一定的延迟下触发(ThreadPool 每 500 毫秒产生新线程)。这就是我在示例中将并行度限制为 3 的原因,以保留一个空闲线程来报告进度(我有一台四核机器)。

    【讨论】:

      猜你喜欢
      • 2019-12-17
      • 1970-01-01
      • 1970-01-01
      • 2011-06-03
      • 1970-01-01
      • 1970-01-01
      • 2020-07-04
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多