【问题标题】:Retry async Task code using Reactive Extensions使用 Reactive Extensions 重试异步任务代码
【发布时间】:2017-01-12 06:25:06
【问题描述】:

在我的数据访问类中有以下代码。

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            using (var connection = Connection)
            {
                var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                Task<IEnumerable<TEntity>> queryTask =
                    connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                        commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                connection.Close();
                connection.Dispose();
                tokenSource.Dispose();
                return data;
            }
        }

我想当一个SqlExeption 被抛出时重试一次。请记住,我不能将 RX 应用于应用程序,而只能在此代码块中。

我尝试了下面的代码,看起来它执行正确,Do 正在登录控制台输出,但并没有真正调用Catch 处理程序,我不确定Retry 处理程序是否被执行也是。

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            return await Observable.Defer(async () =>
            {
                using (var connection = Connection)
                {
                    var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                    Task<IEnumerable<TEntity>> queryTask =
                        connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                            commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                    IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                    connection.Close();
                    connection.Dispose();
                    tokenSource.Dispose();
                    return Observable.Return(data);
                }
            })
            .Catch<IEnumerable<TEntity>, SqlException>(source =>
           {
               Debug.WriteLine($"QueryAsync Exception {source}");
               return Observable.Return(new List<TEntity>());
           })
           .Throttle(TimeSpan.FromMilliseconds(500))
           .Retry(1)
           .Do(_ => Debug.WriteLine("Do QueryAsync"));
        }

【问题讨论】:

    标签: c# system.reactive reactiveui


    【解决方案1】:

    我可以看到您的代码存在几个潜在问题:

    • 将重试逻辑与主逻辑分开,例如在称为QueryWithRetryAsync 的方法中。这只是一个设计问题,但仍然是一个问题
    • Retry 之前之后 之前不要Catch。否则SqlException 将导致一个空列表,Retry 运算符将永远不会看到异常
    • 我认为 Throttle 根本没有必要,因为您只期望通过管道获得一个值
    • Retry(1) 没有做你认为的事情(这也让我感到惊讶)。似乎“重试”的定义包括第一次调用,所以你需要Retry(2)

    这是一个独立的示例,可以按照您想要的方式运行:

    class Program
    {
        static void Main(string[] args)
        {
            var pipeline = Observable
                .Defer(() => DoSomethingAsync().ToObservable())
                .Retry(2)
                .Catch<string, InvalidOperationException>(ex => Observable.Return("default"));
    
            pipeline
                .Do(Console.WriteLine)
                .Subscribe();
    
            Console.ReadKey();
        }
    
        private static int invocationCount = 0;
    
        private static async Task<string> DoSomethingAsync()
        {
            Console.WriteLine("Attempting DoSomethingAsync");
    
            await Task.Delay(TimeSpan.FromSeconds(2));
    
            ++invocationCount;
    
            if (invocationCount == 2)
            {
                return "foo";
            }
    
            throw new InvalidOperationException();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-06-09
      • 1970-01-01
      • 1970-01-01
      • 2012-01-18
      • 2013-09-24
      相关资源
      最近更新 更多