【问题标题】:TPL DataFlow proper way to handle exceptionsTPL DataFlow 处理异常的正确方法
【发布时间】:2019-10-28 14:54:15
【问题描述】:

我在使用 TPL DataFlow 管理队列(数据库)并将工作重定向到网格计算服务的 Windows 服务中遇到问题。并且在某一时刻 BufferBlock 停止释放任务,我不知道为什么。我认为这是因为在某些任务的执行过程中发生了一些异常,但是它们被抑制了,很难理解 BufferBlock 何时停止接受新任务。

我试图在下面的工作示例中简化它。 它没有任何异常处理,我想知道如何正确处理 TPL 中的异常。 我在这里找到了类似的东西TPL Dataflow, guarantee completion only when ALL source data blocks completed。 在此示例中,我有 100 个请求,并以 10 个请求批量处理数据。 模拟 ID % 9 == 0 时发生的一些异常 如果我没有捕捉到这个异常,它会工作一点,然后停止接受新的请求。 如果我处理并返回 Result.Failure 我相信它可以正常工作,但我不确定这是否是在生产环境中使用它的正确方法。

我是 TPL 的新手,如果我没有更清楚地解释我的问题,请忘记我。 GitHub Project

Image Empty Slots

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers;
using CSharpFunctionalExtensions;

namespace TestTPL
{
    public class ServicePipeline
    {
        public const int batches = 100;
        private int currentBatch = 0;

        public ServicePipeline(int maxRequestsInParallel)
        {
            MaxRequestsInParallel = maxRequestsInParallel;
        }

        public int MaxRequestsInParallel { get; }
        public BufferBlock<MyData> QueueBlock { get; private set; }
        public List<TransformBlock<MyData, Result>> ExecutionBlocks
            { get; private set; }
        public ActionBlock<Result> ResultBlock { get; private set; }

        private void Init()
        {
            QueueBlock = new BufferBlock<MyData>(new DataflowBlockOptions()
                { BoundedCapacity = MaxRequestsInParallel });
            ExecutionBlocks = new List<TransformBlock<MyData, Result>>();
            ResultBlock = new ActionBlock<Result>(_ => _.OnFailure(
                () => Console.WriteLine($"Error: {_.Error}")));

            for (int blockIndex = 0; blockIndex < MaxRequestsInParallel;
                blockIndex++)
            {
                var executionBlock = new TransformBlock<MyData, Result>((d) =>
                {
                    return ExecuteAsync(d);
                }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
                executionBlock.LinkTo(ResultBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                QueueBlock.LinkTo(executionBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                ExecutionBlocks.Add(executionBlock);
            }
        }

        public static Result ExecuteAsync(MyData myData)
        {
            //try
            //{
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => web.DownloadStringAsync(
                new Uri("http://localhost:49182/Slow.ashx")));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
            //}
            //catch (Exception ex)
            //{
            //    return Result.Failure($"Exception: {ex.Message}");
            //}
        }

        public async void Start()
        {
            Init();
            while (currentBatch < batches)
            {
                Thread.Sleep(1000);
                await SubmitNextRequests();
            }
            Console.WriteLine($"Completed: {batches}");
        }

        private async Task<int> SubmitNextRequests()
        {
            var emptySlots = MaxRequestsInParallel - QueueBlock.Count;
            Console.WriteLine($"Empty slots: {emptySlots}" +
                $", left = {batches - currentBatch}");
            if (emptySlots > 0)
            {
                var dataRequests = await GetNextRequests(emptySlots);
                foreach (var data in dataRequests)
                {
                    await QueueBlock.SendAsync(data);
                }
            }
            return emptySlots;
        }

        private async Task<List<MyData>> GetNextRequests(int request)
        {
            MyData[] myDatas = new MyData[request];
            Task<List<MyData>> task = Task<List<MyData>>.Run(() =>
            {
                for (int i = 0; i < request; i++)
                {
                    myDatas[i++] = new MyData(currentBatch);
                    currentBatch++;
                }
                return new List<MyData>(myDatas);
            });
            return await task;
        }
    }

    public class MyData
    {
        public int Id { get; set; }
        public MyData(int id) => Id = id;
        public override string ToString() { return Id.ToString(); }
    }
}

编辑:2019 年 10 月 30 日 当异常被处理和显式调用时,它按预期工作 Result.Failure($"Exception: {ex.Message}");

    public static Result ExecuteAsync(MyData myData)
    {
        try
        {
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => Thread.Sleep(2000));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
        }
        catch (Exception ex)
        {
            return Result.Failure($"Exception: {ex.Message}");
        }
    }

【问题讨论】:

  • 你给我们的代码不能在本地测试中编译(我们缺少一些像Result这样的类,我们无权访问localhost:49182/Slow.ashx。因为我还不清楚这个问题并且代码没有显示所需的输出,也没有显示问题,恐怕您需要详细说明您的问题。
  • 嗨彼得,我刚刚在GitHub - TestTPL 上分享了这个示例项目。它已经注释掉了 TransformBlock 调用的 ExecuteAsync 中捕获的异常。在这种情况下,当一次调用 SubmitNextRequests 时,它不会有空槽来处理下一个请求
  • 你为什么要创建多个TransformBlocks?你不知道MaxDegreeOfParallelism 选项吗?
  • 感谢@TheodorZoulias 我看到了 MaxDegreeOfParallelism,还没有深入研究它。但我同意,多个 TranformBlock 对我来说也很奇怪。只是尝试使用管道解决现有生产问题会停止接收新请求,并在我的示例中复制了主要逻辑
  • 老实说,我认为通过重构代码以使用单个 TranformBlockMaxDegreeOfParallelism = MaxRequestsInParallel 来解决问题比尝试修复这个不必要的复杂实现更容易。这个可怕的结需要解开!

标签: c# .net task-parallel-library tpl-dataflow


【解决方案1】:

链接两个块时,可以选择向前传播完成,但不能向后传播。这在使用BoundedCapacity 选项时会出现问题,并且会发生错误,因为它会阻塞管道的馈线并导致死锁。不过,手动传播完成非常容易。这是您可以使用的方法。

async void OnErrorComplete(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted) block2.Complete();
}

它异步等待block1 完成,如果它失败了,它会立即完成block2。完成上游块通常就足够了,但如果需要,您也可以传播特定的异常:

async void OnErrorPropagate(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted)
        block2.Fault(block1.Completion.Exception.InnerException);
}

【讨论】:

  • 不知何故,我无法使我的代码与 OnErrorComplete 一起工作。尝试像QueueBlock.LinkTo(executionBlock, new DataflowLinkOptions() { PropagateCompletion = true }); ExecutionBlocks.Add(executionBlock); OnErrorComplete(QueueBlock, executionBlock); OnErrorComplete(executionBlock, ResultBlock); 一样使用它
  • OnErrorComplete 旨在反方向使用。您像往常一样向前链接:block1.LinkTo(block2),然后使用OnErrorComplete 创建向后条件链接:OnErrorComplete(block2, block1)
  • 我更改顺序后,它完成了所有任务,但停止打印结果。这是我在控制台窗口中得到的 空槽:10,左 = 40 空槽:10,左 = 35 空槽:10,左 = 30 空槽:10,左 = 25 空槽:10,左 = 20 空槽:10,左 = 15 空槽:10,左 = 10 空槽:10,左 = 5 已完成:100
  • 您是否记录了错误?编写一个等待block.CompletionOnErrorLog(IDataflowBlock block) 方法然后如果block.Completion.IsFaulted 它在某处记录block.Completion.Exception 是微不足道的。然后将其附加到所有块上,以便您知道发生了什么。
  • 是的,这就是计划 :) 谢谢伙计,非常感谢您的帮助!
猜你喜欢
  • 1970-01-01
  • 2011-08-25
  • 1970-01-01
  • 2012-04-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-22
  • 2018-11-07
相关资源
最近更新 更多