【问题标题】:Why do multiple short Tasks end up getting the same id?为什么多个短任务最终得到相同的 id?
【发布时间】:2019-10-17 21:48:55
【问题描述】:

我有一个要处理的项目列表,我为每个项目创建一个任务,然后使用 Task.WhenAny() 等待。我遵循此处描述的模式:Start Multiple Async Tasks and Process Them As They Complete

我改变了一件事:我使用HashSet<Task> 而不是List<Task>。但我注意到所有任务最终都获得了相同的 id,因此 HashSet 只添加了其中一个,因此我最终只等待一个任务。

我在 dotnetfiddle 中有一个工作示例:https://dotnetfiddle.net/KQN2ow

下面也粘贴代码:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ReproTasksWithSameId
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            List<int> itemIds = new List<int>() { 1, 2, 3, 4 };
            await ProcessManyItems(itemIds);
        }

        private static async Task ProcessManyItems(List<int> itemIds)
        {
            //
            // Create tasks for each item and then wait for them using Task.WhenAny
            // Following Task.WhenAny() pattern described here: https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/start-multiple-async-tasks-and-process-them-as-they-complete
            // But replaced List<Task> with HashSet<Task>.
            //

            HashSet<Task> tasks = new HashSet<Task>();

            // We map the task ids to item ids so that we have enough info to log if a task throws an exception.
            Dictionary<int, int> taskIdToItemId = new Dictionary<int, int>();

            foreach (int itemId in itemIds)
            {
                Task task = ProcessOneItem(itemId);
                Console.WriteLine("Created task with id: {0}", task.Id);
                tasks.Add(task);
                taskIdToItemId[task.Id] = itemId;
            }

            // Add a loop to process the tasks one at a time until none remain.
            while (tasks.Count > 0)
            {
                // Identify the first task that completes.
                Task task = await Task.WhenAny(tasks);

                // Remove the selected task from the list so that we don't
                // process it more than once.
                tasks.Remove(task);

                // Get the item id from our map, so that we can log rich information.
                int itemId = taskIdToItemId[task.Id];

                try
                {
                    // Await the completed task.
                    await task;  // unwrap exceptions.
                    Console.WriteLine("Successfully processed task with id: {0}, itemId: {1}", task.Id, itemId);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Failed to process task with id: {0}, itemId: {1}. Just logging & eating the exception {1}", task.Id, itemId, ex);
                }
            }
        }
        private static async Task ProcessOneItem(int itemId)
        {
            // Assume this method awaits on some asynchronous IO.
            Console.WriteLine("item: {0}", itemId);
        }
    }
}

我得到的输出是这样的:

item: 1
Created task with id: 1
item: 2
Created task with id: 1
item: 3
Created task with id: 1
item: 4
Created task with id: 1
Successfully processed task with id: 1, itemId: 4

所以基本上程序在等待第一个任务后就退出了。

  1. 为什么多个短任务最终会获得相同的 id? 顺便说一句,我还使用返回 Task&lt;TResult&gt; 而不是 Task 的方法进行了测试,在这种情况下它可以正常工作.

  2. 我可以使用更好的方法吗?

【问题讨论】:

  • 这不是模式,它只是一个演示任务的文档示例。它适用于生产场景,除了非常简单的情况。有更好的类可以处理 pub/sub、多个 worker 和任务,例如 Dataflow 的 ActionBlock 和 System.Threading Channels。
  • 至于您的具体问题 - 异步代码在哪里?此代码中没有实时任务。编译器应该已经发出警告ProcessOneItem 不包含await,因此将同步运行。这里没有任务,一切都在主线程上运行
  • 你正在同步运行代码,它每次都返回相同的完成任务。在该方法中添加await Task.Delay(100);,它将返回新任务。
  • Is there a better approach I can use? 要做什么?细节很重要。处理 1000 个 URL 需要与处理 100K 内存中元素不同的架构
  • 既然您对此有答案,最好将其标记为这样,并提出一个新问题

标签: c# async-await


【解决方案1】:

问题的代码是同步的,因此只有一个已完成的任务在运行。 async 不会让某些东西异步运行,它是一种语法糖,它允许使用 await 等待 已经 正在执行的异步操作完成,而不会阻塞调用线程。

至于文档示例,就是这样。一个文档示例,而不是模式,当然除了简单的情况外,也不是可以在生产中使用的东西。

如果您一次只能发出 5 个请求以避免网络或 CPU 泛滥,会发生什么情况?您只需要为此下载固定数量的记录。如果您需要处理下载的数据怎么办?如果 URL 列表来自 另一个 线程怎么办?

这些问题由并发容器、发布/订阅模式以及专门构建的 Dataflow 和 Channel 类处理。

数据流

较旧的 Dataflow 类负责缓冲输入和输出并自动处理工作任务。整个下载代码可以替换为ActionBlock

var client=new HttpClient(....);
//Cancel if the process takes longer than 30 minutes
var cts=new CancellationTokenSource(TimeSpan.FromMinutes(30));
var options=new ExecutionDataflowBlockOptions(){
    MaxDegreeOfParallelism=10,
    BoundedCapacity=5,
    CancellationToken=cts.Token
};
var block=new ActionBlock<string>(url=>ProcessUrl(url,client,cts.Token));

就是这样。该块将使用多达 10 个并发任务来执行多达 10 个并发下载。它将在内存中最多保留 5 个 url(否则它将缓冲所有内容)。如果输入缓冲区已满,则将项目发送到块将异步等待,从而防止缓慢下载的 URL 淹没内存。

在相同或不同的线程上,网址的“发布者”可以发布任意数量的网址,只要它愿意。

foreach(var url in urls)
{
    await block.SendAsync(url);
}
//Tell the block we're done
block.Complete();
//Wait until all downloads are complete
await block.Completion;

我们可以使用 TransformBlock 等其他块来产生输出,将其传递给另一个块,从而创建一个并发处理管道。假设我们有两种方法,DownloadURLParseResponse 而不仅仅是 ProcessUrl

Task<string> DownloadUrlAsync(string url,HttpClient client)
{
    return client.GetStringAsync(url);
}

void ParseResponse(string content)
{
    var object=JObject.Parse();
    DoSomethingWith(object);
}

我们可以为管道中的每个步骤创建一个单独的块,使用不同的 DOP 和缓冲区:

var dlOptions=new ExecutionDataflowBlockOptions(){
    MaxDegreeOfParallelism=5,
    BoundedCapacity=5,
    CancellationToken=cts.Token
};
var downloader=new TransformBlock<string,string>(
                   url=>DownloadUrlAsync(url,client),
                   dlOptions);

var parseOptions = new ExecutionDataflowBlockOptions(){
    MaxDegreeOfParallelism=10,
    BoundedCapacity=2,
    CancellationToken=cts.Token
};
var parser=new ActionBlock<string>(ParseResponse);

downloader.LinkTo(parser, new DataflowLinkOptions{PropageateCompletion=true});

我们现在可以将 URL 发布到下载器,并等待它们都被解析。通过使用不同的 DOP 和容量,我们可以平衡下载器和解析器任务的数量,以下载我们可以解析和处理的尽可能多的 URL,例如慢速下载或大响应。

foreach(var url in urls)
{
    await downloader.SendAsync(url);
}
//Tell the block we're done
downloader.Complete();
//Wait until all urls are parsed
await parser.Completion;

频道

System.Threading.Channels 引入了 Go 风格的频道。这些实际上是 Dataflow 块的较低级别的概念。如果 Channels 早在 2012 年就可用,那么它们将使用 channels 编写。

等效的下载方法如下所示:

ChannelReader<string> Downloader(ChannelReader<string> ulrs,HttpClient client,
                                 int capacity,CancellationToken token=default)
{
    var channel=Channel.CreateBounded(capacity);
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        await foreach(var url in urls.ReadAsStreamAsync(token))
        {
            var response=await client.GetStringAsync(url);
            await writer.WriteAsync(response);
        }
    }).ContinueWith(t=>writer.Complete(t.Exception));
    return channel.Reader;
}

这更冗长,但它允许我们做一些事情,比如创建 HttpClient in 方法并重用它。使用 ChannelReader 作为输入和输出可能看起来很奇怪,但现在我们可以简单地通过将输出阅读器作为输入传递给另一个方法来链接这些方法。

“神奇”在于我们创建了一个等待处理消息并立即返回阅读器的工作任务。每当产生结果时,都会将其发送到通道和管道中的下一步。

要使用多个工作任务,我们可以使用Enumerable.Range 启动其中的许多任务,并在所有通道完成后使用Task.WhenAny 关闭通道:

ChannelReader<string> Downloader(ChannelReader<string> ulrs,HttpClient client,
                                 int capacity,int dop,CancellationToken token=default)
{
    var channel=Channel.CreateBounded(capacity);
    var writer=channel.Writer;

    var tasks  = Enumerable
                   .Range(0,dop)
                   .Select(_=> Task.Run(async ()=>{
                       await foreach(var url in urls.ReadAllAsync(token))
                       {
                           var response=await client.GetStringAsync(url);
                           await writer.WriteAsync(response);
                       }
                    });
    _=Task.WhenAll(tasks)
          .ContinueWith(t=>writer.Complete(t.Exception));
    return channel.Reader;
}

发布者可以创建自己的频道并将阅读器传递给Downloader 方法。他们也不需要提前发布任何东西:

var channel=Channel.CreateUnbounded<string>();
var dlReader=Downloader(channel.Reader,client,5,5);
foreach(var url in someUrlList)
{
    await channel.Writer.WriteAsync(url);
}
channel.Writer.Complete();

流畅的管道

这很常见,以至于有人可以为此创建扩展方法。例如,要将 IList 转换为 Channel&lt;T&gt;,我们不需要等待,因为所有结果都已经可用:

ChannelReader<T> Generate<T>(this IEnumerable<T> source)
{
    var channel=Channel.CreateUnbounded<T>();
    foreach(var item in source)
    {
        channel.Writer.TryWrite(T);
    }
    channel.Writer.Complete();
    return channel.Reader;
}

如果我们也将Downloader 转换为扩展方法,我们可以使用:

var pipeline= someUrls.Generate() 
                      .Downloader(client,5,5);

【讨论】:

  • 感谢您提供详细信息。阅读有关此内容。
  • @Turbo 添加了另一个关于频道的部分
  • Dataflow 是完成这项工作的正确工具,恕我直言。这是一个涵盖缓冲和处理的完整解决方案。通道非常适合缓冲(实际上可能是完美的异步队列),但在处理部分没有帮助。因此,您最终会手动生成任务并编写具有问题异常处理特性的次优处理循环。比如例子中的第二个Downloader,一个异常就会杀死一个worker任务,降低并行度。在处理完所有 url 或所有工作人员都死亡之前,错误不会传播。
【解决方案2】:

这是因为ProcessOneItem 不是异步的。

您应该会看到以下警告:

此异步方法缺少“等待”运算符,将同步运行。考虑使用 'await' 运算符来等待非阻塞 API 调用,或使用 'await Task.Run(...)' 在后台线程上执行 CPU 密集型工作。

await (...) 添加到ProcessOneItem 后,返回任务将具有unique-ish id。

【讨论】:

    【解决方案3】:

    来自Task.Id属性的文档:

    任务 ID 是按需分配的,不一定代表任务实例的创建顺序。请注意,虽然冲突非常少见,但任务标识符不能保证是唯一的。

    据我了解,此属性主要用于调试目的。您可能应该避免将其用于生产代码。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-08-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多