【问题标题】:How to execute nested async/await code in parallel while maintaining the same thread on await continuations?如何在等待延续上保持相同线程的同时并行执行嵌套的异步/等待代码?
【发布时间】:2015-11-05 14:22:18
【问题描述】:

这可能是我写过的最糟糕的 StackOverflow 标题。我实际上想要做的是执行一个异步方法,该方法使用异步/等待约定(并且它本身包含额外的等待调用)从一个同步方法中并行多次,同时在每个分支的执行过程中保持相同的线程并行执行,包括所有等待继续。换句话说,我想同步执行一些异步代码,但我想并行执行多次。现在你可以明白为什么标题如此糟糕了。也许最好用一些代码来说明这一点......

假设我有以下内容:

public class MyAsyncCode
{
    async Task MethodA()
    {
        // Do some stuff...
        await MethodB();
        // Some other stuff
    }

    async Task MethodB()
    {
        // Do some stuff...
        await MethodC();
        // Some other stuff
    }

    async Task MethodC()
    {
        // Do some stuff...
    }
}

调用者是同步的(来自控制台应用程序)。让我尝试说明我尝试使用Task.WaitAll(...) 和包装器任务的目的:

public void MyCallingMethod()
{
    List<Task> tasks = new List<Task>();
    for(int c = 0 ; c < 4 ; c++)
    {
        MyAsyncCode asyncCode = new MyAsyncCode();
        tasks.Add(Task.Run(() => asyncCode.MethodA()));
    }
    Task.WaitAll(tasks.ToArray());
}

所需的行为是MethodAMethodBMethodC 在继续之前和之后都在同一个线程上运行,并且在 4 个不同的线程上并行发生 4 次。换句话说,我想删除我的 await 调用的异步行为,因为我正在从调用者并行调用。

现在,在我进一步讨论之前,我确实了解异步代码和并行/多线程代码之间存在差异,并且前者并不暗示或暗示后者。我也知道实现此行为的最简单方法是删除 async/await 声明。不幸的是,我没有选择这样做(它在一个库中)并且有 原因 为什么我需要所有延续都在同一个线程上(与所述的糟糕设计有关图书馆)。但更重要的是,这激起了我的兴趣,现在我想从学术角度了解一下。

我尝试使用 PLINQ 运行它并使用 .AsParallel().Select(x =&gt; x.MethodA().Result) 立即执行任务。我还尝试使用这里和那里找到的AsyncHelper 类,它实际上只是使用.Unwrap().GetAwaiter().GetResult()。我还尝试了其他一些东西,但我似乎无法获得所需的行为。我要么在同一个线程上完成所有调用(显然不是并行的),要么在不同线程上执行延续。

我正在尝试做的事情是否可能,或者 async/await 和 TPL 太不同了(尽管两者都基于 Tasks)?

【问题讨论】:

  • 为什么需要一直在同一个线程上运行?有线程关联吗?为了实现这一点,方法必须合作并愿意在同步上下文或任务调度程序上安排它们的延续。库是否使用 ConfigureAwait(false)?那你就输了。
  • @usr 在实际代码中,异步方法接受一些输入,然后对这些输入进行操作。我需要传递的一些输入不是线程安全的(线程局部变量等)。我可以为每个并行操作创建新实例,但是如果继续在不同的线程上执行,事情就会迅速崩溃。但同样,在这一点上,我对问题本身更感兴趣,而不是我的具体场景。
  • 好的,取决于这些方法中的实际代码。库是否使用了 ConfigureAwait(false)?
  • 不,谢天谢地(在这种情况下)。
  • Task.Waital 的异步等待版本是 Task.WhenAll。这将返回一个任务,因此您可以等待 Task.WenAll 并且您的调用函数可以是异步的,从而使您的调用线程保持响应

标签: c# multithreading asynchronous parallel-processing async-await


【解决方案1】:

您调用的方法不使用ConfigureAwait(false)。这意味着我们可以强制继续在我们喜欢的上下文中恢复。选项:

  1. 安装单线程同步上下文。我相信 Nito.Async 可以做到这一点。
  2. 使用自定义TaskSchedulerawait 查看 TaskScheduler.Current 并在该调度程序不是默认的情况下继续。

我不确定这两种选择是否有利弊。我认为选项 2 的范围更容易。选项 2 看起来像:

Task.Factory.StartNew(
    () => MethodA()
    , new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler).Unwrap();

为每个并行调用调用一次,并使用Task.WaitAll 加入所有这些任务。或许你也应该处理掉那个调度器。

我在这里(ab)使用ConcurrentExclusiveSchedulerPair 来获取单线程调度程序。

如果这些方法不是特别占用 CPU 资源,您可以对所有这些方法使用相同的调度程序/线程。

【讨论】:

    【解决方案2】:

    您可以创建 4 个独立线程,每个线程以有限并发(实际上,根本没有并发)TaskScheduler 执行 MethodA。这将确保线程创建的每个任务和后续任务都将由该线程执行。

        public void MyCallingMethod()
        {
            CancellationToken csl = new CancellationToken();
            var threads = Enumerable.Range(0, 4).Select(p =>
                {
                    var t = new Thread(_ =>
                        {
                            Task.Factory.StartNew(() => MethodA(), csl, TaskCreationOptions.None,
                                new LimitedConcurrencyLevelTaskScheduler(1)).Wait();
                        });
                    t.Start();  
                    return t;
                }).ToArray();
            //You can block the main thread and wait for the other threads here...
        }
    

    当然,这并不能确保您获得 4 度并行性。

    您可以在 MSDN 中看到此类 TaskScheduler 的实现 - https://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-09-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多