【问题标题】:Tasks mismatch inputs任务不匹配输入
【发布时间】:2017-12-20 12:13:35
【问题描述】:

我遇到了一个小问题。我有一个 BackgroundWorker,它逐行读取文本文件。我想对那些读取的行执行操作,但是这个操作需要一段时间,所以我想我可以为这种工作创建并行运行的任务。到目前为止,这是我的代码:

List<Task> tasks = new List<Task>();

using (StreamReader sr = new StreamReader(this.AppData_Path + this.Playlists_File))
{
    string line = "";

    while ((line = sr.ReadLine()) != null)
    {
        if (!string.IsNullOrWhiteSpace(line))
        {
            Task temp = Task.Factory.StartNew(() => AddSearchPlaylistToList(line));

            tasks.Add(temp);
        }

        if (tasks.Count >= MaxThreads) // MaxThreads = 20
        {
            Task.WaitAll(tasks.ToArray());

            tasks = new List<Task>();
        }
    }
}

现在这是我的问题开始的地方,出于某种奇怪的原因,多个任务在同一行上工作。在将 BackgroundWorker 中的行提交给 Task 之前,我将其打印出来,看起来像这样:

line1 345893798537598375 
line2 435803948508394534 
line3 475734573478534879 
line4 438348975347895798 
line5 234234234234234242
...

当我在 AddSearchPlaylistToList void 中打印 line 参数时,任务正在处理,由于某种原因,它看起来像这样:

line1 345893798537598375 
line1 345893798537598375 
line2 435803948508394534 
line2 435803948508394534
line2 435803948508394534
line3 475734573478534879
...

所以输入似乎搞砸了,但我无法解释原因。

【问题讨论】:

  • 这看起来像是应该使用Parallel.ForEach的情况
  • 如果您使用任务,则根本不需要 BGW。这无异于等同于没有await 能力的Task.Run。您也不需要任务列表,或者手动设置线程数 - 这就是 Tasks 和 Task 调度程序的用途。
  • AddSearchPlaylistToList 是做什么的?您可能根本不需要任务,或者您可以对其进行修改,使其与任务一起运行得更好。例如,如果它将任何内容附加到列表中,您可以在文件上使用 LINQ 并直接从行中获取结果列表。或者您可以将列表替换为可以由多个线程修改而无需锁定的 ConcurrentQueue
  • 我会选择@PanagiotisKanavos 提供的Parallel.ForEach 选项。无需重新发明轮子。我是根据经验说话的;)曾经做过同样的事情,当时我不知道这种Parallel 的东西已经存在

标签: c# multithreading task


【解决方案1】:

File.ReadLines 已经在文件的行上返回了一个枚举器,因此您不需要直接使用 StreamReader。您也不需要任务列表来收集所有任务。你可以写:

var tasks= File.ReadLines(somePath)
                .Select(line=>Task.Run(AddSearchPlaylistToList(line)));
await Task.WhenAll(tasks);

如果您必须限制并发任务的数量(为什么?这很重要),您可以使用自定义 TaskScheduler。一个更简单的选择是使用具有 MaxDOP 限制的Parallel.ForEach,例如:

var lines=File.ReadLines(somePath);
var options = new ParallelOptions { MaxDegreesOfParallelism = 20};
Parallel.ForEach(lines,options,line=>AddSearchPlaylistToList(line));

【讨论】:

  • 可能你应该排除空行,​​比如 OP 尝试使用 if 条件
【解决方案2】:

试试这个:只需将该行保存在一个额外的局部变量中,然后将该变量传递给任务:

if (!string.IsNullOrWhiteSpace(line))
{
    string tempLine = line;
    Task temp = Task.Factory.StartNew(() => AddSearchPlaylistToList(tempLine ));

    tasks.Add(temp);
}

这种现象称为闭包。最好的articles 之一来自 Jon Skeet。

lambda () =&gt; AddSearchPlaylistToList(temp) 捕获变量行的值并将其扩展到变量的整个生命周期。

基本上这意味着,当任务真正开始时(很难说它到底是什么时候发生的),它会回到这一行:

Task temp = Task.Factory.StartNew(() => AddSearchPlaylistToList(line));

只有在这一点上,它才会从变量line 中获取值。但是现在你的循环已经推进了,这个值现在可能与创建该任务的时间不同。同时启动的多个任务将从line 获取相同的值。可能发生的最糟糕的事情是,当任务开始时line 实际上可能是空的,并且本应防止这种情况发生的 if 子句变得无用。

临时保存确保line的每个值都将被独立捕获。

【讨论】:

  • 更好的选择是将值作为状态参数传递给 StartNew。更好的是使用适当的 TPL 类并避免手动迭代、任务列表和Task.StartNew
  • @PanagiotisKanavos 好建议,我个人会使用Parallel.For 选项。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-24
  • 2013-05-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多