【问题标题】:Limit parallelism of an Async method and not block a Thread-Pool thread限制异步方法的并行性并且不阻塞线程池线程
【发布时间】:2014-05-08 19:06:05
【问题描述】:

我有一个异步方法RequestInternalAsync() 向外部资源发出请求,并且想编写一个包装器方法,通过减少并行度来限制对该方法的并发异步请求数。

想到的第一个选项是TaskScheduler,并发有限(LimitedConcurrencyLevelTaskSchedulerConcurrentExclusiveSchedulerPair 等)。

但要使用自定义调度程序运行任务,我必须使用仅接受 Action<>TaskFactory 启动任务,即我不能通过不阻塞额外线程来等待内部方法的执行来做到这一点.

第二个选项是SemaphoreSlim,它完成了它的工作,但在这种情况下,我自己实现了节流,而不是使用TaskScheduler

static void Main(string[] args)
{
    // TESTING 1

    var task1 = Task.WhenAll(Enumerable.Range(1, 10).Select(i => RequestAsyncBad()));

    task1.Wait();

    // TESTING 2

    var task2 = Task.WhenAll(Enumerable.Range(1, 10).Select(i => RequestAsyncBetter()));

    task2.Wait();
}

private static Task RequestInternalAsync()
{
    return Task.Delay(500);
}

解决方案 #1:

private static readonly ConcurrentExclusiveSchedulerPair _concurrentPair
    = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 2);

public static Task RequestAsyncBad()
{
    // Dumb: Because TaskFactory doesn't provide an overload which accepts another task, only action.
    // As result, we blocking a thread to just wait until the inner task finishes.

    return Task.Factory.StartNew(() => RequestInternalAsync().Wait(),
        CancellationToken.None, TaskCreationOptions.DenyChildAttach, _concurrentPair.ConcurrentScheduler);
}

解决方案 #2(更好):

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(2);

public static async Task RequestAsyncBetter()
{
    // Here we don't waste thread-pool thread on waiting for a completion of inner task,
    // but instead of using TaskScheduler, implementing a hand-made stuff with semaphore. 

    await _semaphore.WaitAsync().ConfigureAwait(false);

    try
    {
        await RequestInternalAsync();
    }
    finally
    {
        _semaphore.Release();
    }
}

更优雅的方法是什么?

  • 重用TPL的标准Task API和TaskScheduler
  • 并且不阻塞额外的线程

【问题讨论】:

    标签: c# task-parallel-library async-await


    【解决方案1】:

    TaskScheduler 仅对 CPU 密集型工作有用。你的工作没有使用线程。它使用 IO 完成端口,这意味着您的网络调用根本不包含任何线程。没有办法让TaskScheduler 参与 IO 操作。

    如果您还不相信:.NET 中的异步 IO 是基于使用 TaskCompletionSource,它丝毫没有绑定到线程或调度程序。

    SemaphoreSlim 是正确的做法。或者,创建一个ServicePoint 并设置其最大并发。仅适用于HTTP 请求。

    请注意,如果您发现自己在使用Wait,您应该犹豫并考虑一下您在做什么。通常,这是一个错误。

    【讨论】:

    • 谢谢!所以,据我了解,TaskSchedulers 只负责 WHENWHERE 启动任务,而不是用来控制启动任务的生命周期?
    • 我不会这么说的。请注意,有两种任务(这令人困惑!):CPU 工作(由最终运行的委托支持的任务)和由 TaskCompletionSource 支持的“其他工作”。只有第一种情况使用 TaskScheduler(并且它总是使用一个)。所以我想说的是,TaskScheduler 负责通过在它认为合适的时间和地点运行委托来完成委托支持的任务。
    • 也不可能给 TCS 一个调度程序,TPL 也不会要求一个执行基于 TCS 的任务。我相信在使用 TPL 时理解这一点非常有启发性。
    • +1。我最近开始使用术语“委托任务”和“承诺任务”来区分不同类型的任务。并不是说每个人都必须使用该术语,但在讨论差异时可能会有所帮助。
    • @StephenCleary 对我来说,问题是我没有找到任何可以解释TaskSchedulers 的用途以及为什么TaskFactory 故意只接受代表的信息来源。谢谢你的评论。我真的很喜欢阅读您的博客,并期待在 Kindle 方面阅读您的书 :)
    【解决方案2】:
    public static async Task RequestAsyncCool()
    {
        await Task.Factory.StartNew(async () => {
                await RequestInternalAsync();
            },
            CancellationToken.None, 
            TaskCreationOptions.DenyChildAttach, 
            TaskScheduler.Current);
    }
    

    你真的不应该 Wait 处理任务。见https://www.google.com/search?q=task+wait+deadlock

    你查看TPL DataFlow了吗?它可能只是适合你的东西......

    【讨论】:

    • 这里的问题是,如果我将异步方法放入接受Func<Task>Task.Factory.StartNew,并通过指定自定义TaskScheduler 来限制并发,那么并发限制将仅适用于第一部分在我的异步方法中,它不会等到前 2 个方法完成后再开始新方法(2 是我的并行限制)。这就是为什么我必须把Wait() 放在那里,这就是我不喜欢这个并问这个问题的原因:)
    猜你喜欢
    • 2017-06-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多