【问题标题】:How to process directory files in Task parallel library?如何处理任务并行库中的目录文件?
【发布时间】:2015-12-05 14:16:17
【问题描述】:

我有一个场景,我必须基于处理器内核并行处理多个文件(例如 30 个)。我必须根据处理器内核的数量将这些文件分配给单独的任务。我不知道如何对要处理的每个任务进行开始和结束限制。例如,每个任务都知道它必须处理多少个文件。

    private void ProcessFiles(object e)
    {
        try
        {
            var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;

            var FilePaths = Directory.EnumerateFiles(diectoryPath);
            int numCores = System.Environment.ProcessorCount;
            int NoOfTasks = FilePaths.Count() > numCores ? (FilePaths.Count()/ numCores) : FilePaths.Count();


            for (int i = 0; i < NoOfTasks; i++)
            {
                Task.Factory.StartNew(
                        () =>
                        {
                            int startIndex = 0, endIndex = 0;
                            for (int Count = startIndex; Count < endIndex; Count++)
                            {
                                this.ProcessFile(FilePaths);
                            }
                        });

            }
        }
        catch (Exception ex)
        {
            throw;
        }
    }

【问题讨论】:

  • 任务并行库将在后台处理多核架构。创建任务时,我们不需要关心可用的系统内核
  • 我绝对不是任务并行库方面的专家,但 TPL 不应该自己处理 CPU 内核的数量,并确定“拆分”工作负载的最佳方式吗?
  • 这里的问题可能是,如果目录中有100个文件,那么创建100个任务并不是一个好主意。所以你可以使用 Parallel.For 循环。它将在内部进行分区,并依靠自己的分区器建立并行处理。
  • 请注意,任务和并发算法的知识还需要并发集合和线程/并发安全数据交换算法的知识。在这里,您正在同时从多个任务访问 var FilePaths、IEnumerable。真是个坏主意。
  • 仅依赖于 TPL 的默认行为并不总是一个好主意。在许多情况下,需要限制并发级别,这可能就是这种情况。

标签: c# multithreading task-parallel-library file-processing


【解决方案1】:

对于像您这样的问题,C# 中有可用的并发数据结构。您想使用BlockingCollection 并将所有文件名存储在其中。

您使用机器上可用的内核数来计算任务数的想法不是很好。为什么?因为ProcessFile() 可能不会为每个文件花费相同的时间。因此,最好将任务数作为您拥有的核心数开始。然后,让每个任务从 BlockingCollection 中一个一个读取文件名,然后处理该文件,直到 BlockingCollection 为空。

try
{
    var directoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;

    var filePaths = CreateBlockingCollection(directoryPath);
    //Start the same #tasks as the #cores (Assuming that #files > #cores)
    int taskCount = System.Environment.ProcessorCount;

    for (int i = 0; i < taskCount; i++)
    {
        Task.Factory.StartNew(
                () =>
                {
                    string fileName;
                    while (!filePaths.IsCompleted)
                    {
                         if (!filePaths.TryTake(out fileName)) continue;
                         this.ProcessFile(fileName);
                    }
                });
     }
}

CreateBlockingCollection() 如下:

private BlockingCollection<string> CreateBlockingCollection(string path)
{
    var allFiles = Directory.EnumerateFiles(path);
    var filePaths = new BlockingCollection<string>(allFiles.Count);
    foreach(var fileName in allFiles)
    {
        filePaths.Add(fileName);
    }
    filePaths.CompleteAdding();
    return filePaths;
}

您现在必须修改您的 ProcessFile() 以接收文件名,而不是获取所有文件路径并处理其块。

这种方法的优点是现在您的 CPU 不会被过度或不足订阅,负载也将均匀平衡。


我自己没有运行代码,所以我的代码中可能存在一些语法错误。如果您遇到任何错误,请随时更正。

【讨论】:

  • 谢谢伙计,但是我如何才能使订单同步,因为我必须按处理文件的顺序处理文件。此外,如果出现异常,我将如何处理错误文件。我还必须将处理后的文件传递给 UI 线程以使用文件内容更新 GUI。
  • 您可以通过传递到队列中的BlockingCollection 来保存订单,例如[本例][stackoverflow.com/a/3825322/213550]。您可以检查每个文件的每个任务的Exception 属性,并查看它是否不为空。您可以通过ContinueWithWhenAny 方法来更新 UI。
  • @ehafeez:VMAtm 的建议是正确的。试试看。
【解决方案2】:

基于我公认的对 TPL 的有限理解,我认为您的代码可以这样重写:

private void ProcessFiles(object e)
{
    try
    {
        var diectoryPath = _Configurations.Descendants().SingleOrDefault(Pr => Pr.Name == "DirectoryPath").Value;

        var FilePaths = Directory.EnumerateFiles(diectoryPath);

        Parallel.ForEach(FilePaths, path => this.ProcessFile(path));

    }
    catch (Exception ex)
    {
        throw;
    }
}

问候

【讨论】:

  • 文件一次可以有 1000 个,所以我不能使用 parallel.foreach,因为一旦处理完文件,我必须实时更新 GUI。
  • 这不在您的 OP 中。正如您可能想象的那样,我们没有水晶球来阅读您的所有要求。下次,请确保在您的问题中包含您的所有要求,而不是在给出答案后一次添加。谢谢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-04-19
  • 1970-01-01
  • 2015-08-04
相关资源
最近更新 更多