【问题标题】:Unexpected Parallel.ForEach loop behavior意外的 Parallel.ForEach 循环行为
【发布时间】:2021-02-19 20:56:10
【问题描述】:

您好,我正在尝试使用Parallel.ForEach 循环来模拟多线程。以下是我的功能:

public void PollOnServiceStart()
{
    constants = new ConstantsUtil();
    constants.InitializeConfiguration();

    HashSet<string> newFiles = new HashSet<string>();

    //string serviceName = MetadataDbContext.GetServiceName();

    var dequeuedItems = MetadataDbContext
        .UpdateOdfsServiceEntriesForProcessingOnStart();
    var handlers = Producer.GetParserHandlers(dequeuedItems);

    while (handlers.Any())
    {
        Parallel.ForEach(handlers,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },
            handler =>
            {
                Logger.Info($"Started  processing a file remaining in Parallel ForEach");
                handler.Execute();
                Logger.Info($"Enqueing one file for next process");
                dequeuedItems = MetadataDbContext
                    .UpdateOdfsServiceEntriesForProcessingOnPollInterval(1);
                handlers = Producer.GetParserHandlers(dequeuedItems);
            });

        int filesRemovedCount = Producer.RemoveTransferredFiles();
        Logger.Info($"{filesRemovedCount} files removed from {Constants.OUTPUT_FOLDER}");
    }
}

所以解释一下发生了什么。函数UpdateOdfsServiceEntriesForProcessingOnStart() 获得4 个文件名(4 个因为并行计数)并将它们添加到一个名为ParserHandler 的线程安全对象中。然后将这些对象放入列表var handlers

我的想法是遍历这个处理程序列表并调用handler.Execute()

Handler.Execute() 将文件从网络位置复制到本地驱动器,解析文件并创建多个输出文件,然后将所述文件发送到网络位置并更新数据库表。

我对这个 Parallel For Each 循环的期望是,在Handler.Execute() 调用之后,UpdateOdfsServiceEntriesForProcessingOnPollInterval(1) 函数将从它读取的 db 表中添加一个新文件名到出队的项目容器中,然后该容器将作为一个项目传递到重新创建的处理程序列表。这样,在一个文件执行完毕后,每个并行循环都会有一个新文件代替它。

但是发生的情况是,虽然我确实添加了一个新文件,但它并没有被下一个可用线程执行。相反,每个并行都必须完成前 4 个文件的执行,然后它将拾取下一个文件。意思是,在前 4 个并行运行之后,一次只运行一个文件,从而使并行循环的整个点无效。在所有 4 个文件完成 Execute() 调用之前添加的初始文件永远不会执行。

IE:

(Start1, Start2, Start3, Start4) 一次。应该发生的事情类似于 (End2, Start5),然后是 (End3, Start6)。但是正在发生的事情是(结束 2,结束 3,结束 1,结束 4),然后是 Start5。结束 5。开始 6,结束 6。 为什么会这样?

因为我们想在一台机器上部署这个服务应用的多个实例,所以让一个巨大的列表在队列中等待是没有好处的。这很浪费,因为其他应用实例无法处理。

【问题讨论】:

  • 这里有更好的技术可以使用。可能是 TPL 数据流,也可能是 RX,甚至可能混合了可靠的消息总线和专用的处理服务。然而,由于设计的拓扑结构不清楚,并且您可能关注的是 X/Y 问题的 Y,因此很难给出任何建议。如果您有一个需要以最大并行度进行序列化和处理的项目队列。我的第一选择是一个简单的ActionBlock,将MaxDegreeOfParallelismBoundedCapacity 设置为您希望一次飞行的最大工作数量
  • 或者,如果您想要一个具体的答案,您需要解释 UpdateOdfsServiceEntriesForProcessingOnStartGetParserHandlers 的确切性质以及任何进一步的系统范围的并发限制
  • 你有没有追查到发生了什么。根据我的经验,我会同时看到 (Start1, Start2, Start3, Start4),然后看到 (End2, Start5),然后是 (End3, Start6) 等。当一项工作结束时,另一个立即开始(但还有另外三个在运行)。就我而言,我仍然会一直进行 4 个工作单元,但你会看到一个结束,另一个立即开始。这有点奇怪,但请考虑一下。
  • Parallel.ForEach 在涉及 I/O 操作时不应使用,在这种情况下最好使用 async/wait 方法。总体检查线程池和管道设计模式以解决您的问题。
  • 您可以异步执行两种工作(粗略地说),I/O Bound 工作CPU Bound 工作。如果你在做 I/O 工作(比如与磁盘驱动器或 Web 服务通信,那么你可以通过使用 Async 和 await 来利用 I/O 操作的自然异步。如果你在做 CPU 工作(复杂的计算,比如做 Ray跟踪),那么最好通过将工作分派到处理器拥有的尽可能多的内核来加载 CPU。对于某些工作负载来说,分界线在哪里可能很难分辨。有时很容易

标签: c# parallel-processing parallel.foreach


【解决方案1】:

我正在写一个应该很长的评论作为答案,尽管这是一个糟糕的答案,因为它没有回答问题。

请注意,并行文件系统操作不太可能使它们更快,尤其是在存储是经典硬盘的情况下。磁盘的磁头不能同时在 N 个位置,如果你告诉它这样做只会浪费它的大部分时间旅行而不是读取或写入。

克服访问文件系统所带来的瓶颈的最佳方法是确保磁盘随时都有工作要做。不要停止磁盘的工作以进行计算或从/向数据库获取/保存数据。要做到这一点,您必须同时运行多个工作流。一个工作流将完全与磁盘进行 I/O,另一个工作流将与数据库连续对话,第三个工作流将通过一个接一个地进行计算来利用 CPU,等等。这种方法称为任务并行(并行执行异构工作),而不是数据并行性(并行执行同质工作,Parallel.ForEach 的专长)。它也称为流水线,因为为了使所有工作流同时运行,您必须在它们之间放置中间缓冲区,因此您创建了一个数据流从缓冲区到缓冲区的管道。用于此类操作的另一个术语是生产者-消费者模式,它描述了一个短管道,仅由两个构建块组成,第一个是生产者,第二个是消费者。

目前可用于创建管道的最强大工具¹是TPL Dataflow 库。它提供了各种可以相互链接的“块”(管道段),并且可以覆盖大多数场景。您所做的是实例化将构成管道的块,配置它们,告诉每个人应该做什么,将它们链接在一起,为第一个块提供应该处理的初始原始数据,然后最后一个块的Completion 最后是await。您可以查看使用 TPL 数据流库 here 的示例。

¹ 在 .NET 平台中作为内置库提供。还存在强大的第三方工具,例如 Akka.NET。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-02
    相关资源
    最近更新 更多