【问题标题】:How to lock threads dynamically and avoid race condition如何动态锁定线程并避免竞争条件
【发布时间】:2018-12-24 13:56:22
【问题描述】:

我正在尝试动态锁定线程,但无论我尝试什么,总是会发生某种竞争条件,这似乎是由多个线程同时启动任务引起的,但我一直无法找到很好的答案。

这是一个例子:

using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Example
{
public static readonly Object padLock = new Object();
public readonly static ConcurrentDictionary<string, object> locks = new 
ConcurrentDictionary<string, object>();
public static List<string> processes = new List<string>();

public static void Main()
{
  //Add random list of processes (just testing with one for now)
  for (var i = 0; i < 1; i++) {
     processes.Add("random" + i.ToString()); 
  }

  while (true)
  {
     foreach (var process in processes )
     {
        var currentProc = process ;
        lock (padLock)
        {
           if (!locks.ContainsKey(currentProc))
           {
              System.Threading.Tasks.Task.Factory.StartNew(() =>
              {
                 if (!locks.ContainsKey(currentProc))
                 {
                    var lockObject = locks.GetOrAdd(currentProc, new object());
                    lock (lockObject)
                    { 
                       Console.WriteLine("Currently Executing " + currentProc); 
                       Console.WriteLine("Ended Executing " + currentProc);
                       ((IDictionary)locks).Remove(currentProc);
                    }
                 }
              });
           }
        }
       // Thread.Sleep(0);
     }
  }

  Console.ReadLine();
 }
}

输出:

Started 1

Finished 1

Started 1

Finished 1

Started 1

Finished 1

但有时会得到:

Started 1

Started 1

Finished 1

这是不希望的,动态锁应该锁定它并只执行一次

【问题讨论】:

  • hrmm Started 没有在你的代码中表示,只是说
  • 你想完成什么OP?如果您只想确保每个对象只处理一次,那么这是一种非常不寻常的模式。
  • @TheGeneral Started 当前正在执行
  • @JohnWu 你会怎么做?
  • 我们不知道您要做什么。这样做的目的是什么?

标签: c# multithreading parallel-processing thread-safety threadpool


【解决方案1】:

虽然我不知道您要做什么,但您看到的行为是因为您从多个线程向 Console.WriteLine 发送垃圾邮件,它们被乱序打印。我通过将时间戳附加到 Console.WriteLine 来验证这一点:

public class Example
{
  public static readonly Object padLock = new Object();
  public readonly static ConcurrentDictionary<string, object> locks = new ConcurrentDictionary<string, object>(); 
  public static List<string> processes = new List<string>(); 

  [ThreadStatic]
  private static bool flag = false;

 public static void Main()
 {
  //Add random list of processes (just testing with one for now)
  for (var i = 0; i < 10; i++)
  {
     processes.Add(i.ToString());
  }

  while (true)
  {
     foreach (var process in processes)
     {
        var currentProc = process; 

        if (!locks.ContainsKey(currentProc))
        {
           var lockObject = locks.GetOrAdd(currentProc, new object());
           Task.Factory.StartNew(() =>
            {
               lock (lockObject)
               {
                  if (flag) throw new Exception();
                  flag = true;
                  Console.WriteLine("Currently Executing " + currentProc);
                  Thread.Sleep(0); // You can siimulate work here
                  Console.WriteLine("Ended Executing " + currentProc);
                  flag = false;
                  ((IDictionary)locks).Remove(currentProc);
               }
            });
        }
     }
   }
  }
}

运行如下:

....
Ended Executing 1 4025734
Currently Executing 1 4026419
Ended Executing 1 4028737
Currently Executing 1 4029565
Ended Executing 1 4030472
Currently Executing 1 4031643
Currently Executing 1 3659670
Ended Executing 1 4032900
Currently Executing 1 4033582
Ended Executing 1 4034318
Ended Executing 1 4032786
Currently Executing 1 4038042
Ended Executing 1 4042484
Currently Executing 1 4044967
Currently Executing 1 4007282
...

【讨论】:

  • 这是一个有用的答案,但这并不准确,我按照您的说法附上了刻度线,是的,它们有问题,但是如果您复制并粘贴,您会注意到仍然存在
  • 我在上面的代码中添加了一个标志,如果两个事件出现乱序并且没有抛出异常,它将抛出异常。
  • 我在本地运行了您的代码,但它在@MineR 中引发了异常
  • 这很有趣,我想知道为什么这不起作用@MineR
  • 我将使用我实际运行的代码进行编辑,它运行良好
【解决方案2】:

这是一个常见的问题

我将您的要求解读为“获取进程列表,然后使用多个线程对每个进程执行一次操作。”

就我的示例而言,假设 Foo(process) 完成了必须只执行一次的工作单元。

这是一个非常普遍的需求,有几种模式。

Parallel.ForEach

这种技术可以为循环的每次迭代使用不同的线程,这些线程将同时执行。

Parallel.ForEach(processes, process => Foo(process));

是的;就一行代码。

异步任务

如果Foo() 是异步的,这种技术是合适的。它只是为所有进程安排一个任务,然后等待它们,并让 SynchronizationContext 对其进行排序。

var tasks = processes.Select( p => Foo(process) );
await Task.WhenAll(tasks);

生产者/消费者

这使用producer-consumer pattern,这是一个线程添加到队列而另一个线程从队列中取出的传统方式。通过从队列中删除一个项目,它实际上被“锁定”了,这样其他线程就不会尝试处理它。

BlockingCollection<string>() queue = new BlockingCollection<string>();

void SetUpQueue()
{
    for (int i=0; i<100; i++) queue.Add(i.ToString());
    queue.CompleteAdding();
}

void Worker()
{
    while (queue.Count > 0 || !queue.IsAddingCompleted)
    {
        var item = queue.Take();
        Foo(item);
    }
}

【讨论】:

  • Parallel.ForEach 似乎给出了正确的结果,我只是不明白为什么即使这是在一个 while 循环中也能工作,你能详细说明一下吗?
  • 对不起,我不明白你的问题。你遇到了哪个例子?
  • 我的意思是,Parallel.ForEach 似乎工作,但我不明白的是为什么它不会启动同一个线程两次,即使它在一个 while 循环内
  • source code 非常复杂,但它似乎将列表存储为数组,创建了一系列“worker”(取决于最大并行度和 CPU 中的内核数),然后每个工作人员从数组中抓取项目并对其进行处理。如果列表中的元素过多,实际上可以重用线程,但这是意料之中的。
  • 感谢@JohnWu,非常感谢您的意见,但我觉得 MineR 更符合我想要完成的目标,Parallel.Foreach 肯定值得研究,谢谢
猜你喜欢
  • 1970-01-01
  • 2019-06-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-01-10
  • 2015-01-30
相关资源
最近更新 更多