【问题标题】:How to wrap ConcurrentDictionary in BlockingCollection?如何在 BlockingCollection 中包装 ConcurrentDictionary?
【发布时间】:2012-05-24 11:02:18
【问题描述】:

我尝试通过将 ConcurrentDictionary 包装在 BlockingCollection 中来实现它,但似乎没有成功。

我了解一个变量声明可与 BlockingCollection 一起使用,例如 ConcurrentBag<T>ConcurrentQueue<T> 等。

所以,要创建一个包装在 BlockingCollection 中的 ConcurrentBag,我会像这样声明和实例化:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());

但是如何为 ConcurrentDictionary 做呢?我需要 BlockingCollection 在生产者和消费者端的阻塞功能。

【问题讨论】:

  • 字典(以及 ConcurrentDictionary 也是)不保留项目的顺序。你能描述一下你的生产者-消费者场景吗?
  • @Dennis,我知道这一点。生产者将 KeyValuePairs 存储在 concurrentDictionary 中,如果 int 与相应的键匹配,则消费者任务递增一个 int 并删除 KeyValuePair。我这样做是因为工作任务用值填充并发字典,但以任意顺序,消费者任务确保接收到的值以正确的顺序传递/处理。可以将 ConcurrentDictionary 包装在 BlockingCollection 中吗?
  • 你想出了什么解决方案?我正在尝试为类似问题找到一个很好的解决方案,即生产者没有按照消费者所需的顺序生产物品。 (旧帖我知道,但值得一试)

标签: c# concurrency concurrentdictionary


【解决方案1】:

也许你需要一个blockingCollection的并发字典

        ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        int maxBoxes = 5;

        CancellationTokenSource cancelationTokenSource = new CancellationTokenSource();
        CancellationToken cancelationToken = cancelationTokenSource.Token;

        Random rnd = new Random();
        // Producer
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = rnd.Next(0, maxBoxes);
                // put the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                box.Add("some message " + index, cancelationToken);
                Console.WriteLine("Produced a letter to put in box " + index);

                // Wait simulating a heavy production item.
                Thread.Sleep(1000);
            }
        });

        // Consumer 1
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = rnd.Next(0, maxBoxes);
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 1: " + message);

                // consume a item cost less than produce it:
                Thread.Sleep(50);
            }
        });

        // Consumer 2
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = rnd.Next(0, maxBoxes);
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 2: " + message);

                // consume a item cost less than produce it:
                Thread.Sleep(50);
            }
        });

        Console.ReadLine();
        cancelationTokenSource.Cancel();

通过这种方式,在邮箱 5 中期待某物的消费者将等到生产者将一封信放入邮箱 5 中。

【讨论】:

    【解决方案2】:

    您需要编写自己的适配器类 - 类似于:

        public class ConcurrentDictionaryWrapper<TKey,TValue> : IProducerConsumerCollection<KeyValuePair<TKey,TValue>>
    {
        private ConcurrentDictionary<TKey, TValue> dictionary;
    
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        {
            return dictionary.GetEnumerator();
        }
    
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    
        public void CopyTo(Array array, int index)
        {
            throw new NotImplementedException();
        }
    
        public int Count
        {
            get { return dictionary.Count; }
        }
    
        public object SyncRoot
        {
            get { return this; }
        }
    
        public bool IsSynchronized
        {
            get { return true; }
        }
    
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        {
            throw new NotImplementedException();
        }
    
        public bool TryAdd(KeyValuePair<TKey, TValue> item)
        {
            return dictionary.TryAdd(item.Key, item.Value);
        }
    
        public bool TryTake(out KeyValuePair<TKey, TValue> item)
        {
            item = dictionary.FirstOrDefault();
            TValue value;
            return dictionary.TryRemove(item.Key, out value);
        }
    
        public KeyValuePair<TKey, TValue>[] ToArray()
        {
            throw new NotImplementedException();
        }
    }
    

    【讨论】:

    • 感谢代码建议。但是我使用 BlockingCollection 的主要目的是能够将集合标记为添加完成并检查它的状态以及它的添加是否完成和空,类似于 BlockingCollection 提供的功能。我知道我可以轻松添加此类功能,但我正在寻找如何直接通过 BlockingCollection 执行此操作的建议。到目前为止,我没有看到它不能直接通过 Blocking 集合工作的原因。也许只需要 IProducerConsumerCollection
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-02
    • 1970-01-01
    • 1970-01-01
    • 2021-03-08
    • 1970-01-01
    相关资源
    最近更新 更多