【问题标题】:Multiple producer/consumer interaction in .Net 4.0.Net 4.0 中的多个生产者/消费者交互
【发布时间】:2014-05-30 22:21:24
【问题描述】:

我正在使用 BlockingCollection 处理一些文件,然后将它们上传到服务器。

现在我有一个 Producer 递归文件系统并将某些文件压缩到临时位置。一旦它完成了一个文件,它会将我自己的对象添加到 BlockingCollection 中,其中包含诸如文件名、文件路径、修改日期等信息。消费者然后抓住这个对象并使用它来上传文件。当 Producer 完成搜索文件系统并处理文件后,它会调用 BlockingCollection.CompleteAdding() 方法向 Consumer 发出它已经完成的信号。

我想做的是将生产者的数量增加到 2 个或更多。原因是压缩过程需要一段时间,在多核处理器上我只利用 1 个核心。这会导致生产者有时会在更快的网络上落后于消费者。

我的问题是,当我有多个生产者且只有一个消费者时,我如何向消费者发出所有生产者都已完成工作的信号?如果我在其中一个生产者上调用 BlockingCollection.CompleteAdding() 方法,我仍然可以让一个或多个其他生产者仍在工作。

【问题讨论】:

  • 将压缩分担给消费者并让生产者尽可能瘦可能更有益。
  • 同意 Babcock 的观点,在 aprallel 中递归目录树只会阻塞文件系统。最好将要处理的文件位置添加到队列中,然后并行压缩和发送。密切关注您的 HDD 吞吐量。同时读取多个位置将导致它将磁头跨磁盘移动到不同的扇区。在那个时候它不会读取数据,因此会降低磁盘吞吐量。理想情况下,您仍然会在 1 个线程上读取,将每个文件读入一个字节 [],然后压缩并并行发送。
  • 对不起,我应该在我的原始帖子中更清楚。只有一个递归线程可以提供 2 个或更多压缩(生产者)任务,然后再提供单个消费者任务。我正在使用高压缩,即使在快速处理器上也需要相当多的时间。每个线程每秒最多写入大约 2MB 的数据。

标签: c# blockingcollection


【解决方案1】:

在调用BlockingCollection.CompleteAdding() 之前,您可以在Producer 代码中使用信号量。信号量由所有Producer 实例共享,当最后一个生产者完成时,它可以调用该方法。信号量可以实现为一个简单的计数器,在创建生产者时增加计数器,在生产者结束其工作时减少它。如果计数器达到零,则可以调用BlockingCollection.CompleteAdding()

【讨论】:

    【解决方案2】:

    我使用这样的东西来拥有多个生产者和消费者。这只是一个非常简单的解决方案,并未针对生产代码进行优化。

    public class ManageBatchProcessing 
    {
        private  BlockingCollection<Action> blockingCollection;
    
        public void Process()
        {
            blockingCollection = new BlockingCollection<Action>();
            int numberOfBatches = 10;
            Process(HandleProducers, HandleConsumers, numberOfBatches);
        }
    
        private void Process(Action<int> produce, Action<int> consume, int numberOfBatches)
        {
            produce(numberOfBatches);
            consume(numberOfBatches);
        }
    
        private void HandleConsumers(int numberOfBatches)
        {
            var consumers = new List<Task>();
    
            for (var i = 1; i <= numberOfBatches; i++)
            {
                consumers.Add(Task.Factory.StartNew(() =>
                {
                    foreach (var action in blockingCollection.GetConsumingEnumerable())
                    {
                        action();
                    }
                }));
            }
    
            Task.WaitAll(consumers.ToArray());
        }
    
        private void HandleProducers(int numberOfBatches)
        {
            var producers = new List<Task>();
    
            for (var i = 0; i <= numberOfBatches; i++)
            {
                producers.Add(Task.Factory.StartNew(() =>
                {
                    blockingCollection.Add(() => YourProdcerMethod());
                }));
            }
    
            Task.WaitAll(producers.ToArray());
            blockingCollection.CompleteAdding();
        }
    }
    

    【讨论】:

    • 所以您在等待所有生产者完成后再开始您的消费者?这违背了生产者/消费者模式的目的。
    猜你喜欢
    • 2017-07-13
    • 2015-09-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多