【问题标题】:How to correctly write to a file using Parallel.ForEach?如何使用 Parallel.ForEach 正确写入文件?
【发布时间】:2016-05-24 05:44:11
【问题描述】:

我有一个任务,它逐行读取一个大文件,用它做一些逻辑,然后返回一个我需要写入文件的字符串。输出的顺序无关紧要。但是,当我尝试下面的代码时,它会在读取 15-20k 行文件后停止/变得非常慢。

public static Object FileLock = new Object();
...
Parallel.ForEach(System.IO.File.ReadLines(inputFile), (line, _, lineNumber) =>
{
    var output = MyComplexMethodReturnsAString(line);
    lock (FileLock)
    {
        using (var file = System.IO.File.AppendText(outputFile))
        {
            file.WriteLine(output);
        }
    }
});

为什么我的程序运行一段时间后变慢了?是否有更正确的方法来执行此任务?

【问题讨论】:

  • 是否需要输出行的顺序与输入的顺序相对应?如果是这样,Parallel.ForEach 不是正确的工具。
  • 我不确定,但感觉以这种方式使用并行正在创建/恶化 IO 瓶颈,而不是避免它。除非您在这些线路上进行非常昂贵的操作..
  • var file = System.IO.File.AppendText(outputFile) 可以放在 foreach 之外,因为您正在锁定它。检查这是否会提高性能。
  • 你有一个用于同步的“锁”,以确保只有一个线程可以写入文件。这肯定会减慢速度,因为它会按顺序限制写操作。多个线程会一直等待,直到文件被第一个线程写入。
  • 你的代码相当于说你需要用一把剪刀修剪你的草坪,但是,不要单独做(因为这会花很长时间),你会得到 100 个朋友帮你,但你说他们都得共用一把剪刀。

标签: c# parallel.foreach file-writing


【解决方案1】:

您实际上是通过让所有线程尝试写入文件来序列化您的查询。相反,您应该计算需要编写的内容,然后在它们结束时编写它们。

var processedLines = File.ReadLines(inputFile).AsParallel()
    .Select(l => MyComplexMethodReturnsAString(l));
File.AppendAllLines(outputFile, processedLines);

如果您需要刷新数据,请打开一个流并启用自动刷新(或手动刷新):

var processedLines = File.ReadLines(inputFile).AsParallel()
    .Select(l => MyComplexMethodReturnsAString(l));
using (var output = File.AppendText(outputFile))
{
    output.AutoFlush = true;
    foreach (var processedLine in processedLines)
        output.WriteLine(processedLine);
}

【讨论】:

  • 如果文件真的是文件,我不确定这种方法是否足够,因为第一步需要读取整个文件。
  • 不是在使用File.ReadLines() 时,它会为您提供一个枚举值,该枚举值将在读取文件的行时进行枚举。这与使用 File.ReadAllLines() 形成对比,后者返回一个包含文件所有行的数组。 读入整个文件。
【解决方案2】:

这与Parallel.ForEach 的内部负载平衡器的工作方式有关。当它看到你的线程花费大量时间阻塞时,它的理由是它可以通过向问题抛出更多线程来加快处理速度,从而导致更高的并行开销、FileLock 的争用和整体性能下降。

为什么会这样?因为Parallel.ForEach 不适合 IO 工作。

你怎么能解决这个问题? Parallel.ForEach 仅用于 CPU 工作,并在并行循环之外执行所有 IO。

一种快速的解决方法是通过使用接受ParallelOptions 的重载来限制Parallel.ForEach 允许登记的线程数,如下所示:

Parallel.ForEach(
    System.IO.File.ReadLines(inputFile),
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    (line, _, lineNumber) =>
    {
        ...
    }

【讨论】:

  • 在“快速解决方法...”之前,我真的很喜欢您的回答。与你之前所说的一切相比,这似乎是一个倒退。也许如果你充实代码对我来说会更有意义。
  • 好奇:我一直认为 Environment.ProcessorCount 是 MaxDegreeOfParallelism 的自然限制。有错吗?
  • @TaW,不,它会远远超出Environment.ProcessorCount。这是一个小提琴,它显示每秒添加大约 1 个线程,直到你终止进程(我在 100 之后放弃了):dotnetfiddle.net/dT1eBM(不用说,你可能不应该在生产服务器上运行它)
猜你喜欢
  • 1970-01-01
  • 2019-05-09
  • 2020-11-08
  • 2012-05-28
  • 2020-10-23
  • 1970-01-01
  • 2013-10-18
  • 2015-07-02
  • 1970-01-01
相关资源
最近更新 更多