任务并行库 (TPL) 基于任务的概念。术语“任务并行”是指同时运行的一个或多个任务。任务表示异步操作,在某些方面它类似于创建新线程或 ThreadPool 工作项,但抽象级别较高。任务提供两个主要好处:
-
系统资源的使用效率更高,可伸缩性更好。
在后台,任务排队到 ThreadPool,ThreadPool 已使用登山等算法进行增强,这些算法能够确定并调整到可最大化吞吐量的线程数。这会使任务相对轻量,您可以创建很多任务以启用细化并行。为了补偿这一点,可使用众所周知的工作窃取算法提供负载平衡。
-
对于线程或工作项,可以使用更多的编程控件。
任务和围绕它们生成的框架提供了一组丰富的 API,这些 API 支持等待、取消、继续、可靠的异常处理、详细状态、自定义计划等功能。
出于这两个原因,在 .NET Framework 4 中,任务是用于编写多线程、异步和并行代码的首选 API。
1:隐式创建和运行任务
Parallel.Invoke(() => DoSomeWork(), () => DoSomeOtherWork());
为了更好地控制任务执行或从任务返回值,必须更加显式地使用 Task 对象。
2:显式创建和运行任务
任务由 Task 继承。
任务对象处理基础结构详细信息,并提供可在任务的整个生存期内从调用线程访问的方法和属性。例如,可以随时访问任务的 TaskStatus 枚举表示。在创建任务时,您赋予它一个用户委托,该委托封装该任务将执行的代码。该委托可以表示为命名的委托、匿名方法或 lambda 表达式。lambda 表达式可以包含对命名方法的调用,如下面的示例所示。
taskA.Start();
您还可以使用 StartNew 方法在一个操作中创建并启动任务。如果不必将创建和计划分开,这是创建并启动任务的首选方法。任务公开静态 Factory 属性,该属性返回 TaskFactory 的默认实例,以便您可以通过 Task.Factory.StartNew(…) 的形式调用该方法。此外,在此示例中,由于任务的类型为 Task<double>,因此每个任务都具有包含计算结果的公共 Result 属性。任务以异步方式运行,可以按任意顺序完成。如果在计算完成之前访问 Result,则该属性将一直处于阻止状态,直到值可用为止。
class MyCustomData
{
public DateTime Date;
public String Name;
}
static void TaskDemo2()
{
var taskB = Task.Factory.StartNew((obj) =>
{
MyCustomData data = (MyCustomData)obj;
Console.WriteLine("Hello from {0}. Today is {1}.", data.Name, data.Date);
},
new MyCustomData { Name = "taskB", Date = DateTime.Today });
var taskC = Task.Factory.StartNew(state =>
{
dynamic data = state;
Console.WriteLine(data.Name);
Console.WriteLine(data.Date);
}, new { Name = "taskB", Date = DateTime.Today });
}
使用 TaskContinueWith(),可以指定在前面的任务完成时要启动的任务。延续任务的委托中将传入对前面的任务的引用,以便它可以检查其状态。此外,可以在 analyzeData 的输入,并产生结果,该结果的类型以类似方式推断,且可用于 Result 属性中的程序。
Task<byte[]> getData = new Task<byte[]>(() => GetFileData());
Task<double[]> analyzeData = getData.ContinueWith(x => Analyze(x.Result));
Task<string> reportData = analyzeData.ContinueWith(y => Summarize(y.Result));
getData.Start();
//or...
Task<string> reportData2 = Task.Factory.StartNew(() => GetFileData())
.ContinueWith((x) => Analyze(x.Result))
.ContinueWith((y) => Summarize(y.Result));
System.IO.File.WriteAllText(@"C:\reportFolder\report.txt", reportData.Result);
使用 ContinueWhenAll()()() 方法和 ContinueWhenAny()()() 方法,可以从多个任务继续。有关更多信息,请参见如何:用延续将多个任务链接在一起。
通常,会出于以下某个原因等待任务:
-
主线程依赖于任务计算的最终结果。
-
您必须处理可能从任务引发的异常。
private void buttonParallelTaskWait_Click(object sender, RoutedEventArgs e)
{
// Wait on a single task with no timeout specified.
Task task1 = Task.Factory.StartNew(() => DoSomeWork(10000000));
task1.Wait();
ConsoleTexter.Clear();
ConsoleTexter.WriteLine("task1 has completed.");
// Wait on a single task with a timeout specified.
Task task2 = Task.Factory.StartNew(() => DoSomeWork(10000000));
task2.Wait(100); //Wait for 100 ms.
if (task2.IsCompleted)
ConsoleTexter.WriteLine("task2 has completed.");
else
ConsoleTexter.WriteLine("Timed out before task2 completed.");
// Wait for all tasks to complete.
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
tasks[i] = Task.Factory.StartNew(() => DoSomeWork(10000000));
}
Task.WaitAll(tasks);
// Wait for first task to complete.
Task<double>[] tasks2 = new Task<double>[3];
// Try three different approaches to the problem. Take the first one.
tasks2[0] = Task<double>.Factory.StartNew(() => TrySolution1());
tasks2[1] = Task<double>.Factory.StartNew(() => TrySolution2());
tasks2[2] = Task<double>.Factory.StartNew(() => TrySolution3());
int index = Task.WaitAny(tasks2);
double d = tasks2[index].Result;
ConsoleTexter.WriteLine("task[{0}] completed first with result of {1}.", index, d);
MessageBox.Show(ConsoleTexter.Out.ToString());
}
当某个任务引发一个或多个异常时,异常包装在 AggregateException 中。该异常传播回与该任务联接的线程,此线程通常是正在等待该任务或尝试访问该任务的 Result 属性的线程。此行为用于强制实施所有未处理的异常默认情况下应关闭进程的 .NET Framework 策略。调用代码可以通过在任务或任务组上使用 () 属性,或者通过在 try-catch 块中包括 Wait 方法,来处理异常。
联接线程也可以通过在对任务进行垃圾回收之前访问 Exception 属性来处理异常。通过访问此属性,可防止未处理的异常触发在对象完成时关闭进程的异常传播行为。
异常处理的例子见下文的取消任务。
6:取消任务
-
简单地从委托中返回。在许多情况下,这样已足够;但是,采用这种方式“取消”的任务实例会转换为 RanToCompletion 状态,而不是 Canceled 状态。
-
引发 OperationCanceledException,并将其传递到在其上请求了取消的标记。完成此操作的首选方式是使用 ThrowIfCancellationRequested 方法。采用这种方式取消的任务会转换为 Canceled 状态,调用代码可使用该状态来验证任务是否响应了其取消请求。
CancellationTokenSource tokenSource2;
CancellationToken ct;
Task task;
public UserControlParallel()
{
InitializeComponent();
tokenSource2 = new CancellationTokenSource();
ct = tokenSource2.Token;
}
//任务开始
private void buttonParallelTaskStart_Click(object sender, RoutedEventArgs e)
{
task = Task.Factory.StartNew(() =>
{
// Were we already canceled?
ct.ThrowIfCancellationRequested();
bool moreToDo = true;
while (moreToDo)
{
// Poll on this property if you have to do
// other cleanup before throwing.
Thread.Sleep(100);
if (ct.IsCancellationRequested)
{
// Clean up here, then...
ct.ThrowIfCancellationRequested();
}
}
}, tokenSource2.Token); // Pass same token to StartNew.
}
//任务结束
private void buttonParallelTaskCancel_Click(object sender, RoutedEventArgs e)
{
tokenSource2.Cancel();
// Just continue on this thread, or Wait/WaitAll with try-catch:
try
{
task.Wait();
}
catch (AggregateException err)
{
ConsoleTexter.Clear();
foreach (var v in err.InnerExceptions)
ConsoleTexter.WriteLine("msg: " + v.Message);
MessageBox.Show(ConsoleTexter.Out.ToString());
}
}