【发布时间】:2017-04-24 22:05:48
【问题描述】:
我正在创建一个使用 TPL DataFlow 的任务处理器。我将遵循生产者消费者模型,在该模型中,生产者会偶尔生产一些要处理的项目,而消费者会一直等待新项目的到来。这是我的代码:
async Task Main()
{
var runner = new Runner();
CancellationTokenSource cts = new CancellationTokenSource();
Task runnerTask = runner.ExecuteAsync(cts.Token);
await Task.WhenAll(runnerTask);
}
public class Runner
{
public async Task ExecuteAsync(CancellationToken cancellationToken) {
var random = new Random();
ActionMeshProcessor processor = new ActionMeshProcessor();
await processor.Init(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more
int[] items = GetItems(random.Next(3, 7));
await processor.ProcessBlockAsync(items);
}
}
private int[] GetItems(int count)
{
Random randNum = new Random();
int[] arr = new int[count];
for (int i = 0; i < count; i++)
{
arr[i] = randNum.Next(10, 20);
}
return arr;
}
}
public class ActionMeshProcessor
{
private TransformBlock<int, int> Transformer { get; set; }
private ActionBlock<int> CompletionAnnouncer { get; set; }
public async Task Init(CancellationToken cancellationToken)
{
var options = new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 5,
BoundedCapacity = 5
};
this.Transformer = new TransformBlock<int, int>(async input => {
await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!
if (input > 15)
{
throw new Exception($"I can't handle this number: {input}");
}
return input + 1;
}, options);
this.CompletionAnnouncer = new ActionBlock<int>(async input =>
{
Console.WriteLine($"Completed: {input}");
await Task.FromResult(0);
}, options);
this.Transformer.LinkTo(this.CompletionAnnouncer);
await Task.FromResult(0); // what do I await here?
}
public async Task ProcessBlockAsync(int[] arr)
{
foreach (var item in arr)
{
await this.Transformer.SendAsync(item); // await if there are no free slots
}
}
}
我在上面添加了一个条件检查来引发异常以模仿异常情况。
这是我的问题:
在不降低整个网格的情况下处理上述网格中的异常的最佳方法是什么?
有没有更好的方法来初始化/启动/继续一个永无止境的 DataFlow 网格?
我在哪里等待完成?
我已经查看了this similar question
【问题讨论】:
标签: c# concurrency task-parallel-library tpl-dataflow