【发布时间】:2022-02-08 10:58:24
【问题描述】:
我有一个程序需要并行运行多个异步任务。 Task1()和Task2()这两个工具的主要区别:
Task1() 保存 FTask 类型的 Func<Task> 和稍后等待 Task.WhenAny(FTask()),
Task2() 保存由Task.Run() 生成的Task 并稍后等待Task.WhenAny(Task)。
我认为它们应该是等效的,但是 Task1() 不能正常工作,但是 Task2() 确实有效。
这个程序的流程是: 如果空闲队列中有可用的任务上下文,则将其出列并使用异步任务对其进行设置;当异步任务完成时,将上下文放回空闲队列。
问题是 Task1() 保存 Func<Task> 不起作用,似乎异步任务有多个实例在运行。
完整代码为:
static async Task Task1()
{
int qd = 4;
var taskPool = Enumerable.Range(0, qd).Select(n => new TaskCtx()).ToArray();
Queue<TaskCtx> qFree = new Queue<TaskCtx>(taskPool);
Random r = new Random();
long segs = 7, submit = 0;
int token = 0;
int ticket = 0;
Console.WriteLine($"-------- Task1 start --------");
while (submit < segs)
{
if (qFree.Count > 0)
{
var x = qFree.Dequeue();
x.Token = token++;
x.Delay1 = r.Next(10, 20);
x.Delay2 = r.Next(30, 50);
x.FTask = async () =>
{
Console.WriteLine($"[{x.Token}] start");
await Task.Delay(x.Delay1);
Console.WriteLine($"[{x.Token}] wait ticket: {ticket}");
while (ticket != x.Token)
{
await Task.Yield(); // yield and wait other task increases the ticket
}
Console.WriteLine($"[{x.Token}] acquire ticket: {ticket}");
await Task.Delay(x.Delay2);
Console.WriteLine($"[{x.Token}] release ticket: {ticket}");
ticket++;
qFree.Enqueue(x);
};
Console.WriteLine($"[{x.Token}] submit");
submit++;
}
else await Task.WhenAny(taskPool.Select(x => x.FTask()));
}
await Task.WhenAll(taskPool.Select(x => x.FTask()));
Console.WriteLine($"-------- Task1 finished --------");
}
static async Task Task2()
{
int qd = 4;
var taskPool = Enumerable.Range(0, qd).Select(n => new TaskCtx()).ToArray();
Queue<TaskCtx> qFree = new Queue<TaskCtx>(taskPool);
Random r = new Random();
long segs = 7, submit = 0;
int token = 0;
int ticket = 0;
Console.WriteLine($"-------- Task2 start --------");
while (submit < segs)
{
if (qFree.Count > 0)
{
var x = qFree.Dequeue();
x.Token = token++;
x.Delay1 = r.Next(10, 20);
x.Delay2 = r.Next(30, 50);
x.Task = Task.Run(async () =>
{
Console.WriteLine($"[{x.Token}] start");
await Task.Delay(x.Delay1);
Console.WriteLine($"[{x.Token}] wait ticket: {ticket}");
while (ticket != x.Token)
{
await Task.Yield(); // yield and wait other task increases the ticket
}
Console.WriteLine($"[{x.Token}] acquire ticket: {ticket}");
await Task.Delay(x.Delay2);
Console.WriteLine($"[{x.Token}] release ticket: {ticket}");
ticket++;
qFree.Enqueue(x);
});
Console.WriteLine($"[{x.Token}] submit");
submit++;
}
else await Task.WhenAny(taskPool.Select(x => x.Task));
}
await Task.WhenAll(taskPool.Select(x => x.Task));
Console.WriteLine($"-------- Task2 finished --------");
}
public class TaskCtx
{
public int Token;
public int Delay1;
public int Delay2;
public Func<Task> FTask;
public Task Task;
public TaskCtx()
{
FTask = async () => await Task.CompletedTask;
Task = Task.CompletedTask;
}
}
Task1()的输出:
-------- Task1 start --------
[0] submit
[1] submit
[2] submit
[3] submit
[0] start
[1] start
[2] start
[3] start
[0] wait ticket: 0
[0] acquire ticket: 0
[2] wait ticket: 0
[1] wait ticket: 0
[3] wait ticket: 0
[0] release ticket: 0
[1] acquire ticket: 1 <-- [1] is running and got the ticket
[4] submit
[4] start
[1] start <-- another [1] starts ???
[2] start
[3] start
[2] wait ticket: 1
[1] wait ticket: 1
[1] acquire ticket: 1
[4] wait ticket: 1
[3] wait ticket: 1
[1] release ticket: 1
[2] acquire ticket: 2
[2] acquire ticket: 2
[1] release ticket: 2
[3] acquire ticket: 3
[3] acquire ticket: 3
[5] submit
[6] submit
[4] start
[6] start
[2] start
[3] start
[2] wait ticket: 3
[4] wait ticket: 3
[6] wait ticket: 3
[3] wait ticket: 3
[3] acquire ticket: 3
[2] release ticket: 3
[4] acquire ticket: 4
[2] release ticket: 3
[4] acquire ticket: 4
[3] release ticket: 5
[6] acquire ticket: 6
[3] release ticket: 5
[3] release ticket: 7
[4] release ticket: 8
[4] release ticket: 8
[6] release ticket: 10
Task2()的输出:
-------- Task2 start --------
[0] submit
[1] submit
[2] submit
[3] submit
[0] start
[1] start
[2] start
[3] start
[2] wait ticket: 0
[1] wait ticket: 0
[3] wait ticket: 0
[0] wait ticket: 0
[0] acquire ticket: 0
[0] release ticket: 0
[1] acquire ticket: 1
[4] submit
[4] start
[4] wait ticket: 1
[1] release ticket: 1
[2] acquire ticket: 2
[5] submit
[5] start
[5] wait ticket: 2
[2] release ticket: 2
[3] acquire ticket: 3
[6] start
[6] submit
[6] wait ticket: 3
[3] release ticket: 3
[4] acquire ticket: 4
[4] release ticket: 4
[5] acquire ticket: 5
[5] release ticket: 5
[6] acquire ticket: 6
[6] release ticket: 6
-------- Task2 finished --------
更新: 我通过使用普通的异步函数而不是匿名函数来制作另一个实现:
static async Task Task3()
{
int qd = 4;
var taskPool = Enumerable.Range(0, qd).Select(n => new TaskCtx()).ToArray();
Queue<TaskCtx> qFree = new Queue<TaskCtx>(taskPool);
Random r = new Random();
long segs = 7, submit = 0;
int token = 0;
Ticket ticket = new Ticket(0);
Console.WriteLine($"-------- Task3 start --------");
while (submit < segs)
{
if (qFree.Count > 0)
{
var x = qFree.Dequeue();
x.Token = token++;
x.Delay1 = r.Next(10, 20);
x.Delay2 = r.Next(30, 50);
x.Task = RunTask3Async(x, ticket, qFree);
Console.WriteLine($"[{x.Token}] submit");
submit++;
}
else await Task.WhenAny(taskPool.Select(x => x.Task));
}
await Task.WhenAll(taskPool.Select(x => x.Task));
Console.WriteLine($"-------- Task3 finished --------");
}
static async Task RunTask3Async(TaskCtx x, Ticket ticket, Queue<TaskCtx> qFree)
{
Console.WriteLine($"[{x.Token}] start");
await Task.Delay(x.Delay1);
Console.WriteLine($"[{x.Token}] wait ticket: {ticket}");
while (ticket.ticket != x.Token)
{
await Task.Yield(); // yield and wait other task increases the ticket
}
Console.WriteLine($"[{x.Token}] acquire ticket: {ticket}");
await Task.Delay(x.Delay2);
Console.WriteLine($"[{x.Token}] release ticket: {ticket}");
ticket.ticket++;
qFree.Enqueue(x);
}
public class Ticket
{
public int ticket;
public Ticket(int t)
{
ticket = t;
}
public override string ToString()
{
return $"{ticket}";
}
}
Task3() 也可以,输出为:
-------- Task3 start --------
[0] start
[0] submit
[1] start
[1] submit
[2] start
[2] submit
[3] start
[3] submit
[0] wait ticket: 0
[3] wait ticket: 0
[0] acquire ticket: 0
[2] wait ticket: 0
[1] wait ticket: 0
[0] release ticket: 0
[1] acquire ticket: 1
[4] start
[4] submit
[4] wait ticket: 1
[1] release ticket: 1
[2] acquire ticket: 2
[5] start
[5] submit
[5] wait ticket: 2
[2] release ticket: 2
[3] acquire ticket: 3
[6] start
[6] submit
[6] wait ticket: 3
[3] release ticket: 3
[4] acquire ticket: 4
[4] release ticket: 4
[5] acquire ticket: 5
[5] release ticket: 5
[6] acquire ticket: 6
[6] release ticket: 6
-------- Task3 finished --------
更新2: 我尝试最小化示例,但以下代码没有这个问题。
var FTasks = Enumerable.Range(0, 8).Select(i => async () =>
{
Console.WriteLine($"[{i}] start");
await Task.Delay(i);
Console.WriteLine($"[{i}] end");
}).ToArray();
await Task.WhenAll(FTasks.Select(x => x()));
Console.WriteLine("----------------");
var Tasks = Enumerable.Range(0, 8).Select(i => Task.Run(async () =>
{
Console.WriteLine($"[{i}] start");
await Task.Delay(i);
Console.WriteLine($"[{i}] end");
})).ToArray();
await Task.WhenAll(Tasks);
【问题讨论】:
-
伊万你的例子远非最小。如果您想深入了解物化
Task和异步委托之间的区别,您可以通过使示例尽可能少来了解更多信息。一个五行十行的程序可能足以证明这些差异。 -
谢谢@TheodorZoulias,我有一个最小的,但他们没有这个问题。请参阅 UPDATE2。
-
"下面的代码没有这个问题" -- 原来的复杂例子有什么问题,而更新的最小例子没有?跨度>
-
嗨@TheodorZoulias,我更新了描述。该程序的流程是:如果空闲队列中有可用的任务上下文,则将其出列并使用异步任务对其进行设置;当异步任务完成时,将上下文放回空闲队列。问题是保存 Func
的 Task1() 不起作用,似乎异步任务有多个实例在运行。
标签: c# asynchronous async-await task-parallel-library