【问题标题】:Read Multiple Big Files in Using C# Async Programming使用 C# 异步编程读取多个大文件
【发布时间】:2020-07-06 14:36:14
【问题描述】:

我想读取可能包含数百万行的多个文件。

  1. 文件是多个我想同时处理多个文件。
  2. 每个文件包含数百万行,因此如果按顺序读取行,则需要时间。所以我想同时读取多行。
  3. 在读取的每一行上,根据行中的值进行一些处理。

我有几个问题。

问题: 由于文件读取是 I/O 操作,因此我应该使用异步编程来同时读取多个文件

  1. 简单的异步/等待模型
  2. Task.Run(Read_File(filePath))

【问题讨论】:

标签: c# async-await task-parallel-library streamreader


【解决方案1】:

您不会通过同时读取多个文件来节省时间,除非每个文件都位于不同的存储硬件中。从文件系统读取文件的速度受限于文件所在的存储硬件的能力,而不是 CPU 的能力。

为了节省时间,您可以做的是在不间断的工作流程中读取文件,并在不同的工作流程中处理行,这两个工作流程同时独立工作。生产者工作流和消费者工作流这两个工作流之间的通信可以使用具有阻塞或异步功能的中间缓冲区来实现。有许多可用选项,包括BlockingCollection 类、ChannelsTPL Dataflow 库。

【讨论】:

    【解决方案2】:

    当你读取一个文件,尤其是硬盘上的文件时,读取速度相对较慢,因为设备必须等到正确的扇区位于读取磁头下方。同时读取多个文件,不会提高性能,因为读取文件A的一个扇区后,读头必须移动读取文件B的一个扇区,然后返回读取文件A的下一个扇区。

    所以同时读取两个文件是不明智的。

    您想要做的是生产者-消费者模式:生产者尽可能快地生产数据,而消费者以不同的速度处理数据:有时比生产者慢,有时比生产者更快。尚未处理的数据必须被缓冲。

    如果您使用生产者-消费者模式,生产者会读取文件并将读取的行尽快放入缓冲区。每当文件读取器必须等待下一批数据时,消费者就有一些时间来处理已经生成的行。

    微软为此提供了一个简单的 Nuget 包:Microsoft Task Parallel Library

    首先创建一个缓冲区。所有读取的行都将存储在此缓冲区中:

    private BufferBlock<string> buffer = new BufferBlock<string>();
    

    读取文件并将读取的行存储在缓冲区中的异步过程:

    async Task ProduceLinesAsync(string fileName)
    {
        using (TextReader fileReader = File.OpenText(fileName))
        {
            string readLine = await fileReader.ReadLineAsync();
            while (readLine != null)
            {
                // a line has been read; put it on the buffer:
                buffer.SendAsync(readLine);
    
                // read the next line                
                readLine = await fileReader.ReadLineAsync();
            }
        }
    }
    

    处理多个文件的过程:

    async Task ProduceLinesAsync(IEnumerable<string> fileNames)
    {
        foreach (var fileName in fileNames)
        {
            await ProduceLinesAsync(fileName);
        }
    
        // If here, nothing to produce anymore.
        // tell the buffer that producing is finished:
        buffer.Complete();
    }
    

    如果需要,您可以让每个文件在不同的缓冲区上产生数据,并且每个缓冲区有一个消费者。

    消费者

    您在生产时看到所有等待:每当进程等待读取下一行时,消费者将有时间处理已经生产的行:

    Task ConsumeAsync()
    {
        while (await buffer.OutputAvailableAsync())
        {
            // there is something on the buffer; fetch it and process it:
            var line = await buffer.ReceiveAsync();
            this.ProcessLine(line);
        }
    
        // if here, producer marked Complete(), indicating that no data is to be expected
    }
    

    把它们放在一起:

    async Task ProcessFiles(IEnumerable<string> fileNames)
    {
        // start producing, but do not await:
        Task taskProduce = ProduceLinesAsync(fileNames);
    
        // because we did not await, we are free to do the following as soon as the
        // TextReader has to await for a line.
        // again, do not await.
        Task taskConsume = ConsumeAsync();
    
        // await until both the producer and the consumer are finished:
        await Task.WhenAll(new Task[] {taskProduce, taskConsume})
    }
    

    结论

    在async await中使用BufferBlock,只有当所有数据都处理完,下一行还没有读完,进程才会空闲等待。

    在所有其他情况下,每当您的进程必须等待 TextReader 生成下一行时,它将处理尚未处理的行。有工作要做时,您的进程永远不会无所事事。

    【讨论】:

      猜你喜欢
      • 2013-01-26
      • 1970-01-01
      • 1970-01-01
      • 2016-11-17
      • 2018-04-29
      • 2020-10-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多