前面一篇提到例子都是数据并行,但这并不是并行化的唯一形式,在.Net4之前,必须要创建多个线程或者线程池来利用多核技术。现在只需要使用新的Task实例就可以通过更简单的代码解决命令式任务并行问题。
1.Task及它的生命周期
一个Task表示一个异步操作,它的创建和执行都是独立的,因此可以对相关操作的执行拥有完全的控制权;当有很多异步操作作为Task实例加载的时候,为了充分利用运行时的逻辑内核,任务调度器会尝试并行的运行这些任务,当然任务都是有额外的开销,虽然要小于添加线程的开销;
对Task实例的生命周期的理解非常重要。一个Task的执行,取决于底层硬件和运行时可用的资源。因此Task实例的状态会不断的发生改变,而一个Task实例只会完成其生命周期一次,当Task到达它三种可能的最终状态只后,它就回不去之前的任何状态了。
Task实例有三种可能的初始状态,Created是Task构造函数创建实例的初始状态,WaitForActivation是子任务依赖其他任务完成后等待调度的初始状态,WaitingToRun是通过TaskFactory.StartNew所创建任务的初始状态。表示正在等待调度器挑选自己并运行。
任务开始执行,状态就变为TaskStatus.Runing。如果还有子任务,主任务的状态会转变到TaskStatus.WaitingForChildrenToComplete状态。并最终到达,Canceled,Faulted和RunToCompletion 三种状态。从字面理解就是任务取消,出错和完成。
2.任务并行。
前面我们通过Parallel.Invoke来并行加载方法。
Parallel.Invoke(GenerateAESKeys,GenerateMD5Has);
通过Task实例也能完成同样的工作。
var t1 = new Task(GenerateAESKeys); var t2 = new Task(GenerateMD5Has); t1.Start(); t2.Start(); Task.WaitAll(t1, t2);
Start方法对委托进行初始化。 WaitAll方法会等待两个任务的执行完成之后再往下走。
可以看见,执行过程中,任务的状态不断的发生变化。可以给WaitFor方法加上毫秒数。看任务是否会在指定时间内完成。
if(!Task.WaitAll(new[]{t1,t2},3000)) { Console.WriteLine("任务执行超过3秒"); Console.WriteLine(t1.Status.ToString()); Console.WriteLine(t2.Status.ToString()); }
即使到达了指定时间,任务还是继续执行。
同样任务本身也是可以等待
if (t1.Wait(3000)) { Console.WriteLine("任务t1执行超过3秒"); Console.WriteLine(t1.Status.ToString()); }
3.通过取消标记取消任务。
可以通过CancellationToken 来中断任务的执行。这需要再委托中添加一些代码,创建可以取消的任务。
private static void GenerateMD5HasCancel(CancellationToken ct) { ct.ThrowIfCancellationRequested(); var sw = Stopwatch.StartNew(); for (int i = 0; i < NUM_AES_KEYS; i++) { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); ct.ThrowIfCancellationRequested(); } Console.WriteLine("MD5:" + sw.Elapsed.ToString()); }
Console.WriteLine("任务开始..."); var cts = new CancellationTokenSource(); var ct = cts.Token; var sw = Stopwatch.StartNew(); var t1 = Task.Factory.StartNew(() => GenerateMD5HasCancel(ct), ct); var t2 = Task.Factory.StartNew(() => GenerateAESKeysCancel(ct), ct); //1秒后取消任务 Thread.Sleep(1000); cts.Cancel(); try { if (!Task.WaitAll(new[] { t1,t2}, 1000)) { Console.WriteLine("任务执行超过1秒"); Console.WriteLine(t1.Status.ToString()); } } catch (AggregateException ex) { foreach (var exc in ex.InnerExceptions) { Console.WriteLine(exc.ToString()); } if (t1.IsCanceled) { Console.WriteLine("任务1取消了..."); } Console.WriteLine(sw.Elapsed.ToString()); Console.WriteLine("结束"); }
CancellationTokenSource能够初始化取消的请求,而CancellationToken能将这些请求传递给异步操作;上面的方法通过Task类的Factory方法得到一个TaskFactory实例,相比Task直接创建任务,这个实例可以使用更多的功能。而StartNew 等价于用Task构造函数创建一个Task并调用Start方法执行。
直接在Debug下面运行,程序会在异常的地方中断。直接运行exe得到上面的结果。
ThrowIfCancellationRequested在每一次循环迭代都会执行,内部是判断任务取消后抛出一个OperationCanceledException的异常,来避免运行不必要的循环和其他命令。
public void ThrowIfCancellationRequested() { if (IsCancellationRequested) ThrowOperationCanceledException(); } private void ThrowOperationCanceledException() { throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this); }
如果有代码正在等待取消,还会自动抛出一个TaskCanceledException异常。会包含在AggregateException中。
4.处理异常。
修改上面的方法抛出一个异常。
private static void GenerateMD5HasCancel(CancellationToken ct) { ct.ThrowIfCancellationRequested(); //....if (sw.Elapsed.TotalSeconds > 0.5) { throw new TimeoutException("超时异常0.5秒"); } ct.ThrowIfCancellationRequested(); } Console.WriteLine("MD5:" + sw.Elapsed.ToString()); }
修改Main方法的Catch。
if (t1.IsFaulted) { foreach (var exc in ex.InnerExceptions) { Console.WriteLine(exc.ToString()); } Console.WriteLine(t1.Status.ToString()); }
执行结果:
当出现异常时,任务的状态就会转换为Faulted。并不会影响另外一个任务的执行。
5.从任务返回值。
前面的方法都是没有返回值,得到任务的返回值需要使用Task<TResult>实例,TResult要替换为返回的类型。修改AES方法。返回一个指定前缀的List<String>
GenerateMD5HasList:
private static List<string> GenerateMD5HasList(CancellationToken ct, char prefix) { ct.ThrowIfCancellationRequested(); var sw = Stopwatch.StartNew(); var list = new List<string>(); for (int i = 0; i < NUM_AES_KEYS; i++) { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (hexString[0] == prefix) { list.Add(hexString); } ct.ThrowIfCancellationRequested(); } Console.WriteLine("MD5:" + sw.Elapsed); return list; }