【问题标题】:TransactionScope across multiple threads using Task.WhenAll使用 Task.WhenAll 跨多个线程的 TransactionScope
【发布时间】:2022-04-04 00:41:57
【问题描述】:

我正在尝试使用 Task.WhenAll 对数据库进行多个并行更新。代码流程是这样的。

在主方法中,我创建了一个事务范围并创建了主事务的克隆并传递给子事务。主事务被阻塞,直到子事务完成

using (var scope = DalcHelper.GetTransactionScope())
{
    DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task1= Dalc.UpdateDetails1(transaction );

    DependentTransaction transaction1 = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task2 = Dalc.UpdateDetails2(transaction1);

    await Task.WhenAll(task1, task2 ).ConfigureAwait(false);

    scope.Complete();
}

DalcMethod 是这样的。在这里,从外部事务创建的克隆作为参数。依赖事务完成通知主事务依赖完成

try
{
    using (SqlCommand databaseCommand = DalcHelper.GetCommand(SPName))
    using (var scope = new TransactionScope(dependentCloneTransaction, TransactionScopeAsyncFlowOption.Enabled))
    {
        -- Update database
        scope.Complete();
    }
}
finally
{
    //Call complete on the dependent transaction
    dependentCloneTransaction.Complete();
}

dalc 方法是返回 Task 的异步方法

我得到以下异常

事务已中止。尝试提升事务时失败。已经有一个打开的与此命令关联的 DataReader,必须先关闭它。等待操作超时

。谁能告诉我我在这里做错了什么?

【问题讨论】:

  • 在这种情况下,问题似乎有所不同,与Async or TPL 无关,它作为本地事务开始,当您在同一事务上下文中打开多个资源(连接)时,它确实被提升为分布式事务,由于某些限制而失败。第二个问题似乎是你试图在Command 对象上ExecuteReader,当原始Reader 仍然存在时,这是一个实时连接,首先关闭它以运行第二个ExecuteNonQuery

标签: c# multithreading async-await task-parallel-library transactionscope


【解决方案1】:
namespace Playground
{
    static class DalcHelper
    {
        public static TransactionScope GetTransactionScope()
        {
            return new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        }

        public async static Task ReadDetails1(DependentTransaction transaction,SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons"; // some table, say Persons
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int Id = reader.GetInt32("Id");
                        Console.WriteLine("Id " + Id);
                    }
                    reader.Close();
                }
                transaction.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 1"+ ex.Message);
            }
        }

        public async static Task ReadDetails2(DependentTransaction transaction1, SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons";
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int age = reader.GetInt32("Age");
                        Console.WriteLine("Age " + age);
                    }
                    reader.Close();
                }
                transaction1.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 2" + ex.Message);
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            string connectionString = "YourConnectionString";
            _ = RunMe(connectionString);
        }

        private async static Task RunMe(string connectionString)
        {
            
                try
                {
                    
                    Task task1 = Task.Run( async()=> {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails1(transaction, conn);
                                /*
                                * add more tasks if you wish to
                                */
                                Console.WriteLine("Completed task 1");
                                conn.Close();

                            }
                            scope.Complete();
                        }
                    });

                    

                    Task task2 = Task.Run(async () =>
                    {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails2(transaction, conn);
                                /*
                                    may be update some column of table based on previous op.
                                   // await DalcHelper.UpdateDetails2(transaction, conn); 
                                */ 
                                Console.WriteLine("Completed task 2");
                                conn.Close();
                            }
                            /*
                            calling `Complete` method will commit all the changes within the transaction scope(including the UpdateDetails2 method)
                            need not dispose transaction scope explicitly, `using` block takes care of that
                            */ 
                            scope.Complete(); 
                        }
                    });

                 await Task.WhenAll(task1, task2);// at this point every task added is complete
                 Console.WriteLine("completed both tasks");
                 Console.ReadLine();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }

使用事务范围时要记住的一些要点

  1. 必须将TransactionScope 处理在创建它的同一线程中,否则可能会抛出类似Transaction already aborted 的错误。
  2. 只有在调用TransactionScope.Complete() 方法时,才会保留任何更新操作。
  3. 确保为每个线程打开单独的连接并在使用后关闭它。话虽如此,从性能的角度来看,我不确定是否为每个线程使用单独的连接。我很高兴在这方面得到更多的教育,我会更新我的答案。不过,此解决方案应该可以帮助您解决问题。

阅读一些已经发布的与该主题相关的有用答案

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多