【问题标题】:Task Fails to Run任务运行失败
【发布时间】:2013-02-27 19:47:56
【问题描述】:

我编写了一个小实用程序来读取大型文本文件并搜索包含搜索词的行。我借此机会学习 TPL 数据流。

代码运行良好,除非搜索词接近文件末尾。在这种情况下,uiResult 操作块不会被调用除非其中有断点。

我的理解是数据从searcher 发布到uiResult之后 searcher 变得完整(它已经处理了最后一个数据块)。由于数据已发布到uiResult,因此在处理完该数据之前它不应该变得完整。

问题

为什么uiResult 已经完成,即使数据已经发送到它(除非在uiResult 中设置了断点)?

代码

以下是相关代码,尽可能精简:

ActionBlock<LineInfo> uiResult = new ActionBlock<LineInfo>(li =>
    {
        // If match found near end of file, the following line only runs
        // if a breakpoint is set on it:
        if (results != null) results.Add(li);
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token,
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });

BatchBlock<LineInfo> batcher = new BatchBlock<LineInfo>(5000); 

ActionBlock<LineInfo[]> searcher = new ActionBlock<LineInfo[]>(lines =>
    {
        foreach (LineInfo li in lines)
        {
            if (li.TextOfLine.Contains(searchTerm))
            {
                uiResult.Post(li);
            }
        }
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token
    });

batcher.LinkTo(searcher);

batcher.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)searcher).Fault(t.Exception);
    else searcher.Complete();

    if (t.IsFaulted) ((IDataflowBlock)uiResult).Fault(t.Exception);
    else uiResult.Complete();
});

Task.Run(() =>
    {
        using (FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
        using (BufferedStream bs = new BufferedStream(fs))
        using (StreamReader sr = new StreamReader(bs))
        {
            string line;
            while ((line = sr.ReadLine()) != null && cancelSource.IsCancellationRequested == false)
            {
                batcher.Post(new LineInfo() { LineNumber = lineNumber, OffsetOfLine = offset, TextOfLine = line });
            }

            batcher.Complete();
            try
            {
                searcher.Completion.Wait();
                uiResult.Completion.Wait();
            }
            catch (AggregateException ae)
            {
                TaskCanceledException taskCancelled = ae.InnerException as TaskCanceledException;
                if (taskCancelled != null)
                {
                    // Swallow the Exception if is just a user cancellation
                    throw;
                }
            }
            finally
            {
                signalDone();
            }
        }
    });

【问题讨论】:

  • 你真的是说设置断点会改变行为吗?这很可疑。
  • @svick:是的,我可以始终如一地重现这种行为。感觉就像断点改变了线程计时问题。不过我的理解是,编写的代码应该是确定性的。

标签: .net task-parallel-library tpl-dataflow


【解决方案1】:

由于您处理完成的方式,您的代码是不确定的。可能的事件顺序是这样的:

  1. Task 处理整个文件并在batcher 上调用Complete()
  2. batcher 处理最后一批,将其发送到 searcher 并完成。
  3. 继续执行,它在searcheruiResult 上调用Complete()
  4. 由于uiResult没有工作要做,它完成了。
  5. searcher 处理最后一批,尝试将每个结果发送到 uiResult。但是uiResult 已经完成,所以它拒绝一切。这意味着Post() 返回false,但您并没有对此进行检查。

所以问题是你试图向一个已经完成的块发送一些东西,这是行不通的。

解决方案是仅在块完成之前的块之后调用Complete()(即其Completion 完成)。可能最简单的方法是使用PropagateCompletionLinkTo()

【讨论】:

  • 您指出问题后,我看到了问题,但不明白解决方案。 PropagateCompletion 的文档有点轻。我将如何重构 batcher.Completion.ContinueWith 周围的代码以正确处理故障但等待 searcheruiResult 完成现有工作?
  • @EricJ。就像我说的,不要使用ContinueWith(),而是使用PropagateCompletion。这将在适当的时候正确完成或故障以下块。如果您想继续使用COntinueWith(),则需要两个延续:一个用于完成searcherbatcher.Completion,另一个用于完成searcher.CompletionuiResult
  • 谢谢,我的问题已经解决了。可以肯定的是我理解......在我的情况下searcheruiResult都是ActionBlock&lt;T&gt;s,所以我不能使用LinkTo(两者都没有实现IPropagatorBlock&lt;TInput, TOutput&gt;)。我确实通过使用带有批处理器->搜索器链接的PropogateCompletion 解决了这个问题。我是否也可以将searcher 更改为链接到uiResultTransformManyBlock&lt;T&gt; 并返回一个空枚举(对于与搜索不匹配的行)或一个元素的枚举(对于匹配的行)?跨度>
  • @EricJ。是的,我没有意识到这一点。 TransformManyBlock 可以工作,要么没有 batcher 你描述的方式,要么返回所有匹配元素的集合。另一种进行过滤的方法是使用带有谓词的LinkTo() 的重载,但这需要另一个块作为不匹配项的接收器(您可以为此使用NullTarget())。
  • 我在您的另一个答案中看到了提供多个 LinkTo 的选项,但在这种情况下不喜欢这样,因为它将行匹配逻辑放在谓词本身中。再次感谢你的帮助。我现在更了解 Dataflow。
猜你喜欢
  • 2018-11-14
  • 2015-11-13
  • 1970-01-01
  • 2020-03-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-13
  • 1970-01-01
相关资源
最近更新 更多