【问题标题】:TPL.Dataflow - preventing hangs when unhanded exception occurs in ActionBlock<T>TPL.Dataflow - 在 ActionBlock<T> 中发生未处理的异常时防止挂起
【发布时间】:2015-02-18 21:23:34
【问题描述】:

刚从System.Threading.Tasks.Dataflow 开始,不确定我是否了解ActionBlock 中未处理异常的正确错误处理技术。

我现在拥有的东西会导致挂起: - ActionBlock 有未处理的异常,不再处理 - 制作人无法完成,因为它已经结束了BoundedCapacity

这是我拥有的代码(已简化以显示一位消费者)。

internal class Program
{
    private static int _processCounter = 0;

    internal class MyClass
    {
        public MyClass(int id)
        {
            this.Id = id;
        }

        internal int Id { get; set; }
    }

    private static void Main(string[] args)
    {
        BufferBlock<MyClass> queue = new BufferBlock<MyClass>(new DataflowBlockOptions {BoundedCapacity = 10,});

        ActionBlock<MyClass> consumer =
            new ActionBlock<MyClass>(record => Process(record),
                new ExecutionDataflowBlockOptions {BoundedCapacity = 1,});

        queue.LinkTo(consumer, new DataflowLinkOptions {PropagateCompletion = true,});

        Task producer = Produce(queue);

        Trace.TraceInformation("Starting to wait on producer and consumer...");

        Task.WhenAll(producer, consumer.Completion).Wait(); // <-- this will hang. consumer.Completion is faulted, but producer is still "running".

    }

    private static async Task Produce(BufferBlock<MyClass> queue)
    {
        for (int i = 0; i < 20; i++)
        {
            await queue.SendAsync(new MyClass(i));
            Trace.TraceInformation("Sending object number {0}", i);
            await Task.Delay(1);
        }
        Trace.TraceInformation("Completing the producer");
        queue.Complete();
            // <-- we never get here because one of the SendAsync will be waiting to not excede BoundedCapacity = 10
    }

    private static void Process(MyClass myClass)
    {
        int counter = Interlocked.Increment(ref _processCounter);
        Trace.TraceInformation("Processing object number {0}", myClass.Id);
        if (counter > 4)
        {
            Trace.TraceInformation("About to throw exception for object {0}", myClass.Id);
            throw new ArgumentException("Something bad happened");
        }
    }
}

输出:

ConsoleApplication5.vshost.exe Information: 0 : Sending object number 0
ConsoleApplication5.vshost.exe Information: 0 : Starting to wait on producer and consumer...
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 1
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 0
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 2
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 3
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 1
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 4
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 2
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 3
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 5
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 4
ConsoleApplication5.vshost.exe Information: 0 : About to throw exception for object 4
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 6
A first chance exception of type 'System.ArgumentException' occurred in ConsoleApplication5.exe
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 7
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 8
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 9
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 10
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 11
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 12
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 13
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 14
<never finishes>

问题是,等待此类执行以确保其完成或传播异常的正确方法是什么。 谢谢!

【问题讨论】:

    标签: c# .net tpl-dataflow


    【解决方案1】:

    您可以做很多事情,因为这与您如何构建代码有关。最简单的可能是使用CancellationToken作为生产者,然后先等待消费者:

    private static void Main(string[] args)
    {
        // ...
    
        var cts = new CancellationTokenSource();
        Task producer = Produce(queue, cts.Token);
    
        Trace.TraceInformation("Starting to wait on producer and consumer...");
        try
        {
            await consumer.Completion;
        }
        catch
        {
            cts.Cancel();
            // handle
        }
    
        try
        {
            await producer
        }
        catch
        {
            // handle
        }
    }
    
    private static async Task Produce(BufferBlock<MyClass> queue, CancellationToken token)
    {
        for (int i = 0; i < 20; i++)
        {
            await queue.SendAsync(new MyClass(i), token);
            Trace.TraceInformation("Sending object number {0}", i);
            await Task.Delay(1);
        }
        Trace.TraceInformation("Completing the producer");
        queue.Complete();
    }
    

    【讨论】:

    • 谢谢@i3arnon。它感觉这是一种填补框架空白的解决方法:挂起可能发生并不是很明显(在幸福的情况下也不会发生),并且必须编写额外的手动编排。谢谢!
    • @Alexey 没有任何问题。你只是在等待永远不会发生的事情。您根本不需要这样做。
    猜你喜欢
    • 1970-01-01
    • 2018-07-09
    • 2016-05-25
    • 1970-01-01
    • 2010-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多