【问题标题】:How to properly parallelize worker tasks?如何正确并行化工作任务?
【发布时间】:2018-02-06 06:54:53
【问题描述】:

考虑以下代码 sn-p 并注意将 numberTasksToSpinOff 设置为 1 和 3,4 或更多(取决于您机器上的线程资源)之间的总运行时间差异。我注意到在拆分更多任务时运行时间要长得多。

我故意将数据集合传递到每个工作人员任务同时读取的每个工作人员实例中。我认为只要这些操作只是读取或枚举,任务就可以访问共享数据结构而不会阻塞。

我的目标是分拆多个任务,这些任务通过读取操作在相同的共享数据结构上进行迭代,并在大约同一时间完全完成,而不管分拆的任务数量是多少。

编辑:请查看我实现Parallel.Foreach() 的第二个代码 sn-p 并创建每个工作人员自己的数据集,因此不同任务/线程不会访问相同的数据结构。然而,我仍然看到不可接受的开销。

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine($"Entry Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        //run
        var task = Task.Run(async () =>
        {
            Console.WriteLine($"Entry RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            await RunMe();

            Console.WriteLine($"Exit RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        });

        task.Wait();

        Console.WriteLine($"Exit Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static async Task RunMe()
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 6;
        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var tasks = new List<Task>();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i, dataPoints));
        }

        //start timer
        watch.Restart();

        //spin off tasks
        foreach (var worker in workers)
        {
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Entry WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                worker.DoSomeWork();
                Console.WriteLine($"Exit WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            }));

        }

        //completion tasks
        await Task.WhenAll(tasks);

        //stop timer
        watch.Stop();

        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    private List<double> _data;

    public Worker(int workerId, List<double> data)
    {
        WorkerId = workerId;
        _data = data;
    }

    public void DoSomeWork()
    {
        var indexPos = 0;

        foreach (var dp in _data)
        {
            var subSet = _data.Skip(indexPos).Take(_data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

第二个代码片段:

class Program
{
    static void Main(string[] args)
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 1;
        var numberItems = 20000;
        //var random = new Random((int)DateTime.Now.Ticks);
        //var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i));
        }

        //start timer
        watch.Restart();

        //parellel work

        if (workers.Any())
        {
            var processorCount = Environment.ProcessorCount;
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = processorCount };
            Parallel.ForEach(workers, parallelOptions, DoSomeWork);
        }

        //stop timer
        watch.Stop();
        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static void DoSomeWork(Worker worker)
    {
        Console.WriteLine($"WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        var indexPos = 0;

        foreach (var dp in worker.Data)
        {
            var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    public List<double> Data { get; set; }

    public Worker(int workerId)
    {
        WorkerId = workerId;

        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();

    }
}

【问题讨论】:

  • 你有没有做过任何事情来调试这个?您的问题中没有证据表明您已经尝试解释您的观察结果。使用分析器,找出所有时间都花在了哪里。我敢打赌你会发现它是垃圾收集器,因为你的任务做的主要事情是创建 lot 的垃圾。
  • @PeterDuniho,是的,我已经调试了代码,我什至在代码中打印了线程 ID。你是不正确的,即使我只是迭代数据结构而不创建任何额外的数据,我看到总执行时间同样爆炸。在这种情况下,我发现您的“近距离投票”在没有首先澄清的情况下过分了。如果我已经弄清楚了,我就不会发布问题了。我认为这正是这个网站的用途。不是每个人都可能在此时拥有与您相同的知识,因此寻求建议。
  • 这似乎是一个合理的问题,对我来说有一些合理的辩论......不知道为什么它被关闭,特别是如果可能找到解决方案?

标签: c# multithreading task task-parallel-library


【解决方案1】:

注意:以下答案基于测试和观察,而非确定性知识。

分拆的任务越多,产生的开销就越多,因此总执行时间也会增加。 但是如果您从另一个角度考虑它,您会发现实际处理的“数据点”会增加您启动的任务越多(直到达到可用硬件线程的限制):

在我的机器(4C/8T)上生成以下值,每个列表有 10000 个点:

  • 1 个工人 -> 1891 毫秒 -> 5288 p/s
  • 2 个工人 -> 1921 毫秒 -> 10411 p/s
  • 4 个工人 -> 2670 毫秒 -> 14981 p/s
  • 8 个工人 -> 4871 毫秒 -> 16423 p/s
  • 12 个工人 -> 7449 毫秒 -> 16109 p/s

在那里你看到,直到我达到我的“核心限制”,处理的数据显着增加,然后直到我达到我的“线程限制”它仍然明显增加,但之后它再次减少,因为增加的开销并且没有更多可用的硬件资源。

【讨论】:

  • 我同意您上面的所有意见,但只要硬件线程的数量不限制并行运行的工人数量,您的上述推理就不成立。我在具有 24 个内核和 48 个超线程的双 Xeon 机器上运行。在这种情况下,硬件规格绝对不会限制具有 10 个工作人员的作业,但总执行时间最终仍然是具有 1 或 2 个工作人员的作业的倍数。分拆更多的任务/线程会花费一点,但不会太多。似乎还有其他事情发生。
  • @MattWolf 我确实通过我的个人资料运行了代码,所有增加的时间都来自ToList 调用。并行线程越多,速度就越慢...
  • ...因为它分配了空间,然后像@Peter Duniho暗示的那样被垃圾收集?我现在正在分析,使用 GC.TryStartNoGCRegion GC.EndNoGCRegion
  • 我仍然坚持这一点,阻止垃圾收集器在关键区域工作根本没有任何好处。但显然 CLR 的其他一些资源管理仍在干扰,可能是为创建的对象分配内存?
  • @MattWolf 您的生产代码是否也使用ToList?也许您可以避免使用它来加快速度?
【解决方案2】:

您看过并行任务吗?然后你可以做这样的事情。

例如:

if (workers.Any())
{
    var parallelOptions = new ParallelOptions {MaxDegreeOfParallelism = Environment.ProcessorCount};
    Parallel.ForEach(workers, parallelOptions, DoSomeWork);
}

private static void DoSomeWork(Worker worker)
{
}

【讨论】:

  • 警告.. 理想情况下,如果工作负载不是 IO 绑定操作,即使用 IO 完成端口的事情。
  • 据我所知,当每个操作的工作量很大时,这很有用,对吧?
  • 我专门将它用于长时间运行的文件 IO 任务并且效果很好。
  • @MarkRedman,我会检查一下,但是是什么导致我的原始代码中的额外工作量?我是否错误地使用了异步/等待或任务?我之所以问,是因为稍后我希望能够在运行时动态添加新的工作人员/任务,而无需事先知道在任何给定时间点我手头有多少工作人员/任务。
  • 我理解它的方式,使用 async/away 就像使用 WhenAll 一样,是利用同一线程上的资源,而 Parallel 将在单个线程中完成工作。
猜你喜欢
  • 1970-01-01
  • 2012-06-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-03-29
  • 1970-01-01
相关资源
最近更新 更多