【问题标题】:TPL Producer Consumer in a FIFO order C#FIFO 顺序的 TPL 生产者消费者 C#
【发布时间】:2018-07-12 07:52:04
【问题描述】:

我仅限于 .NET 3.5,并且我正在使用 TPL。 场景是生产者-消费者,但不存在阻塞问题。 PLINQ 不能在这种情况下使用(由于限制),我们想要实现的是生产许多项目的最快方式(其中每个生产都是长期运行的,并且项目数量超过 100,000)但每个项目必须是以先进先出的顺序消费(这意味着,我要求生产的第一个项目必须先消费,即使它是在其他项目之后创建的)并且也尽可能快地消费。

对于这个问题,我尝试使用任务列表,等待列表中的第一项完成(taskList.First().IsCompleted()),然后对其使用消耗函数,但出于某种原因,我似乎内存不足(由于等待启动的任务可能导致任务列表中的项目太多?)有没有更好的方法来做到这一点? (我正在努力以最快的速度)

非常感谢!

【问题讨论】:

  • 你要么希望工作并行或按顺序完成,你不能同时拥有两者,甚至两者都想要没有意义。仅仅为了启动并行工作而创建任务也绝对没有任何好处
  • 在这个例子中,并行运行的东西很短,运行时间也差不多,但这不是我问这个问题的原因。 @CamiloTerevinto。关于该任务,我实际案例中的并行工作是在不同的线程中调用的,所以这就是我这样做的原因(如果不这样做,我也无法同时运行消费者)
  • 为什么要使用阻塞集合?为什么不只是一个普通数组,在每个任务完成时将结果放入数组中。完成后,数组被填充。
  • @commenters:阻塞集合的要点是当集合已满时阻塞一个Add。这在生产者/消费者模式中很常见,在这种模式中,处理TryAdd 或类似的东西需要更多的工作。而且由于调用阻塞,它必须并行完成;这就是重点。
  • 感谢@JohnWu 的解释。 MineR,原因是我想阻止添加。我正在处理大量的调用并且我不想耗尽内存,这就是为什么将结果存储在数组中对我来说是有问题的(在继续之前存储的信息太多)。如果是数组,我考虑在使用结果后将任务设置为 null,但阻塞收集更适合这个问题。

标签: c# multithreading task-parallel-library producer-consumer


【解决方案1】:

我认为这是一个有趣的问题,所以我花了一些时间。

我理解的场景是这样的:

  1. 您的 BlockingCollection 已满
  2. 启动了多个线程,每个线程都试图添加到 BlockingCollection。这些调用都会阻塞;这就是为什么它们需要并行发生。
  3. 当空间变得可用时,添加调用将变得畅通无阻。
  4. Add 的调用需要按照收到的顺序完成。

首先,我们来谈谈代码结构。我建议不要使用 BlockingCollection 并围绕它编写程序代码,而是扩展 BlockingCollection 并将 Add 方法替换为您需要的功能。它可能看起来像这样:

public class QueuedBlockingCollection<T> : BlockingCollection<T>
{
    private FifoMonitor monitor = new FifoMonitor();

    public QueuedBlockingCollection(int max) : base (max) {}

    public void Enqueue(T item)
    {
        using (monitor.Lock())
        {
            base.Add(item);
        }
    }
}

这里的诀窍是使用FifoMonitor 类,它可以为您提供lock 的功能,但会强制执行顺序。不幸的是,CLR 中不存在这样的类。但是我们可以write one:

public class FifoMonitor
{
    public class FifoCriticalSection : IDisposable
    {
        private readonly FifoMonitor _parent;

        public FifoCriticalSection(FifoMonitor parent)
        {
            _parent = parent;
            _parent.Enter();
        }

        public void Dispose()
        {
            _parent.Exit();
        }
    }

    private object _innerLock = new object();
    private volatile int counter = 0;
    private volatile int current = 1;

    public FifoCriticalSection Lock()
    {
        return new FifoCriticalSection(this);
    }

    private void Enter()
    {
        int mine = Interlocked.Increment(ref counter);
        Monitor.Enter(_innerLock);
        while (current != mine) Monitor.Wait(_innerLock);
    }

    private void Exit()
    {
        Interlocked.Increment(ref current);
        Monitor.PulseAll(_innerLock);
        Monitor.Exit(_innerLock);
    }
}

现在进行测试。这是我的程序:

public class Program
{
    public static void Main()
    {
        //Setup
        var blockingCollection = new QueuedBlockingCollection<int>(10);
        var tasks = new Task[10];

        //Block the collection by filling it up
        for (int i=1; i<=10; i++) blockingCollection.Add(99);

        //Start 10 threads all trying to add another value
        for (int i=1; i<=10; i++)
        {
            int index = i; //unclose
            tasks[index-1] = Task.Run( () => blockingCollection.Enqueue(index) );
            Task.Delay(100).Wait();  //Wait long enough for the Enqueue call to block
        }

        //Purge the collection, making room for more values
        while (blockingCollection.Count > 0)
        {
            var n = blockingCollection.Take();
            Console.WriteLine(n);
        }

        //Wait for our pending adds to complete
        Task.WaitAll(tasks);

        //Display the collection in the order read
        while (blockingCollection.Count > 0)
        {
            var n = blockingCollection.Take();
            Console.WriteLine(n);
        }

    }
}

输出:

99
99
99
99
99
99
99
99
99
99
1
2
3
4
5 
6
7
8
9
10

看起来有效!但为了确定起见,我将Enqueue 改回Add,以确保该解决方案确实有所作为。果然,它以常规的Add 结束。

99
99
99
99
99
99
99
99
99
99
2
3
4
6
1
5
7
8
9
10

查看DotNetFiddle上的代码

【讨论】:

  • 看起来不错,今天晚些时候去看看!
【解决方案2】:

编辑后确定 - 不是在 BlockingCollection 中添加结果,而是在阻塞集合中添加任务。这具有按顺序处理项目的功能,并且有最大的并行度,这将防止过多的线程启动并耗尽所有内存。

https://dotnetfiddle.net/lUbSqB

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

public class Program
{
    private static BlockingCollection<Task<int>> BlockingCollection {get;set;}  

    public static void Producer(int numTasks)
    {
        Random r = new Random(7);
        for(int i = 0 ; i < numTasks ; i++)
        {
            int closured = i;
            Task<int> task = new Task<int>(()=>
            { 
                Thread.Sleep(r.Next(100));
                Console.WriteLine("Produced: " + closured);
                return closured;
            });
            BlockingCollection.Add(task);
            task.Start();
        }
        BlockingCollection.CompleteAdding();
    }


    public static void Main()
    {
        int numTasks = 20;
        int maxParallelism = 3;

        BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);

        Task.Factory.StartNew(()=> Producer(numTasks));

        foreach(var task in BlockingCollection.GetConsumingEnumerable())
        {
            task.Wait();
            Console.WriteLine("              Consumed: "+ task.Result);
            task.Dispose();
        }

    }
}

结果:

Produced: 0
              Consumed: 0
Produced: 1
              Consumed: 1
Produced: 3
Produced: 2
              Consumed: 2
              Consumed: 3
Produced: 4
              Consumed: 4
Produced: 6
Produced: 5
              Consumed: 5
              Consumed: 6
Produced: 7
              Consumed: 7
Produced: 8
              Consumed: 8
Produced: 10
Produced: 9
              Consumed: 9
              Consumed: 10
Produced: 12
Produced: 13
Produced: 11
              Consumed: 11
              Consumed: 12
              Consumed: 13
Produced: 15
Produced: 14
              Consumed: 14
              Consumed: 15
Produced: 17
Produced: 16
Produced: 18
              Consumed: 16
              Consumed: 17
              Consumed: 18
Produced: 19
              Consumed: 19

【讨论】:

  • 这使得 BlockingCollection 的功能的使用变得无关紧要。因为它迫使我创建一个与项目数量一样大的任务数组,这是由于速度(新任务 N 次)而有问题的数组,而不是阻塞,它只是将所有内容存储在数组中。
  • 看起来真的很棒,适合我的问题,非常感谢!
猜你喜欢
  • 2012-05-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-03-12
  • 1970-01-01
  • 2018-09-24
  • 1970-01-01
相关资源
最近更新 更多