【Question Title】: LINQ or IEnumerable taking a long time to run when using Parallel.ForEach [closed]
【Posted】: 2017-04-12 14:13:49
【Question】:

I have an application that reads a CSV file (200 MB).

var lines = File.ReadLines(file).ToList();

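The CSV stores pricing information and contains roughly 500,000 records. In the code snippet below, the call to StoreValues takes about 18 seconds. Is there a way to speed it up?

distinctMarketIds holds 54 integer values. The lines collection has 500k rows, and field [0] of each row holds the marketId that I am matching against.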

IEnumerable<string[]> newList = (lines.Where(t => distinctMarketIds.Contains(t.Split(',')[0]))
                                      .Select(t => t.Split(',')));

log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));

StoreValues(newList, file); //store the prices
log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));

The StoreValues method uses Parallel.ForEach:

Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
{
});

I can't work out why this loop takes 18 seconds to complete. I tested on another machine with similar specs, and there the StoreValues method takes 2.5 seconds.

#region LoadPriceDataFromCsvFile

        public int LoadPriceDataFromCsvFile(string filename, string[] marketIdList, int maxThreads)
        {
             MaxThreads = maxThreads;
             int filteredRows = 0;
             string[] files = Directory.GetFiles(filename, "*.csv");
            elapsed.Start();
            log.InfoFormat("Total Csv files to Scan {0}",files.Length);
            Parallel.ForEach(files, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (file) =>
            {
                try
                {
                    log.InfoFormat("About to Scan File {0}", file);
                    ScanCsvFilesAndGetPrices(file);
                }
               catch (System.OutOfMemoryException e)
               {
                  log.Info(e);
               }
                catch (Exception e)
                {
                    log.Info(e);
                }

            });


            return PriceCollection.Count;
        }

        #endregion

#region ScanCsvFilesAndGetPrices
        private void ScanCsvFilesAndGetPrices(string file)
        {
            try
            {

                log.Info(("Time Taken " + elapsed.Elapsed.TotalSeconds + " seconds."));
                var lines = File.ReadLines(file).ToList();
                log.Info(("Time Taken To Read csv " + elapsed.Elapsed.TotalSeconds + " seconds."));

                if (lines.Any())
                {
                    log.Info(("Time Taken To Read Any " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    var firstLine = lines.ElementAt(1); // This is the first data line (index 0 is the header line)
                    log.Info(("Time Taken To Read First Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    var lastLine = lines.Last(); // This is the Last line in the csv file
                    log.Info(("Time Taken To Read Last Line " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    var header = lines.First().Split(',');
                    log.Info(("Time Taken To Split Header Line " + elapsed.Elapsed.TotalSeconds + " seconds."));

                    GetIndexOfFields(header);
                    log.Info(("Time Taken To Read Header " + elapsed.Elapsed.TotalSeconds + " seconds."));
                    // Get the Publish Date Time
                    if (PublishedDatetime_Index != -1)
                    {
                        var fLine = firstLine.Split(',');
                        var lLine = lastLine.Split(',');

                        var firstLineDatetime = (fLine[PublishedDatetime_Index].Contains("+"))? fLine[PublishedDatetime_Index].Remove(fLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): fLine[PublishedDatetime_Index];
                        var publishDateTimeFirstLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(firstLineDatetime));

                        string lastLineDatetime = (lLine[PublishedDatetime_Index].Contains("+"))? lLine[PublishedDatetime_Index].Remove(lLine[PublishedDatetime_Index].IndexOf("+",StringComparison.Ordinal)): lLine[PublishedDatetime_Index];
                        var publishDateTimeLastLine =FileNameGenerator.GetCorrectTime(Convert.ToDateTime(lastLineDatetime));
                        // check if the order execution date time of any order lies between the date time of first and last line of csv so we can add that csv to our filtered list


                        string[] distinctMarketIds = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString())
                                    .Distinct()
                                    .ToArray();

                        log.InfoFormat("Total Markets Identified {0}",distinctMarketIds.Length);

                        List<OrderEntity> foundOrdersList = OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine);

                        lock (FoundOrdersList)
                        {
                            FoundOrdersList.AddRange(foundOrdersList);
                        }
                        log.InfoFormat("Total Orders Identified {0}", FoundOrdersList.Count());

                        log.Info(("Time Taken To Read Execution Times and Market " + elapsed.Elapsed.TotalSeconds +" seconds."));
                        if (distinctMarketIds.Length != 0)
                        {
                            IEnumerable<string[]> newList =
                                                            (lines.Where(
                                                                t => distinctMarketIds.Contains(t.Split(',')[0]))
                                                                .Select(t => t.Split(','))
                                                                );

                            log.Info(("Time Taken To Get Filtered Prices " + elapsed.Elapsed.TotalSeconds +" seconds."));
                            // this is taking longer than expected. Something to do with IEnumerable<string[]>
                            StoreValues(newList, file); //store the prices
                            log.Info(("Time Taken To Store Prices " + elapsed.Elapsed.TotalSeconds + " seconds."));



                        }
                    }

                }
            }
            catch (Exception e)
            {
                log.Info(e);
            }

         }


        #endregion

#region GetIndexOfFields

        // These are the fields we want to Look for from the headers and accordingly get their location
        private void GetIndexOfFields(IEnumerable<string> lineHeader)
        {
            int index = 0;
            foreach (var column in lineHeader)
            {
                if (column == "MarketId")
                {
                   MarketId_Index= index;
                }
                if (column == "Bid")
                {
                    Bid_Index = index;
                }
                 if (column == "Ask")
                {
                    Ask_Index = index; 
                }
                 if (column == "Mid")
                {
                    Mid_Index = index;
                }
                 if (column == "Is_Indicative")
                {
                    Is_Indicative_Index = index;
                }
                 if (column == "Price_Engine")
                {
                    Price_Engine_Index = index; 
                }
                 if (column == "PublishedDatetime")
                {
                    PublishedDatetime_Index = index; 
                }
                 if (column == "Market_Is_Open")
                {
                    Market_Is_Open_Index = index; 
                }
                 if (column == "AuditId")
                {
                    AuditId_Index = index; 
                }
                 if (column == "Row_Update_Version")
                {
                    Row_Update_Version_Index = index; 
                }
                 if (column == "DontPublish")
                {
                    DontPublish_Index = index;
                }
                index++;
            }


        }

        #endregion

        #region StoreValues

        private void StoreValues(IEnumerable<string[]> finalLines, string file)
        {

            log.InfoFormat("total Finel Lines Sent for Storing {0}", finalLines.Count());

            Parallel.ForEach(finalLines, new ParallelOptions { MaxDegreeOfParallelism = MaxThreads }, (line) =>
               {
                   var prices = new Prices();
                  // the code that you want to measure comes here
                  var datetime = (line[PublishedDatetime_Index].Contains("+")) ? line[PublishedDatetime_Index].Remove(line[PublishedDatetime_Index].IndexOf("+", StringComparison.Ordinal)) : line[PublishedDatetime_Index];


                   if (!IsNullOrEmpty(datetime))
                   {
                       prices.PublishedDatetime = Convert.ToDateTime(datetime);
                   }

                   if (!IsNullOrEmpty(line[MarketId_Index]))
                   {
                       prices.MarketId = Convert.ToInt32(line[MarketId_Index]);
                   }
                   if (!IsNullOrEmpty(line[Bid_Index]))
                   {
                       prices.Bid = Convert.ToDecimal(line[Bid_Index]);
                   }
                   if (!IsNullOrEmpty(line[Ask_Index]))
                   {
                       prices.Ask = Convert.ToDecimal(line[Ask_Index]);
                   }
                   if (!IsNullOrEmpty(line[Mid_Index]))
                   {
                       prices.Mid = Convert.ToDecimal(line[Mid_Index]);
                   }
                   if (!IsNullOrEmpty(line[Is_Indicative_Index]))
                   {
                       prices.Is_Indicative = Convert.ToBoolean(line[Is_Indicative_Index]);
                   }
                   else
                   {
                       prices.Is_Indicative = false;
                   }
                   if (!IsNullOrEmpty(line[Price_Engine_Index]))
                   {
                       prices.Price_Engine = Convert.ToString(line[Price_Engine_Index]);
                   }

                   if (!IsNullOrEmpty(line[Market_Is_Open_Index]))
                   {
                       prices.Market_Is_Open = line[Market_Is_Open_Index] == "1";
                   }
                   if (!IsNullOrEmpty(line[AuditId_Index]))
                   {
                       prices.AuditId = Convert.ToString(line[AuditId_Index]);
                   }
                   if (!IsNullOrEmpty(line[Row_Update_Version_Index]))
                   {
                       prices.Row_Update_Version = Convert.ToString(line[Row_Update_Version_Index]);
                   }
                   if (!IsNullOrEmpty(line[DontPublish_Index]))
                   {
                       if (DontPublish_Index != 0)
                       {
                           prices.DontPublish = line[DontPublish_Index] == "1";
                       }
                   }
                   prices.SbProdFile = file;

                   lock (PriceCollection)
                   {
                       PriceCollection.Add(prices);
                   }
               });

        }

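【Comments】:

  • Post the complete code of the StoreValues method, not just an empty foreach. If it runs faster on another machine, the code is not the problem.
  • Parallel.ForEach on a single file? What is finalLines?
  • Have you tried any profiling? There should be a better way to do this.
  • What is distinctMarketIds? You are also parsing each line twice.
  • If you are writing to a file, Parallel.ForEach will not improve performance, and it may cause lines to be written in the wrong order.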
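
As a starting point for the profiling suggested above, here is a minimal sketch of timing one phase in isolation. The log lines in the question all print the same `elapsed` stopwatch, which is started once in LoadPriceDataFromCsvFile and, as far as the posted code shows, never reset, so each figure reads as a running total rather than the cost of a single step. `PhaseTimingDemo` and `Measure` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Diagnostics;

public static class PhaseTimingDemo
{
    // Runs the given action and returns how long that action alone took,
    // using a fresh Stopwatch so earlier phases are not included.
    public static TimeSpan Measure(Action action)
    {
        var stopwatch = Stopwatch.StartNew();
        action();
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }
}
```

Wrapping the filter and the store step in separate `Measure` calls (and forcing the filter with `ToList()` inside its own call) would show which of the two actually accounts for the 18 seconds.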

Tags: c# visual-studio linq ienumerable parallel.foreach


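【Answer 1】:
  • I don't see how Parallel.ForEach helps performance when you only need to process a single file.
  • Don't use File.ReadLines(file).ToList(). Use ReadAllLines if you want all lines in memory, or ReadLines if you want to process the file line by line.
  • Why are you splitting each line multiple times?
  • Use a HashSet<string> for distinctMarketIds.

This should be more efficient: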

var marketIdSet = new HashSet<string>(OrderEntityColection.FindAll(obj =>obj.OrderLastChangeDateTimeUtc >= publishDateTimeFirstLine &&obj.OrderLastChangeDateTimeUtc <= publishDateTimeLastLine).Select(obj => obj.MarketId.ToString()));
IEnumerable<string[]> allFields = File.ReadLines(file)
    .Select(line => line.Split(','))
    .Where(arr => marketIdSet.Contains(arr[0]));

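Note that, because of the deferred execution of Select and Where, this is only a query that has not been executed yet. Every time you enumerate allFields, the query is executed again. So it is a good idea to materialize it into a collection, e.g. with allFields.ToList(), and pass that to StoreValues: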

StoreValues(allFields.ToList(), file); //store the prices

If you pass a collection, you can really benefit from using Parallel.ForEach in StoreValues.
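
The effect of deferred execution can be made visible with a small counter. The sketch below is not the original StoreValues code; `DeferredExecutionDemo` and `CountSplits` are hypothetical names. It enumerates the filtered query twice, much as StoreValues does when it first calls finalLines.Count() and then hands finalLines to Parallel.ForEach, and reports how often Split really ran.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class DeferredExecutionDemo
{
    // Enumerates the filtered query twice and returns how many times
    // Split actually ran. With materialize == true the query results are
    // copied into a List first, so both later passes reuse them.
    public static int CountSplits(string[] lines, HashSet<string> marketIds, bool materialize)
    {
        int splits = 0;

        IEnumerable<string[]> query = lines
            .Select(line => { splits++; return line.Split(','); })
            .Where(fields => marketIds.Contains(fields[0]));

        if (materialize)
        {
            query = query.ToList(); // the query runs exactly once, here
        }

        foreach (var fields in query) { } // first pass, like finalLines.Count()
        foreach (var fields in query) { } // second pass, like Parallel.ForEach

        return splits;
    }
}
```

With three input lines, the lazy variant splits six times (three per pass), while the materialized variant splits only three times in total.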

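【Comments】:

  • I use Parallel.ForEach because I will be scanning around 100 files. Each CSV file contains pricing data from 00:00 to 23:59. I am only interested in the CSV files whose time range covers my order executions. So I check the first line (to get the PublishedDatetime column value) and the last line. Then I check my order execution times; if an order falls within that range, it was executed with prices from that CSV file. Deferred execution seems to be causing the delay, because when I pass the IEnumerable as ToList(), the conversion takes the time first.
  • @user1535623: Deferred execution does not cause the delay; the query is executed when you call ToList. Have a look at my comments and the last line I just added.
  • Tim: I managed to cut 8-9 seconds with your approach :) With your help I finally got it down to 10 seconds, but it should take 2.5 to 3 seconds at most. Any other useful suggestions?
  • @user1535623 You might get faster if the tasks inside Parallel.ForEach() were longer. Each task is very short, so the overhead of executing the tasks on the thread pool is too high. Instead of running a parallel foreach over IEnumerable<string[]>, you could try grouping several lines together, e.g. into an IEnumerable<IEnumerable<string[]>>, like: var result = list.Select((item, index) => new {item, index}).GroupBy(itemWithIndex => itemWithIndex.index % 16);
【Answer 2】: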
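
The grouping idea from the last comment can also be sketched with range partitioning instead of GroupBy. This is a swapped-in technique, not the commenter's code: Partitioner.Create(0, n) hands each worker a contiguous index range, and the localInit/localFinally overload of Parallel.ForEach lets each worker task fill its own list, so the shared lock is taken once per worker task rather than once per row. `ChunkedParallelDemo` and `ParseFirstField` are hypothetical names, and parsing only the first field stands in for the full Prices conversion.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Threading.Tasks;

public static class ChunkedParallelDemo
{
    // Parses field [0] of every row in parallel, one index range per work item.
    // The order of the returned values is not guaranteed.
    public static List<int> ParseFirstField(IReadOnlyList<string[]> rows, int maxThreads)
    {
        var results = new List<int>(rows.Count);
        if (rows.Count == 0)
        {
            return results; // Partitioner.Create rejects an empty range
        }

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };

        Parallel.ForEach(
            Partitioner.Create(0, rows.Count),   // yields (fromInclusive, toExclusive) ranges
            options,
            () => new List<int>(),               // per-task local list, no locking needed
            (range, loopState, local) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    local.Add(int.Parse(rows[i][0], CultureInfo.InvariantCulture));
                }
                return local;
            },
            local =>
            {
                lock (results)                   // merge once per worker task
                {
                    results.AddRange(local);
                }
            });

        return results;
    }
}
```

This requires a materialized, indexable collection (for example the result of ToList()), which matches the advice in the answer above. Whether it reaches the 2.5 to 3 second target depends on the machine and on how expensive the per-row conversion is.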
static void Main()
{
    //var lines = File.ReadLines(file).ToList();

    // this is just a fast generation for sample data
    var lines = Enumerable.Range(0, 500000)
                            .Select(i => string.Join(",", i % 7, i, i & 2))
                            .ToList();

    // HashSet will work as an indexed store and will match faster in your loop
    var distinctMarketIds = new HashSet<string>{
        "0", "2", "3", "5"
    };

    // Do this if you are to use the method syntax instead of the query syntax
    // var newList = lines.Select(l=>l.Split(','))
    //                    .Where(ps=>distinctMarketIds.Contains(ps[0]));

    var newList = from l in lines
                  // this will parse the string once versus twice as you were doing before
                  let ps = l.Split(',')
                  where distinctMarketIds.Contains(ps[0])
                  select ps;

    // can't see the content of your `StoreValues` method but writing to a 
    //    file in parallel will never work as expected.  
    using (var stream = new StreamWriter("outfile.txt"))
        foreach (var l in newList)
            stream.WriteLine(string.Join(";", l));

}

【Comments】:
