【问题标题】:How to create never ending DataFlow Mesh with exception handling?如何使用异常处理创建永无止境的 DataFlow Mesh?
【发布时间】:2017-04-24 22:05:48
【问题描述】:

我正在创建一个使用 TPL DataFlow 的任务处理器。我将遵循生产者消费者模型,在该模型中,生产者会偶尔生产一些要处理的项目,而消费者会一直等待新项目的到来。这是我的代码:

async Task Main()
{
    var runner = new Runner();
    CancellationTokenSource cts = new CancellationTokenSource();
    Task runnerTask = runner.ExecuteAsync(cts.Token);

    await Task.WhenAll(runnerTask);
}

public class Runner
{
    public async Task ExecuteAsync(CancellationToken cancellationToken) {
        var random = new Random();

        ActionMeshProcessor processor = new ActionMeshProcessor();
        await processor.Init(cancellationToken);

        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

            int[] items = GetItems(random.Next(3, 7));

            await processor.ProcessBlockAsync(items);
        }
    }

    private int[] GetItems(int count)
    {
        Random randNum = new Random();

        int[] arr = new int[count];
        for (int i = 0; i < count; i++)
        {
            arr[i] = randNum.Next(10, 20);
        }

        return arr;
    }
}

public class ActionMeshProcessor
{
    private TransformBlock<int, int> Transformer { get; set; }
    private ActionBlock<int> CompletionAnnouncer { get; set; }

    public async Task Init(CancellationToken cancellationToken)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int>(async input => {

            await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

            if (input > 15)
            {
                throw new Exception($"I can't handle this number: {input}");
            }

            return input + 1;
        }, options);

        this.CompletionAnnouncer = new ActionBlock<int>(async input =>
        {
            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        this.Transformer.LinkTo(this.CompletionAnnouncer);

        await Task.FromResult(0); // what do I await here?
    }

    public async Task ProcessBlockAsync(int[] arr)
    {
        foreach (var item in arr)
        {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }       
    }
}

我在上面添加了一个条件检查来引发异常以模仿异常情况。

这是我的问题:

  • 在不降低整个网格的情况下处理上述网格中的异常的最佳方法是什么?

  • 有没有更好的方法来初始化/启动/继续一个永无止境的 DataFlow 网格?

  • 我在哪里等待完成?

我已经查看了this similar question

【问题讨论】:

    标签: c# concurrency task-parallel-library tpl-dataflow


    【解决方案1】:

    例外情况

    init 中没有任何异步内容,它可能是标准的同步构造函数。您可以在提供给块的 lamda 中使用简单的 try catch 来处理网格中的异常,而无需关闭网格。然后,您可以通过过滤网格中的结果或忽略以下块中的结果来处理这种情况。下面是一个过滤的例子。对于int 的简单情况,您可以使用int? 并过滤掉null 的任何值,或者当然您可以根据需要设置任何类型的魔术指标值。如果您实际上传递了引用类型,您可以推出 null 或将数据项标记为脏数据项,以通过链接上的谓词检查。

    public class ActionMeshProcessor {
        private TransformBlock<int, int?> Transformer { get; set; }
        private ActionBlock<int?> CompletionAnnouncer { get; set; }
    
        public ActionMeshProcessor(CancellationToken cancellationToken) {
            var options = new ExecutionDataflowBlockOptions {
                CancellationToken = cancellationToken,
                MaxDegreeOfParallelism = 5,
                BoundedCapacity = 5
            };
    
    
            this.Transformer = new TransformBlock<int, int?>(async input => {
                try {
                    await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!
    
                    if (input > 15) {
                        throw new Exception($"I can't handle this number: {input}");
                    }
    
                    return input + 1;
                } catch (Exception ex) {
                    return null;
                }
    
            }, options);
    
            this.CompletionAnnouncer = new ActionBlock<int?>(async input =>
            {
                if (input == null) throw new ArgumentNullException("input");
    
                Console.WriteLine($"Completed: {input}");
    
                await Task.FromResult(0);
            }, options);
    
            //Filtering
            this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null);
            this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
        }
    
        public async Task ProcessBlockAsync(int[] arr) {
            foreach (var item in arr) {
                await this.Transformer.SendAsync(item); // await if there are no free slots
            }
        }
    }
    

    完成

    您可以在您的处理器中公开Complete()Completion,并在您的应用关闭时使用它们来完成await,假设那是您唯一一次关闭网格。此外,请确保通过链接正确传播完成。

        //Filtering
        this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null);
        this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
    }        
    
    public void Complete() {
        Transformer.Complete();
    }
    
    public Task Completion {
        get { return CompletionAnnouncer.Completion; }
    }
    

    然后,根据您的样本,最有可能完成的地方是驱动您的处理的循环之外:

    public async Task ExecuteAsync(CancellationToken cancellationToken) {
        var random = new Random();
    
        ActionMeshProcessor processor = new ActionMeshProcessor();
        await processor.Init(cancellationToken);
    
        while (!cancellationToken.IsCancellationRequested) {
            await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more
    
            int[] items = GetItems(random.Next(3, 7));
    
            await processor.ProcessBlockAsync(items);
        }
        //asuming you don't intend to throw from cancellation
        processor.Complete();
        await processor.Completion();
    
    }
    

    【讨论】:

    • 这看起来不错,谢谢@JSteward,我将调试并查看是否遇到任何其他边缘情况并更新!
    猜你喜欢
    • 1970-01-01
    • 2019-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-03-05
    • 2019-04-17
    • 1970-01-01
    相关资源
    最近更新 更多