【发布时间】:2019-10-17 21:48:55
【问题描述】:
我有一个要处理的项目列表,我为每个项目创建一个任务,然后使用 Task.WhenAny() 等待。我遵循此处描述的模式:Start Multiple Async Tasks and Process Them As They Complete。
我改变了一件事:我使用HashSet<Task> 而不是List<Task>。但我注意到所有任务最终都获得了相同的 id,因此 HashSet 只添加了其中一个,因此我最终只等待一个任务。
我在 dotnetfiddle 中有一个工作示例:https://dotnetfiddle.net/KQN2ow
下面也粘贴代码:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace ReproTasksWithSameId
{
public class Program
{
public static async Task Main(string[] args)
{
List<int> itemIds = new List<int>() { 1, 2, 3, 4 };
await ProcessManyItems(itemIds);
}
private static async Task ProcessManyItems(List<int> itemIds)
{
//
// Create tasks for each item and then wait for them using Task.WhenAny
// Following Task.WhenAny() pattern described here: https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/start-multiple-async-tasks-and-process-them-as-they-complete
// But replaced List<Task> with HashSet<Task>.
//
HashSet<Task> tasks = new HashSet<Task>();
// We map the task ids to item ids so that we have enough info to log if a task throws an exception.
Dictionary<int, int> taskIdToItemId = new Dictionary<int, int>();
foreach (int itemId in itemIds)
{
Task task = ProcessOneItem(itemId);
Console.WriteLine("Created task with id: {0}", task.Id);
tasks.Add(task);
taskIdToItemId[task.Id] = itemId;
}
// Add a loop to process the tasks one at a time until none remain.
while (tasks.Count > 0)
{
// Identify the first task that completes.
Task task = await Task.WhenAny(tasks);
// Remove the selected task from the list so that we don't
// process it more than once.
tasks.Remove(task);
// Get the item id from our map, so that we can log rich information.
int itemId = taskIdToItemId[task.Id];
try
{
// Await the completed task.
await task; // unwrap exceptions.
Console.WriteLine("Successfully processed task with id: {0}, itemId: {1}", task.Id, itemId);
}
catch (Exception ex)
{
Console.WriteLine("Failed to process task with id: {0}, itemId: {1}. Just logging & eating the exception {1}", task.Id, itemId, ex);
}
}
}
private static async Task ProcessOneItem(int itemId)
{
// Assume this method awaits on some asynchronous IO.
Console.WriteLine("item: {0}", itemId);
}
}
}
我得到的输出是这样的:
item: 1
Created task with id: 1
item: 2
Created task with id: 1
item: 3
Created task with id: 1
item: 4
Created task with id: 1
Successfully processed task with id: 1, itemId: 4
所以基本上程序在等待第一个任务后就退出了。
为什么多个短任务最终会获得相同的 id? 顺便说一句,我还使用返回
Task<TResult>而不是Task的方法进行了测试,在这种情况下它可以正常工作.我可以使用更好的方法吗?
【问题讨论】:
-
这不是模式,它只是一个演示任务的文档示例。它不适用于生产场景,除了非常简单的情况。有更好的类可以处理 pub/sub、多个 worker 和任务,例如 Dataflow 的 ActionBlock 和 System.Threading Channels。
-
至于您的具体问题 - 异步代码在哪里?此代码中没有实时任务。编译器应该已经发出警告
ProcessOneItem不包含await,因此将同步运行。这里没有任务,一切都在主线程上运行 -
你正在同步运行代码,它每次都返回相同的完成任务。在该方法中添加
await Task.Delay(100);,它将返回新任务。 -
Is there a better approach I can use?要做什么?细节很重要。处理 1000 个 URL 需要与处理 100K 内存中元素不同的架构 -
既然您对此有答案,最好将其标记为这样,并提出一个新问题
标签: c# async-await