【发布时间】:2021-02-19 20:56:10
【问题描述】:
您好,我正在尝试使用Parallel.ForEach 循环来模拟多线程。以下是我的功能:
public void PollOnServiceStart()
{
constants = new ConstantsUtil();
constants.InitializeConfiguration();
HashSet<string> newFiles = new HashSet<string>();
//string serviceName = MetadataDbContext.GetServiceName();
var dequeuedItems = MetadataDbContext
.UpdateOdfsServiceEntriesForProcessingOnStart();
var handlers = Producer.GetParserHandlers(dequeuedItems);
while (handlers.Any())
{
Parallel.ForEach(handlers,
new ParallelOptions { MaxDegreeOfParallelism = 4 },
handler =>
{
Logger.Info($"Started processing a file remaining in Parallel ForEach");
handler.Execute();
Logger.Info($"Enqueing one file for next process");
dequeuedItems = MetadataDbContext
.UpdateOdfsServiceEntriesForProcessingOnPollInterval(1);
handlers = Producer.GetParserHandlers(dequeuedItems);
});
int filesRemovedCount = Producer.RemoveTransferredFiles();
Logger.Info($"{filesRemovedCount} files removed from {Constants.OUTPUT_FOLDER}");
}
}
所以解释一下发生了什么。函数UpdateOdfsServiceEntriesForProcessingOnStart() 获得4 个文件名(4 个因为并行计数)并将它们添加到一个名为ParserHandler 的线程安全对象中。然后将这些对象放入列表var handlers。
我的想法是遍历这个处理程序列表并调用handler.Execute()。
Handler.Execute() 将文件从网络位置复制到本地驱动器,解析文件并创建多个输出文件,然后将所述文件发送到网络位置并更新数据库表。
我对这个 Parallel For Each 循环的期望是,在Handler.Execute() 调用之后,UpdateOdfsServiceEntriesForProcessingOnPollInterval(1) 函数将从它读取的 db 表中添加一个新文件名到出队的项目容器中,然后该容器将作为一个项目传递到重新创建的处理程序列表。这样,在一个文件执行完毕后,每个并行循环都会有一个新文件代替它。
但是发生的情况是,虽然我确实添加了一个新文件,但它并没有被下一个可用线程执行。相反,每个并行都必须完成前 4 个文件的执行,然后它将拾取下一个文件。意思是,在前 4 个并行运行之后,一次只运行一个文件,从而使并行循环的整个点无效。在所有 4 个文件完成 Execute() 调用之前添加的初始文件永远不会执行。
IE:
(Start1, Start2, Start3, Start4) 一次。应该发生的事情类似于 (End2, Start5),然后是 (End3, Start6)。但是正在发生的事情是(结束 2,结束 3,结束 1,结束 4),然后是 Start5。结束 5。开始 6,结束 6。 为什么会这样?
因为我们想在一台机器上部署这个服务应用的多个实例,所以让一个巨大的列表在队列中等待是没有好处的。这很浪费,因为其他应用实例无法处理。
【问题讨论】:
-
这里有更好的技术可以使用。可能是 TPL 数据流,也可能是 RX,甚至可能混合了可靠的消息总线和专用的处理服务。然而,由于设计的拓扑结构不清楚,并且您可能关注的是 X/Y 问题的 Y,因此很难给出任何建议。如果您有一个需要以最大并行度进行序列化和处理的项目队列。我的第一选择是一个简单的
ActionBlock,将MaxDegreeOfParallelism和BoundedCapacity设置为您希望一次飞行的最大工作数量 -
或者,如果您想要一个具体的答案,您需要解释
UpdateOdfsServiceEntriesForProcessingOnStart和GetParserHandlers的确切性质以及任何进一步的系统范围的并发限制 -
你有没有追查到发生了什么。根据我的经验,我会同时看到 (Start1, Start2, Start3, Start4),然后看到 (End2, Start5),然后是 (End3, Start6) 等。当一项工作结束时,另一个立即开始(但还有另外三个在运行)。就我而言,我仍然会一直进行 4 个工作单元,但你会看到一个结束,另一个立即开始。这有点奇怪,但请考虑一下。
-
Parallel.ForEach在涉及 I/O 操作时不应使用,在这种情况下最好使用 async/wait 方法。总体检查线程池和管道设计模式以解决您的问题。 -
您可以异步执行两种工作(粗略地说),I/O Bound 工作 和 CPU Bound 工作。如果你在做 I/O 工作(比如与磁盘驱动器或 Web 服务通信,那么你可以通过使用 Async 和 await 来利用 I/O 操作的自然异步。如果你在做 CPU 工作(复杂的计算,比如做 Ray跟踪),那么最好通过将工作分派到处理器拥有的尽可能多的内核来加载 CPU。对于某些工作负载来说,分界线在哪里可能很难分辨。有时很容易
标签: c# parallel-processing parallel.foreach