【问题标题】:A Better Way To Make My Parallel.ForEach Thread Safe?让我的 Parallel.ForEach 线程安全的更好方法?
【发布时间】:2012-10-07 03:59:55
【问题描述】:

我想让下面的代码线程安全。不幸的是,我尝试在此代码中的各个级别进行锁定,但没有成功。我似乎可以实现线程安全的唯一实例是在整个循环周围放置一个锁,这有效地使 Parallel.ForEach 不会比仅使用 foreach 更快(甚至可能更慢)。该代码相对/几乎安全,没有锁定。在大约每 20 次左右的执行中,geneTokens.Value[-1] 键和 gtCandidates.Value[-1] 键的总和似乎只显示出细微的变化。

我意识到 Dictionary 不是线程安全的。但是,我不能将此特定对象更改为 ConcurrentDictionary 而不会对下游的性能造成重大影响。我宁愿使用常规 foreach 运行这部分代码,也不愿更改该特定对象。但是,我使用 ConcurrentDictionary 来保存各个 Dictionary 对象。我也尝试过进行此更改,但它并没有解决我的种族问题。

这是我的类级别变量:

//Holds all tokens derived from each sequence chunk
public static ConcurrentBag<sequenceItem> tokenBag = 
  new ConcurrentBag<sequenceItem>();
public BlockingCollection<sequenceItem> sequenceTokens = new 
  BlockingCollection<sequenceItem>(tokenBag);
public ConcurrentDictionary<string, int> categories = new 
  ConcurrentDictionary<string, int>();
public ConcurrentDictionary<int, Dictionary<int, int>> gtStartingFrequencies = new 
  ConcurrentDictionary<int, Dictionary<int, int>>();
public ConcurrentDictionary<string, Dictionary<int, int>> gtCandidates = new 
  ConcurrentDictionary<string, Dictionary<int, int>>();
public ConcurrentDictionary<string, Dictionary<int, int>> geneTokens = new 
  ConcurrentDictionary<string, Dictionary<int, int>>();

这是 Parallel.ForEach:

Parallel.ForEach(sequenceTokens.GetConsumingEnumerable(), seqToken =>
{
  lock (locker)
  {
    //Check to see if the Sequence Token is a Gene Token
    Dictionary<int, int> geneTokenFreqs;
    if (geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs))
    { //The Sequence Token is a Gene Token 


      *****************Race Issue Seems To Occur Here**************************** 
      //Increment or create category frequencies for each category provided
      int frequency;
      foreach (int category in seqToken.categories)
      {
        if (geneTokenFreqs.TryGetValue(category, out frequency))
        {   //increment the category frequency, if it already exists
            frequency++;
            geneTokenFreqs[category] = frequency;
        }
        else
        {   //Create the category frequency, if it does not exist
            geneTokenFreqs.Add(category, 1);
        }
      }

      //Update the frequencies total [-1] by the total # of categories incremented.
      geneTokenFreqs[-1] += seqToken.categories.Length;
      ******************************************************************************
    }
    else
    { //The Sequence Token is NOT yet a Gene Token
      //Check to see if the Sequence Token is a Gene Token Candidate yet
      Dictionary<int, int> candidateTokenFreqs;
      if (gtCandidates.TryGetValue(seqToken.text, out candidateTokenFreqs))
      {
        *****************Race Issue Seems To Occur Here****************************
        //Increment or create category frequencies for each category provided
        int frequency;
        foreach (int category in seqToken.categories)
        {
          if (candidateTokenFreqs.TryGetValue(category, out frequency))
          { //increment the category frequency, if it already exists
            frequency++;
            candidateTokenFreqs[category] = frequency;
          }
          else
          { //Create the category frequency, if it does not exist
            candidateTokenFreqs.Add(category, 1);
          }
        }

        //Update the frequencies total [-1] by the total # of categories incremented.
        candidateTokenFreqs[-1] += seqToken.categories.Length;
        *****************************************************************************

        //Only update the candidate sequence count once per sequence
        if (candidateTokenFreqs[-3] != seqToken.sequenceId)
        {
          candidateTokenFreqs[-3] = seqToken.sequenceId;
          candidateTokenFreqs[-2]++;

          //Promote the Token Candidate to a Gene Token, if it has been found >=
          //the user defined candidateThreshold
          if (candidateTokenFreqs[-2] >= candidateThreshold)
          {
            Dictionary<int, int> deletedCandidate;
            gtCandidates.TryRemove(seqToken.text, out deletedCandidate);
            geneTokens.TryAdd(seqToken.text, candidateTokenFreqs);
          }
        }
      }
      else
      {
        //create a new token candidate frequencies dictionary by making 
        //a copy of the default dictionary from
        gtCandidates.TryAdd(seqToken.text, new 
          Dictionary<int, int>(gtStartingFrequencies[seqToken.sequenceId]));
      }
    }
  }
});

【问题讨论】:

  • 这段代码中还有其他奇怪的东西:它是如何让你在不初始化frequency的情况下增加它的?
  • 工作得很好,因为频率被用作geneTokenFreqs.TryGetValue() 的“输出”变量。它增加的唯一时间是变量是否存在并从 TryGetValue 返回...我向您保证代码会执行。我整晚都在运行它:)
  • 抱歉,我没有看到out 部分。然后就好了。

标签: c# multithreading dictionary parallel.foreach concurrentdictionary


【解决方案1】:

显然,一些线程将在此处添加项目这一事实是一场数据竞赛:

geneTokens.TryAdd(seqToken.text, candidateTokenFreqs);

其他人将在此处阅读:

if (geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs))

【讨论】:

  • 你是对的。我已经从上面删除了它们。在我发布此消息之前,它们实际上是在之前的测试运行中出现的。我之前曾尝试使用geneTokens.Contains() 并在geneTokenFreqs = geneTokens[seqToken.text]; 正上方使用锁但未成功这种方法没有解决我的问题。
  • @Jake Drew:我已经对我认为的问题进行了编辑。您可以尝试只锁定代码的这些部分。
  • 我确定如何有效地锁定 TryAdd 而不用锁定整个块,这让我回到了最初的问题。我也尝试了以下模式但没有运气: lock (locker) { isGeneToken =geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs); } if(isGeneToken)
【解决方案2】:

我如何在我的项目中使用并发字典:

我在字典中放一个标志并从另一个线程检查标志是否存在。如果存在标志,我会相应地完成我的任务..

为此,我正在做的是:

1) 声明并发字典 2)使用 TryADD 方法添加标志 3) 尝试使用 TryGet 方法检索公寓。

1) 声明

  Dim cd As ConcurrentDictionary(Of Integer, [String]) = New ConcurrentDictionary(Of Integer, String)()

2) 添加

If cd.TryAdd(1, "uno") Then
        Console.WriteLine("CD.TryAdd() succeeded when it should have failed")
        numFailures += 1
    End If 

3) 检索

 If cd.TryGetValue(1, "uno") Then
        Console.WriteLine("CD.TryAdd() succeeded when it should have failed")
        numFailures += 1
    End If 

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-11-29
    • 1970-01-01
    • 1970-01-01
    • 2013-06-05
    • 2014-01-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多