【问题标题】:Why is parallel.Invoke not working in this case为什么是 parallel.Invoke 在这种情况下不起作用
【发布时间】:2014-06-24 13:56:35
【问题描述】:

我有一个这样的文件数组..

字符串[] 解压缩文件; 这个想法是我想并行解析这些文件。当它们被解析时,一条记录被放置在一个并发包上。随着记录的放置,我想启动更新功能。

这是我在 Main() 中所做的:

    foreach(var file in unZippedFiles)
    {    Parallel.Invoke
             (   
                                  () => ImportFiles(file),
                                  () => UpdateTest()


                             );
            }

Update 的代码是这样的。

   static void UpdateTest( )
    {
        Console.WriteLine("Updating/Inserting merchant information.");

        while (!merchCollection.IsEmpty || producingRecords )
        {
            merchant x;
            if (merchCollection.TryTake(out x))
            {

                UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year);
            }
        }



    }

这就是导入代码的样子。它几乎是一个巨大的字符串解析器。

      System.IO.StreamReader SR = new System.IO.StreamReader(fileName);
             long COUNTER = 0;
             StringBuilder contents = new StringBuilder( );
             string M_ID = "";

             string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:";
             string EOF_DELIMITER = "%%EOF";

             try
             {
                 record_count = 0;
                 producingRecords = true;
                 for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++)
                 {
                     if (SR.EndOfStream)
                     {
                         break;
                     }
                     contents.AppendLine(Strings.Trim(SR.ReadLine()));
                     contents.AppendLine(System.Environment.NewLine);
                     //contents += Strings.Trim(SR.ReadLine());
                     //contents += Strings.Chr(10);
                     if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1)
                     {
                         if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1)
                         {
                             string data = contents.ToString();
                             M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_"));
                             Console.WriteLine("Merchant: " + M_ID);
                             merchant newmerch;
                             newmerch.m_id = M_ID;
                             newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5));
                             newmerch.month = DateTime.Now.AddMonths(-1).Month;
                             newmerch.year = DateTime.Now.AddMonths(-1).Year;
                             //Update(newmerch);
                             merchCollection.Add(newmerch);
                         }
                         contents.Clear();
                         //GC.Collect();
                     }
                 }

                 SR.Close();
                 // UpdateTest();

             }
             catch (Exception ex)
             {
                 producingRecords = false;

             }
             finally
             {
                 producingRecords = false;
             }
         }

我遇到的问题是更新运行一次,然后 importfile 函数只是接管并且不屈服于更新函数。关于我做错了什么的任何想法都会有很大帮助。

【问题讨论】:

  • 看起来是时间问题。您的UpdateTestImportFiles 有机会将项目放入metchCollection 或将producingRecords 设置为true 之前完成。在没有看到其余代码的情况下,这是在黑暗中的狂野刺痛。
  • 我同意这是一个时间问题。但我想我要问的是为什么 ImportFile 不产生。我也会添加导入代码。
  • 以下是您可以修补它的方法:将merchCollection 声明为BlockingCollection&lt;merchant&gt; foreach 范围内;完全抛弃producingRecords 变量——它不适合线程同步;更改您的更新以通过foreach (var m in merchCollection.GetConsumingEnumerable()) 遍历阻塞集合;在您的ImportFiles finally 块中调用merchCollection.CompleteAdding(),从而完成您的更新。瞧!

标签: c#-4.0 task-parallel-library parallel.foreach


【解决方案1】:

这是我修复线程同步的尝试。请注意,我没有从功能的角度更改任何代码(除了取出 catch - 这通常是一个坏主意;需要传播异常)。

如果有些东西不能编译,请原谅——我是根据不完整的 sn-ps 写这篇文章的。

主要

foreach(var file in unZippedFiles)
{
    using (var merchCollection = new BlockingCollection<merchant>())
    {
        Parallel.Invoke
        (   
            () => ImportFiles(file, merchCollection),
            () => UpdateTest(merchCollection)
        );
    }
}

更新

private void UpdateTest(BlockingCollection<merchant> merchCollection)
{
    Console.WriteLine("Updating/Inserting merchant information.");

    foreach (merchant x in merchCollection.GetConsumingEnumerable())
    {
        UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year);
    }
}

导入

不要忘记将merchCollection 作为参数传入 - 它不应该是静态的。

         System.IO.StreamReader SR = new System.IO.StreamReader(fileName);
         long COUNTER = 0;
         StringBuilder contents = new StringBuilder( );
         string M_ID = "";

         string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:";
         string EOF_DELIMITER = "%%EOF";

         try
         {
             record_count = 0;

             for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++)
             {
                 if (SR.EndOfStream)
                 {
                     break;
                 }
                 contents.AppendLine(Strings.Trim(SR.ReadLine()));
                 contents.AppendLine(System.Environment.NewLine);
                 //contents += Strings.Trim(SR.ReadLine());
                 //contents += Strings.Chr(10);
                 if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1)
                 {
                     if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1)
                     {
                         string data = contents.ToString();
                         M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_"));
                         Console.WriteLine("Merchant: " + M_ID);
                         merchant newmerch;
                         newmerch.m_id = M_ID;
                         newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5));
                         newmerch.month = DateTime.Now.AddMonths(-1).Month;
                         newmerch.year = DateTime.Now.AddMonths(-1).Year;
                         //Update(newmerch);
                         merchCollection.Add(newmerch);
                     }
                     contents.Clear();
                     //GC.Collect();
                 }
             }

             SR.Close();
             // UpdateTest();

         }
         finally
         {
             merchCollection.CompleteAdding();
         }
     }

【讨论】:

  • 谢谢基里尔。这看起来很有希望。在尝试“加速这段代码”的过程中,我做了很多阅读和学习。我不知道 BlockingCollection。我从没想过 ConcurrentBag
  • @Waddaulookingat,BlockingCollection&lt;T&gt; 的美妙之处在于它包括开箱即用的同步生产者和消费者的方法(通过 CompleteAdding / GetConsumingEnumerable),所以你不需要无需担心拥有自己的线程同步结构——您可以免费获得它们。祝你好运。
猜你喜欢
  • 2017-11-26
  • 2010-12-29
  • 1970-01-01
  • 2021-08-05
  • 2011-11-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多