【发布时间】:2021-11-10 18:14:02
【问题描述】:
我好像不懂TPL Dataflow错误处理。
假设我有一个我想要处理的项目列表,我为此使用了一个 ActionBlock:
var actionBlock = new ActionBlock<int[]>(async tasks =>
{
foreach (var task in tasks)
{
await Task.Delay(1);
if (task > 30)
{
throw new InvalidOperationException();
}
Console.WriteLine("{0} Completed", task);
}
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 200,
MaxDegreeOfParallelism = 4
});
for (i = 0; i < 10000; i++)
{
if (!await bufferBlock.SendAsync(i))
{
break;
}
}
actionBlock.Complete();
await actionBlock.Completion;
如果发生错误,块将转换为故障状态,并且 SendAsync(...) 返回 false。我可以停止我的循环并完成它,当我等待完成时会引发异常。到目前为止一切顺利。
当我在两者之间放置一个 BufferBlock 时,它不再起作用:
bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
for (i = 0; i < 10000; i++)
{
if (!await bufferBlock.SendAsync(i, cts.Token))
{
break;
}
}
bufferBlock.Complete();
await actionBlock.Completion;
对 SendAsync() 的调用只是永远“阻塞”,因为 BufferBlock 永远不会转换到故障状态。
我找到的唯一解决方案是:
using (var cts = new CancellationTokenSource())
{
actionBlock.Completion.ContinueWith(x =>
{
if (x.Status != TaskStatus.RanToCompletion)
{
cts.Cancel();
}
});
var i = 0;
try
{
for (i = 0; i < 10000; i++)
{
if (cts.Token.IsCancellationRequested)
{
break;
}
if (!await bufferBlock.SendAsync(i, cts.Token))
{
break;
}
}
}
catch (OperationCanceledException)
{
}
bufferBlock.Complete();
await actionBlock.Completion;
}
因为状态会传播,所以我必须监听网络中最后一个块的状态,当这个块停止时,我必须停止循环。
这是使用 Dataflow 库的预期方式还是有更好的解决方案?
【问题讨论】:
-
这里是相关问题:TPL Dataflow exception in transform block with bounded capacity。顺便说一句,我对这个问题的回答很好地解决了这个棘手的问题。这个 GitHub 问题显示了问题的另一个方面:No way to cancel completing dataflow blocks。
-
这不是处理管道错误的正确方法,错误也不应该倒流。块不是函数调用,链接不代表任何类型的所有权。这不是 TPL 数据流问题。 Go 频道也是如此。
标签: c# task-parallel-library tpl-dataflow