【问题标题】:Parallel ForEach using a ConcurrentBag not working as expected使用 ConcurrentBag 的并行 ForEach 未按预期工作
【发布时间】:2020-09-02 06:12:46
【问题描述】:

我有这段代码可以处理列表中的项目:

    static readonly object _Lock = new object();

    public class Item
    {
        public string Name;
        public string ID;
    }

    static void Main(string[] args)
    {
        var items = new List<Item>
        {
            new Item { Name = "One", ID = "123" },
            new Item { Name = "Two", ID = "234" },
            new Item { Name = "Three", ID = "123" }
        };

        var itemsProcess = new ConcurrentBag<Item>();
        Parallel.ForEach(items, (item) =>
        {
            Item itemProcess = null;
            // lock (_Lock)
            {
                itemProcess = itemsProcess.FirstOrDefault(a => a.ID == item.ID);
            }
            if (itemProcess != null)
            {
                Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
            }
            else
            {
                itemsProcess.Add(item);
                Console.WriteLine($"Processing item [{item.Name}]");
                Thread.Sleep(1000); // do some work...
            }
        });

        Console.ReadKey();
      }

我基本上是使用ConcurrentBag 根据几个条件检查对象是否存在。
期望总能得到类似的输出(顺序可能会有所不同):

Processing item [One]
Item [Three] was already processed as [One]
Processing item [Two]

但我有时会得到一个输出,这表明我的代码不是线程安全的:

Processing item [Three]
Processing item [One]
Processing item [Two]

所以我认为itemsProcess.FirstOrDefault() 会阻塞的假设是错误的。
使用lock 不会改变任何东西。显然,这里出了点问题,我真的不明白为什么?

我知道我可以通过其他方式“解决”这个问题(一种是在输入 Parallel.ForEach() 之前准备好列表),但我真的很想知道为什么会出现这种行为?

【问题讨论】:

  • Concurrentbag 已经是线程安全的,所以 lock 在这里不会改变任何东西。并且并行 foreach 循环也启动 3 个并行线程,因此每次 itemProcess 将为空,因此 else 部分将执行。如果我将 ConcurrentBag 更改为 List 并使用 lock 那么它对我有用,因为现在 lock 将被视为普通列表。
  • 您的示例包含两个具有相同IDItem 对象。这是故意的吗?
  • @TheodorZoulias 是的,ID 可以复制,并且只需要处理具有给定ID 的第一项。
  • @JohnathanBarclay 哦,我明白了。可能这个澄清应该是问题的一部分。

标签: c# concurrency parallel-processing parallel.foreach


【解决方案1】:

您的并行循环中有 2 个独立的操作:FirstOrDefaultAdd

ConcurrentBag 无法确保这两个操作之间的线程安全。

另一种选择是ConcurrentDictionary,它有一个GetOrAdd 方法,只会在不存在密钥时添加一个项目:

var itemsProcess = new ConcurrentDictionary<string, Item>();
Parallel.ForEach(items, item =>
{
    // Returns existing item with same ID or adds this item
    var itemProcess = itemsProcess.GetOrAdd(item.Id, item);
    if (!object.ReferenceEquals(item, itemProcess))
    {
        Console.WriteLine($"Item [{item.Name}] was already processed as [{itemProcess.Name}]");
    }
    else
    {
        Console.WriteLine($"Processing item [{item.Name}]");
        // do some work...
    }
});

如果您随后需要以ICollection 形式处理的项目,可以通过itemsProcess.Values 访问它们。

【讨论】:

    【解决方案2】:

    原因,是因为仍然存在 数据竞争... 2 个线程仍然可以在 非线程安全 中读取并添加到 ConcurrentBag方式。使用任何并发集合只意味着你有一个自洽的结构,但它并不能保护你免于编写其他非线程安全 代码

    lock 你的想法是对的

    var itemsProcess = new Dictionary<string, Item>();
    Parallel.ForEach(items, (item) =>
    {
    
       lock (_Lock)
       {
          if (itemsProcess.TryGetValue(item.ID, out var val))
          {
             Console.WriteLine($"Item [{item.Name}] was already processed as [{val.Name}]");
             return;
          }
    
          itemsProcess.TryAdd(item.ID, item);
       }
    
       Console.WriteLine($"Processing item [{item.Name}]");
       Thread.Sleep(1000); // do some work...
    
    });
    

    注意:您还可以在并行处理之前过滤列表中的重复项,这样就不需要锁定或收集了

    【讨论】:

    • 我会去预过滤列表。使用Parallel.ForEach()lock() 有什么意义?
    • @TanveerBadar 完全同意 :) 尽管可能还有一些其他逻辑依赖于当前的实现。
    【解决方案3】:

    不借助锁,您可以“滥用”ConcurrentDictionary 并避免此处的所有锁定以确保唯一性。

    通过 ID 将项目添加到您的字典中,数据结构将保持一致,一旦完成,您就可以使用 dictionary.Values 字段来获取唯一项目。

    P.S.:我觉得您的示例涉及更多,因为没有人使用 Parallel.ForEach() 执行 Distinct(),这就是您的代码的含义。

    最后,为了解决发生这种情况的原因,当涉及到并发时,这几乎总是一种反模式,并且与作者在这里的意思不符。

    if(!collection.Contains(item))
          collection.Add(item);
    

    Contains() 执行并返回 false 时,另一个线程可能已经执行了相同的操作,领先并添加了相同的项目。

    这种竞争条件是几乎所有集合修改操作都有两种风格的原因:您要么有一个 collection.TryAdd(),它会尝试自动添加一个项目并返回 true/false 来告诉您结果,或者您有类似 @987654328 之类的东西@ 和 AddOrUpdate() 再次以原子方式插入一个项目并在之后获取/更新它。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-06
      • 2012-06-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多