【问题标题】:Trying to efficiently parallel process some data from a File Stream in .NET (read -and- write)尝试有效地并行处理 .NET 中文件流中的一些数据(读写)
【发布时间】:2017-11-09 18:36:33
【问题描述】:

我正在尝试了解如何有效地从文件中读取一些数据,执行一些并行工作(每行),然后将新行写回文件系统。

我知道我可以做到这一点,一次一行.. 但我希望一次做几行 - 或者 - .. 如果一行“忙”等待异步工作完成,然后继续下一行,等等。

这里有一些示例数据和逻辑...

Header
SomeId#1, SomeId#2, SomeId#3, Name, Has this line been processed and cleaned(true/false)

File Data
444,2,12,Leia Organa, true
121,33333,4,Han Solo, true
1,2,3,Jane Doe, false
1,4,11,John Doe, false

所以前两行已经处理完毕,我将跳过这些行。 第 3 行和第 4 行需要处理。检查数据后,我希望将其保存回文件中,如

1,33333,3,Jane Doe, true

所以这是一般逻辑...

  • 读线
  • 致电DoWorkAsync()
  • 再次将此行保存回文件。

我只是希望不必等待DoWorkAsync() 完成,然后我就可以保存然后阅读下一行。我希望我可以开始阅读下一行......如果上一行完成......很好......然后将该行保存到文件中的相同行号......然后再次移动到下一行。

这就像我可以同时工作 5 或 10 行......等待结果从 3rd 方 api 返回......并行工作或其他。

这可以在 .NET 中完成吗?我确定 .NET 具有此功能.. 我只是看不到执行此操作的模式。

注意:我通常对 I/O 密集型操作(例如访问文件系统或调用某些 3rd 方 api 端点)执行 async/await 与我用于 CPU 密集型工作的 Parallel.ForEach。

注意:为什么true/false 在行尾?因为我不能一次处理所有的行。我有 api 限制。

其他想法是有两个文件,一个用于 PENDING,一个用于 PROCESSED。

【问题讨论】:

  • 好吧...如果您正在更新文件,您将不得不重新阅读它...不是吗?将所有行加载到内存有什么问题(使用 id = line_number,val = line,bool = status 的 poco),更新它,然后写回一次(避免反复读取和写入同一个文件?
  • 我试图看看我是否可以用低内存占用来做到这一点。当然我可以加载整个文件。不过我有大约 360 万行(截至目前)。
  • 好的开始... 3.6M ... 看起来无论哪种方式都会很糟糕... 我不知道您将如何同时进行阅读/更新 TBH ...我可能会将它分解为多个文件,或者只是输出到部分文件然后重新编译它们,但也许其他人有一个想法:)
  • heh @ multiple files -> 我刚刚在 OP 中对此发表了评论 :)
  • 我认为没有一种简单的方法可以只写入文件的一部分而不完全覆盖它(例如,没有简单的文件“随机访问”)。也许如果你不改变文件的字节长度,你可以使用某种流来写入特定的位置,但对我来说并行做这种工作似乎真的很困难。

标签: c# .net parallel-processing stream async-await


【解决方案1】:

这是一个并行处理器的存根,它在批量处理行时使用async/await

这种方法确保写入时保持相同的顺序。

public async Task ProcessFile()
{
    const int parallelism = 5;

    using (var readStream = File.OpenRead(@"c:\myinputfile"))
    {
        // put HERE your logic for skipping to a specific line
        // e.g. readStream.Seek(lastPosition); 

        using (var reader = new StreamReader(readStream))
        {
            while (!reader.EndOfStream)
            {
                var tasks = new List<Task<string>>();

                for (var i = 0; i < parallelism; i++)
                {   
                    var line = await reader.ReadLineAsync();

                    tasks.Add(DoWorkAsync(line));

                    if (reader.EndOfStream)
                        break;
                }

                var results = await Task.WhenAll(tasks);

                using (var writeStream = File.Open(@"d:\myresultfile", FileMode.Append))
                using (var writer = new StreamWriter(writeStream))
                {
                    foreach (var line in results)
                        await writer.WriteLineAsync(line);
                }
            }
        }
    }
}

public async Task<string> DoWorkAsync(string line)
{
    await Task.Delay(new Random().Next(1000, 5000));
    // do some work and return line with last parameter = true
    return line.Replace("false", "true"); // e.g.
}

它肯定需要改进,但它应该为你自己编写一个良好的基础。

【讨论】:

  • 良好的基础工作,但我仍然看不到他如何解决附加问题,这会将行写入文件的末尾。 MSDN: ``` 如果文件存在,则打开文件并查找文件末尾,或创建一个新文件。这需要 FileIOPermissionAccess.Append 权限。 FileMode.Append ... 试图寻找到文件末尾之前的位置会抛出 IOException 异常,任何尝试读取都会失败并抛出 NotSupportedException 异常。```
  • @Noctis 你能详细说明一下吗?此方法只是写入(附加)到另一个文件(与输入文件无关),写入新行时无需查找输出文件。我没有找到任何可靠的方法来读取和写入同一个文件而不在每次写入时都覆盖它。
  • 是的,这就是我的意思,你输出到不同的文件,而不是读取和写入同一个文件......我们在同一个页面上。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-12-19
  • 2011-09-27
  • 2022-01-27
  • 1970-01-01
  • 2022-08-10
  • 1970-01-01
  • 2017-12-12
相关资源
最近更新 更多