【问题标题】:Dispatcher, Async/Await, Concurrent work调度程序,异步/等待,并发工作
【发布时间】:2016-07-22 08:05:22
【问题描述】:

我有一堆从 Dispatcher 调用的异步方法。这些方法不会在后台执行任何工作,它们只是等待一些 I/O 操作,或者等待网络服务器的响应。

async Task FetchAsync()
{
    // Prepare request in UI thread
    var response = await new WebClient().DownloadDataTaskAsync();
    // Process response in UI thread
}

现在,我想执行负载测试,方法是并行调用多个 FetchAsync(),并以某种最大并行度进行。

我的第一次尝试是使用Paralell.Foreach(),但 id 不适用于 async/await。

var option = new ParallelOptions {MaxDegreeOfParallelism = 10};
Parallel.ForEach(UnitsOfWork, uow => uow.FetchAsync().Wait());

我一直在研究响应式扩展,但我仍然无法利用 Dispatcher 和 async/await。

我的目标是不为每个FetchAsync() 创建单独的线程。你能给我一些提示吗?

【问题讨论】:

  • 不要将Parallel.ForEachasync/await stackoverflow.com/a/11565317/585968 混为一谈,正如您所发现的那样
  • 如果没有单独的线程,您将无法并行运行。看看concurrency vs parallelism what is the difference
  • @AntP:您已删除我的 EDIT,因为“无需使用 asnwers 编辑问题”,但在我的编辑中对答案进行了重要修改。请回滚!
  • @Liero 如果它的不同足以成为一个单独的答案,它应该作为一个单独的答案发布(而不是在问题内)。不过,据我所知,这是一个非常狭窄的变化,仅适用于这个特定示例,对于未来的读者来说不会是一个重要的区别。

标签: c# multithreading parallel-processing async-await dispatcher


【解决方案1】:

只需拨打Fetchasync 而不等待每个电话,然后使用Task.WhenAll 一起等待所有电话。

var tasks = new List<Task>();
var max = 10;
for(int i = 0; i < max; i++)
{
    tasks.Add(FetchAsync());
}

await Task.WhenAll(tasks);

【讨论】:

  • 当我执行await Task.WhenAll() 时,FetchAsync 是否仍会在 UI 线程中被调用?
  • @Liero FetchAsync 在 FetchAsync 被调用时被调用,而不是在它被等待的时候,但这就是细节。
  • 好的,现在我需要添加 MaxDegreeOfParallelism 等效项 :) 当然我可以使用 Task.WhenAny() 手动实现一些逻辑,但是没有构建它的东西,可以帮助我吗?跨度>
  • 记住:在这种情况下没有线程。等待 UI 操作适用于在 ThreadPool 中注册延续的句柄。 ThreadPool 本身具有并发活动线程的限制,这取决于您的系统和池的使用情况。所以没有真正的“最大并行度”。如果你想限制并发请求的数量,你必须自己实现逻辑。
  • 你不需要if(tasks.Any()) - 如果你用一组空任务调用Task.WhenAll,它会立即完成。
【解决方案2】:

这是针对您的问题的通用可重用解决方案,您不仅可以使用 FetchAsync 方法重用它,还可以重用具有相同签名的任何异步方法。该 API 还包括实时并发限制支持:

参数是不言自明的: totalRequestCount:是您总共要执行多少异步请求(FatchAsync 调用),异步处理器是 FetchAsync 方法本身,maxDegreeOfParallelism 是可选的可空参数。如果您希望实时并发限制最大并发异步请求数,请设置它,否则不要。

public static Task ForEachAsync(
        int totalRequestCount,
        Func<Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        IEnumerable<Task> tasks;

        if (maxDegreeOfParallelism != null)
        {
            SemaphoreSlim throttler = new SemaphoreSlim(maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value);

            tasks = Enumerable.Range(0, totalRequestCount).Select(async requestNumber =>
            {
                await throttler.WaitAsync();
                try
                {
                    await asyncProcessor().ConfigureAwait(false);
                }
                finally
                {
                    throttler.Release();
                }
            });
        }
        else
        {
            tasks = Enumerable.Range(0, totalRequestCount).Select(requestNumber => asyncProcessor());
        }

        return Task.WhenAll(tasks);
    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-04
    • 1970-01-01
    • 2014-06-06
    • 2020-04-25
    • 2019-01-13
    • 1970-01-01
    • 2020-05-22
    • 2011-12-01
    相关资源
    最近更新 更多