【问题标题】:Why are my async continuations scheduled outside of the thread pool?为什么我的异步延续计划在线程池之外?
【发布时间】:2018-08-23 16:06:53
【问题描述】:

请在标记为重复之前阅读问题。这个问题实现了this question提供的解决方案,仍然遇到死锁。

我正在调试一个大型多线程应用程序,该应用程序使用 .Net 客户端库对各种 Google API 进行多次并发调用,我们偶尔会在某些请求期间遇到死锁。有关应用程序的一些信息:

  • 应用程序是 Windows 服务(SynchronizationContext 为空)
  • .Net Framework 版本为 4.5
  • 所有发出 API 请求的线程都不在默认线程池中。

具体来说,我们正在调用Execute() 方法,该方法使用异步方法并在等待结果时阻塞

public TResponse Execute()
{
    try
    {
        using (var response = ExecuteUnparsedAsync(CancellationToken.None).Result)
        {
            return ParseResponse(response).Result;
        }
    }
    ...
}

这反过来又调用ExecuteUnparsedAsync() 执行HttpClient.SendAsync()

private async Task<HttpResponseMessage> ExecuteUnparsedAsync(CancellationToken cancellationToken)
{
    using (var request = CreateRequest())
    {
        return await service.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
    }
}

现在,我了解到我们可以通过多种方式在这里遇到死锁,最好将应用程序更改为使用异步方法来避免它们。不幸的是,这将是一笔重大的时间投资,目前不可能,但将来可能会。

我的具体问题是所有调用线程都不在线程池中,并且因为 ConfigureAwait(false) 正在被调用,我希望延续将始终在线程池中运行,但事实并非如此。相反,似乎在原始调用线程上安排了延续,并且线程死锁,因为它正在等待结果。

使用以下 MCVE,我可以在几个小时内产生死锁。

using Google.Apis.Auth.OAuth2;
using Google.Apis.Drive.v2;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DeadlockTest
{
    class Program
    {
        const int NUM_THREADS = 70;

        static long[] s_lastExecute = new long[NUM_THREADS];
        static long count = 0;

        static void Main(string[] args)
        {
            ServicePointManager.DefaultConnectionLimit = 50;

            for(int i = 0; i < s_lastExecute.Length; i++)
            {
                s_lastExecute[i] = DateTime.Now.ToBinary();
            }

            Thread deadlockCheck = new Thread(new ThreadStart(CheckForDeadlock));
            deadlockCheck.Start();

            RunThreads();

            deadlockCheck.Join();
        }

        static void RunThreads()
        {
            List<Thread> threads = new List<Thread>();

            for (int i = 0; i < NUM_THREADS; i++)
            {
                int threadIndex = i;
                Thread thread = new Thread(
                    new ParameterizedThreadStart(BeginThread));
                thread.Start(threadIndex);
                threads.Add(thread);
            }

            foreach(var thread in threads)
            {
                thread.Join();
            }
        }

        static void BeginThread(object threadIndex)
        {
            Debug.Assert(SynchronizationContext.Current == null);
            Debug.Assert(Thread.CurrentThread.IsThreadPoolThread == false);
            ThreadLoop((int)threadIndex);
        }

        static void ThreadLoop(int threadIndex)
        {
            Random random = new Random(threadIndex);

            while (true)
            {
                try
                {
                    GoogleDrive.Test(random);
                }
                catch(Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }

                Interlocked.Exchange(ref s_lastExecute[threadIndex], DateTime.Now.ToBinary());
                Interlocked.Increment(ref count);
            }
        }

        private static void CheckForDeadlock()
        {
            Console.WriteLine("Deadlock check started");

            TimeSpan period = TimeSpan.FromMinutes(1);
            TimeSpan deadlockThreshold = TimeSpan.FromMinutes(10);

            while (true)
            {
                Thread.Sleep((int)period.TotalMilliseconds);

                DateTime now = DateTime.Now;
                TimeSpan oldestUpdate = TimeSpan.MinValue;

                for (int i = 0; i < NUM_THREADS; i++)
                {
                    DateTime lastExecute = DateTime.FromBinary(
                        Interlocked.Read(ref s_lastExecute[i]));

                    TimeSpan delta = now - lastExecute;
                    if(delta > oldestUpdate)
                    {
                        oldestUpdate = delta;
                    }

                    if (delta > deadlockThreshold)
                    {
                        var msg = string.Format("Deadlock detected in thread {0} for {1} minutes",
                            i.ToString(), (now - lastExecute).TotalMinutes);
                        Console.WriteLine(msg);
                    }
                }

                int workerThreads, completionPortThreads;
                System.Threading.ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);

                Console.WriteLine("Checked for deadlocks.");
                Console.WriteLine("\tWorker threads: " + workerThreads.ToString());
                Console.WriteLine("\tCompletion port threads: " + completionPortThreads.ToString());
                Console.WriteLine("\tExecute calls: " + Interlocked.Read(ref count).ToString());
                Console.WriteLine("\tOldest update (minutes): " + oldestUpdate.TotalMinutes.ToString());
            }
        }
    }

    class GoogleDrive
    {
        const string SERVICE_ACCOUNT = @"<path_to_service_account>";

        static string[] SCOPES = { DriveService.Scope.Drive };

        public static DriveService GetDriveService(string user)
        {
            GoogleCredential credential;
            using (var stream = new FileStream(SERVICE_ACCOUNT, FileMode.Open, FileAccess.Read))
            {
                credential = GoogleCredential
                    .FromStream(stream)
                    .CreateScoped(SCOPES)
                    .CreateWithUser(user);
            }

            var service = new DriveService(new DriveService.Initializer()
            {
                HttpClientInitializer = credential
            });

            return service;
        }

        public static void Test(Random random)
        {
            int userIndex = random.Next(Users.USERS.Length);
            string user = Users.USERS[userIndex];
            using (DriveService service = GetDriveService(user))
            {
                var request = service.Files.List();
                var result = request.Execute();
            }
        }
    }

    public static class Users
    {
        public static string[] USERS = new string[]
        {
            "user0000@domain.com",
            "user0001@domain.com",
            ...
        };
    }
}

一夜之间运行这个测试给了我以下信息:

Deadlock detected in thread 15 for 274.216744496667 minutes
Deadlock detected in thread 45 for 154.73506413 minutes
Deadlock detected in thread 46 for 844.978023301667 minutes
Checked for deadlocks.
        Worker threads: 2045
        Completion port threads: 989
        Execute calls: 2153228
        Oldest update (minutes): 844.978023301667

一旦检测到死锁,我可以在线程循环中插入一个中断并让正在运行的线程退出。这给我留下了主线程、计时器线程、运行时使用的两个线程和我的死锁线程(在这种情况下为三个。还要注意线程 ID 不匹配,因为我不够聪明,无法使用实际线程ID):

每个线程都有以下调用栈:

    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext)
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout)
    mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken)
    mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken)
    mscorlib.dll!System.Threading.Tasks.Task.InternalWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken)
    mscorlib.dll!System.Threading.Tasks.Task<System.Net.Http.HttpResponseMessage>.GetResultCore(bool waitCompletionNotification)
    mscorlib.dll!System.Threading.Tasks.Task<System.__Canon>.Result.get()
    Google.Apis.dll!Google.Apis.Requests.ClientServiceRequest<Google.Apis.Drive.v2.Data.FileList>.Execute()
>   DeadlockTest.exe!DeadlockTest.GoogleDrive.Test(System.Random random)
    DeadlockTest.exe!DeadlockTest.Program.ThreadLoop(int threadIndex)
    DeadlockTest.exe!DeadlockTest.Program.BeginThread(object threadIndex)
    mscorlib.dll!System.Threading.ThreadHelper.ThreadStart_Context(object state)
    mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
    mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
    mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)
    mscorlib.dll!System.Threading.ThreadHelper.ThreadStart(object obj)

这也给我留下了以下剩余任务:

我不知道有什么方法可以准确找出任务安排在哪个线程上,但我认为很明显它们是为死锁线程安排的(即它们是死锁的根源)。

这让我想到了我的问题:

  • 为什么延续任务没有安排在线程池而不是调用线程中?
  • 有没有办法强制任务在线程池上运行,但结果上没有线程池线程块?

【问题讨论】:

  • .Result 总是不安全的,IMO 应该弃用。这些天人们使用它的唯一原因(根据我的经验)是试图桥接异步/同步,而且它总是会导致死锁。我知道你想推迟 asyncopolypse,但实际上,只有这样才能让你的代码solid。考虑一下您已经浪费时间在代码中乱扔ConfigureAwait 调用,只要您尝试执行此桥接操作,这些调用仍然容易失败。
  • @Damien_The_Unbeliever 您是否点击了该 Execute() 方法的链接?令人震惊的是,这是谷歌的代码。我想我会在他们的库中将其报告为错误;显然有人没有运行 John Skeet 的那个!
  • @Alden:我不知道问题是什么,但也许您可以尝试将这些异步调用包装在Task.Run(…).Result 中并强制所有内容在线程池上运行。

标签: c# task deadlock dotnet-httpclient taskscheduler


【解决方案1】:

我将假设这里的真正目标是消除死锁,为此我不相信您的延续所运行的线程甚至是相关的。我看到 2 个问题:

  1. Google 的 Execute() 方法完全是错误的。我什至会考虑报告一个错误,因为HttpClient 不支持同步调用,并且使用.Result 阻塞调用会导致死锁,期间。没有办法解决这个问题。

  2. 您可能会通过强制混合新线程来加剧死锁的可能性。一个常见的误解是并发需要线程,而在 I/O 绑定工作的情况下,这是不正确的。等待 I/O 的结果(您的代码可能花费大部分时间在做的事情)需要no thread at all。底层子系统很复杂,但它们被异步任务返回 API 优雅地抽象出来,例如 HttpClient。使用任务而不是线程,这些子系统将决定何时需要新线程,何时不需要。

所有这些都会导致您希望避免的结论 - 在您的代码中使用异步。你说这现在不可行,所以尽管我讨厌建议不是 100% 正确的代码,但希望一个公平的妥协是在其中部分实现以换取死锁可能性的显着降低。

如果可行,我的建议是重构您的线程创建方法(您的 MCVE 中的RunThreads)以使用任务(我们称之为RunTasks),并将所有内容向下转换为异步调用堆栈,以调用 Google 的 ExecuteAsync 而不是 Execute 结束。以下是重要部分的外观:

static void RunTasks()
{
    List<Task> tasks = new List<Task>();

    for (int i = 0; i < NUM_TASKS; i++)
    {
        tasks.Add(BeginTaskAsync(i));
    }

    // Your long-term goal should be to replace this with await Task.WhenAll(tasks);
    Task.WaitAll(tasks);
}

// work down your call stack here and convert methods to use async/await,
// eventually calling await ExecuteAsync() from the Google lib...

static async Task BeginTaskAsync(int taskIndex)
{
    ...
    await ThreadLoopAsync(taskIndex);
}

static async Task ThreadLoopAsync(int taskIndex)
{
    Random random = new Random(taskIndex);

    while (true)
    {
        try
        {
            await GoogleDrive.TestAsync(random);
        }
        ...
    }
}

class GoogleDrive
{
    ...

    public static async Task TestAsync(Random random)
    {
        ...
        using (DriveService service = GetDriveService(user))
        {
            var request = service.Files.List();
            var result = await request.ExecuteAsync();
        }
    }
}

我提到的“妥协”是调用Task.WaitAll。这是一个阻塞调用,因此仍然不能保证您不会在这里遇到死锁。但是,如果您没有时间/资源在调用堆栈up 中正确地异步,这应该是一个很大的改进。您的线程阻塞要少得多,一般来说线程也要少得多。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-05-08
    • 2014-05-14
    • 1970-01-01
    • 1970-01-01
    • 2014-05-05
    • 2013-05-07
    • 2010-12-22
    相关资源
    最近更新 更多