【问题标题】:How to process items in collection parallely and serially both如何并行和串行处理集合中的项目
【发布时间】:2012-10-02 14:39:48
【问题描述】:

我有一个集合,其中包含要处理的元素,最多只能同时处理四个元素。在运行时,所有进程一起启动并且都进入等待状态。一次只处理四个元素。

问题是处理元素是随机选择的,因为所有线程都在等待资源释放。表示第一个元素可以是集合中的最后一个。

但是,我需要按元素在集合中的顺序进行处理。

请告诉我如何才能做到这一点?

我正在使用 TPL 和 C# 4.0

【问题讨论】:

  • 是需要按顺序处理,还是只需要处理结果按顺序?如果您需要处理它们以便在计算中产生副作用,并且在进行并行处理时这很可怕。
  • 我需要按顺序处理它们。所有进程都是独立的,因此不应该有任何副作用。
  • 你知道ParallelEnumerable.AsOrdered方法吗?
  • 我研究过这个。但这里的问题是我只是迭代集合并开始处理每个元素。每个元素都负责自己的过程。

标签: c# .net multithreading task-parallel-library


【解决方案1】:

对于并行性,总是存在定义“按顺序”的含义的问题。假设您有 100 个项目的集合。 “一次按 4 个”处理它们(按照您的要求)可能意味着:

  1. 松散排序:使用4个线程,按照原始集合的顺序发出任务。

    在这种情况下,您可以使用:

    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
             (item) =>
             {
                 // code
             });
    

    如果任务不平衡,这将很快失去原来的顺序,因为一些线程可能会在繁重的任务上落后,但任务会按顺序分配。

  2. 严格排序:按 4 个一组的顺序处理它们,如下所示:

                   0 1 2 3                
                   4 tasks
         _____________________________
                    barrier
    
                   4 5 6 7                
                   4 tasks
         _____________________________
                    barrier
    
                     etc.
    

    在这种情况下,您可以使用屏障:

    Barrier b = new Barrier(4);
    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
        (item) =>
        {
            // code
            b.SignalAndWait();
        });
    

    虽然你必须确保任务数是 4 的倍数,否则在最后的迭代中不会发出屏障信号。

  3. 在单个任务中处理 4 个项目:您可以创建一个封装原始列表的 4 个项目的任务对象,然后像第一种情况一样做一个简单的Parallel.ForEach(即每个线程将依次处理 4 个项目作为单个任务的一部分)。这将按顺序以 4 个一组的形式发出任务,但如果任务花费的时间过长,可能会再次导致某些线程滞后。

【讨论】:

  • :- 很好的解释和想法。谢谢。我们还需要处理如果一个进程“说 1”完成了,那么我们应该立即开始第五个。我们等不及要完成所有四个过程。让我试试你建议的方法。
  • AFAIK Parallel.ForEach() 看不懂ParallelQuery,所以Parallel.ForEach(list.AsParallel().AsOrdered(), …) 应该和Parallel.ForEach(list, …) 完全一样。
  • 另外,我认为方法#2 不是一个好主意,因为Parallel.ForEach()保证它将使用MaxDegreeOfParallelism 线程。这就是为什么它是MaxDegreeOfParallelism 而不仅仅是DegreeOfParallelism
  • @svick:我已经在我的系统上测试了代码,AsOrdered 按预期工作。我已经进行了几次测试,所以我认为它的顺序不是偶然的。
  • @svick:关于并行度,我同意你的说法,但我认为如果他说 8 个内核,他想将并发任务的数量一次限制为 4 个。当然,你不能在双核上同时运行 4 个线程。
【解决方案2】:

我不清楚在“随机选择元素”的情况下您到底在做什么。但是如果你使用Paralle.ForEach(),那么它会尝试提高效率,因此它会以某种方式对输入序列进行分区。如果输入序列是IList<T>,它将使用范围分区,否则,它将使用块分区(参见Chunk partitioning vs range partitioning in PLINQ)。

如果您想按顺序处理项目,您可以使用 custom partitioner 配置 Parallel.ForEach(),这会将集合划分为大小为 1 的块。

但是由于您在这里并不真正需要Parallel.ForEach(),因此可能更简单的解决方案是创建 4 个任务来逐个处理项目。对于同步,您可以使用BlockingCollection。比如:

public static class ParallelOrdered
{
    public static void ForEach<T>(IEnumerable<T> collection, Action<T> action, int degreeOfParallelism)
    {
        var blockingCollection = new BlockingCollection<T>();
        foreach (var item in collection)
            blockingCollection.Add(item);
        blockingCollection.CompleteAdding();

        var tasks = new Task[degreeOfParallelism];
        for (int i = 0; i < degreeOfParallelism; i++)
        {
            tasks[i] = Task.Factory.StartNew(
                () =>
                {
                    foreach (var item in blockingCollection.GetConsumingEnumerable())
                        action(item);
                });
        }
        Task.WaitAll(tasks);
    }
}

【讨论】:

    【解决方案3】:

    这就是我完成这项任务的方式

    public delegate void ProcessFinished(IParallelProcess process);
    public interface IParallelProcess
    {
        void Start();
        event ProcessFinished ProcessFinished;
    }
    
    public class ParallelProcessBasket : ConcurrentQueue<IParallelProcess>
    {
        public void Put(IParallelProcess process)
        {
            base.Enqueue(process);
        }
        public IParallelProcess Get()
        {
            IParallelProcess process = null;
            base.TryDequeue(out process);
            return process;
        }
    }
    public class ParallelProcessor<T> where T : class
    {
        private ParallelProcessBasket basket;
        private readonly int MAX_DEGREE_OF_PARALLELISM;
        private Action<T> action;
        public ParallelProcessor(int degreeOfParallelism, IEnumerable<IParallelProcess> processes, Action<T> action)
        {
            basket = new ParallelProcessBasket();
            this.action = action;
            processes.ToList().ForEach(
                (p) =>
                {
                    basket.Enqueue(p);
                    p.ProcessFinished += new ProcessFinished(p_ProcessFinished);
                });
            MAX_DEGREE_OF_PARALLELISM = degreeOfParallelism;
        }
    
        private void p_ProcessFinished(IParallelProcess process)
        {
            if (!basket.IsEmpty)
            {
                T element = basket.Get() as T;
                if (element != null)
                {
                    Task.Factory.StartNew(() => action(element));
                }
            }
        }
    
    
        public void StartProcessing()
        {
            // take first level of iteration
            for (int cnt = 0; cnt < MAX_DEGREE_OF_PARALLELISM; cnt++)
            {
                if (!basket.IsEmpty)
                {
                    T element = basket.Get() as T;
                    if (element != null)
                    {
                        Task.Factory.StartNew(() => action(element));
                    }
                }
            }
        }
    }
    static void Main(string[] args)    
    {
         ParallelProcessor<ParallelTask> pr = new ParallelProcessor<ParallelTask>(Environment.ProcessorCount, collection, (e) => e.Method1());
                pr.StartProcessing();
    }
    

    谢谢..

    【讨论】:

      猜你喜欢
      • 2012-11-09
      • 2020-12-22
      • 1970-01-01
      • 1970-01-01
      • 2017-01-27
      • 1970-01-01
      • 2011-11-25
      • 2015-09-22
      • 2018-04-19
      相关资源
      最近更新 更多