不用任何与创建线程、资源互斥有关系的API写多线程程序

     这次的例子,是一个很简单的控制台,她将面对瞬间提交的百万的数据,而面不改色(CPU、内存非常平稳),队列中始终只保存最新的数据,每次只处理cpu 个数据(我的机器是双核的,所以,在我这里,就是每个CPU一个线程,真正的并行运行哦....),OK不废话,进入正题: 

    呃,既然是实例,那么就直接看代码好了: 

CCR
    

CCR任务调度的原理和策略
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Microsoft.Ccr.Core;

namespace CCRDemo1
{
    
class Program
    {
        
static void Main(string[] args)
        {
            
int maxiQueueDepth = 10;
            
// step1: 创建一个Dispatcher对象
            Dispatcher dispatcher = new Dispatcher(0"调度器名称");
            
// step2: 创建一个与step1创建对象关联的DispatcherQueue对象
            DispatcherQueue depthThrottledQueue = new DispatcherQueue(
                                            
"任务队列的名称",
                                            
// 关联到该队列的调度器
                                            dispatcher,
                                            
// 队列保存数据的策略:保存最近消息策略
                                            TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,
                                            
// 队列的深度
                                            maxiQueueDepth
                                            );
            
// step3: 创建一个能够接收整型数据的Port
            Port<int> intPort = new Port<int>();
            
// step4: 把Port与处理函数关联,然后再与DispatcherQueue关联
            Arbiter.Activate(depthThrottledQueue,
                            Arbiter.Receive(
true,
                                        intPort,
                                        
delegate(int i)    // 这里用了一个匿名方法,作为处理函数
                                            {
                                                Thread.Sleep(
2000);
                                                Console.WriteLine(
"[{0}] {1}", DateTime.Now.ToString("o"), i);
                                            }
                                        )
                            );

            
// step5: 快速的提交大量的任务
            Console.WriteLine("[{0}] 开始提交大量的任务", DateTime.Now.ToString("o"));
            
for (int i = 0; i < maxiQueueDepth * 100000; i++)
            {
                
// 把数据Post到intPort内
                intPort.Post(i);
            }
            Console.WriteLine(
"[{0}] 大量任务提交完毕。", DateTime.Now.ToString("o"));

            Console.WriteLine(
"Press any key to exit");
            Console.ReadKey();
            dispatcher.Dispose();
        }
    }
}
CCR
    

CCR任务调度的原理和策略

 

 二、原理讲解

    其实在代码里面的注释,我想应该写的很明白了,不过毕竟是第一个例子,我还是稍微讲些CCR的大致原理(至于详细的实现原理,我后面会专门起文想说,这次主要的目的还是,先领略下CCR的神奇之处):

     首先,多线程是以操作系统线程池的形式被Dispatcher管理的,因此创建线程的工作实际上由Dispatcher代劳了;

     其次,在CCR内,他规定Dispatcher只能处理任务,而这个任务只能从任务队列DispatcherQueue内获取,因此,我们要创建任务队列,并关联上Dispatcher;

     然后,我们把自己要提交给处理函数处理的数据,封装在Port<T>内,Port其实是一个FIFO队列,专门用来接收用户提交的数据的;

     最后,我们要把数据、处理函数、任务队列 组合起来,这就是上面代码中的step4,这步其实做了2个工作:

        1、把port和处理函数,封装为Receive关联起来;

        2、把Receive和DispatcherQueue关联起来;

     这样,我们就完成了,所有的工作。

    总之,CCR提供了一个模式,让我们只需要把需要并发、异步处理的工作,分解为:

        1、输入数据--->post到Port内;

        2、处理过程--->做成委托关联到任务队列中

      这种方法,是的编写多线程程序的工作大大简化了,而且也能够让后台的代码能够被编译器统一优化。

CCR任务调度的原理和策略

二、CCR的任务调度 

    查阅MSDN:http://msdn.microsoft.com/en-us/library/bb648756.aspx  

    可知:(下面引用了Ncindy翻译的部分内容,感谢ncindy的辛苦劳动)

【一】:当一个元素被投递到附加了接收器的port,port的实现中将会发生如下操作:

step1.   为投递进来的元素创建一个容器。容器的类型(IPortElement)允许CCR在不知道元素类型的情况下将元素排队并将元素赋值给Task实例。

step2.   容器被放入队列。

step3.   如 果接收器列表不是null,并且其中有一个以上的接收器,port对象将会调用ReceiverTask.Evaluate方法来让接收器和它里面的仲裁 器层次检测元素是否可以被使用,在这个例子中,Evaluate方法将会返回true,并使用收到的元素和用户的delegate作为参数创建一个 Task<int>实例。

step4.   port使用调用Evaluate方法返回的Task对象作为参数调用taskQueue.Enqueue,注意,当一个接收器是第一次被激活,它会被关联到由Arbiter.Activate方法提供的DispatcherQueue实例。

    当上面的4步完成之后,生成的Task对象现在已经被调度逻辑分发(dealt)给了对应的DispatcherQueue。 

【二】:一旦一个元素被放入DispatcherQueue,接下来将会做如下操作:

step1.   DispatcherQueue向它所属的Dispatcher发信号,告诉Dispatcher一个新的任务可以被执行了。

step2.   Dispatcher通知一个或者多个TaskExecutionWorker类型对象。每个TaskExecutionWorker对象管理一个操作系统线程。它将线程设置到一种高效的休眠状态,直到Dispatcher发出信号通知有元素可以被调度时。

step3.   TaskExecutionWorker对象调用DispatcherQueue.Test方法从队列中获取一个任务。如果是可用的任务,TaskExecutionWorker对象则调用ITask.Execute。

step4.   Task.Execute方法调用关联在task对象上的delegate,并将一个或者多个关联在task上的参数传递进去。 

    总之:在CCR中,线程池处理的任务,是由DispatcherQueue产生的;而DispathcerQueue有是根据用户线程通过Port或PortSet提交给的数据 和 初始化时指定的委托来产生任务的。因此可知影响任务调度的地方有3处:

1、客户端提交数据的地方:Port/PortSet的Post方法;

2、DispatcherQueue产生任务的地方:ReceiverTask的Evaluate方法;

3、Dispacher内线程池处理任务的地方:Task执行关联delegate的Execute方法; 

    而CCR就是通过给上面三处加入调度机制来达到任务调度的负载均衡目的的。 

三、CCR的四种任务调度策略

CCR
    

CCR任务调度的原理和策略
using System;

namespace Microsoft.Ccr.Core
{
    
public enum TaskExecutionPolicy
    {
        Unconstrained 
= 0,
        ConstrainQueueDepthDiscardTasks 
= 1,
        ConstrainQueueDepthThrottleExecution 
= 2,
        ConstrainSchedulingRateDiscardTasks 
= 3,
        ConstrainSchedulingRateThrottleExecution 
= 4,
    }
}
CCR
    

CCR任务调度的原理和策略

这4中策略分别应用在一下场景: 

1、ConstrainQueueDepthDiscardTasks 按队列深度丢弃最旧任务

    适用于:要处理的消息可以丢弃但是必须保存最近N条的情况。这对于CPU处理速度低于消息产生速度的情况很有好处,该策略能够保证丢弃的最旧任务的同时最新的N个任务能都得到调度。特别是在阻塞深度为1的时候,队列中保存的始终都是最新的任务。 

2、ConstrainQueueDepthThrottleExecution 按照队列深度阻塞任务产生

    适用于:消息不是规律产生,而是随机、爆炸性到达的情况。这对于来自网络获知其他机器的消息很相似,该策略保证任务不会被丢失,通过阻塞消息Post到Port/PortSet的方法来降低任务产生的速度。 

3、ConstrainSchedulingRateDiscardTasks 按照固定速度处理消息且丢失未处理的最旧消息

    适用于:处理产生速度有规律的消息,比如播放视频。在这种情况下一般所有的消息已经不是最重要的了,但保存最新的消息却很有意义,该策略能够保证代码会以固定的速度执行,即使消息以爆炸式的速度产生也没关系。 

4、ConstrainSchedulingRateThrottleExecution 按照固定速度处理消息且阻塞任务缠上

    适用于:消息产生源是同一处理器中的另一线程时。该策略会让消息的产生源慢下来,适应消息的处理速度,保证不会有任务丢失。 

四、问题&解答 

问题

 例子程序在运行的时候出现:数据会乱序 和 数据丢失的现象。(下面是例子程序的主要代码,与上一篇代码完全相同,此处贴出,是为了阅读方便) 

CCR
    

CCR任务调度的原理和策略
 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading;
 6 using Microsoft.Ccr.Core;
 7 
 8 namespace CCRDemo1
 9 {
10     class Program
11     {
12         static void Main(string[] args)
13         {
14             int maxiQueueDepth = 10;
15             // step1: 创建一个Dispatcher对象
16             Dispatcher dispatcher = new Dispatcher(0"调度器名称");
17             // step2: 创建一个与step1创建对象关联的DispatcherQueue对象
18             DispatcherQueue depthThrottledQueue = new DispatcherQueue(
19                                             "任务队列的名称",
20                                             // 关联到该队列的调度器
21                                             dispatcher,
22                                             // 队列保存数据的策略:保存最近消息策略
23                                             TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,
24                                             // 队列的深度
25                                             maxiQueueDepth
26                                             );
27             // step3: 创建一个能够接收整型数据的Port
28             Port<int> intPort = new Port<int>();
29             // step4: 把Port与处理函数关联,然后再与DispatcherQueue关联
30             Arbiter.Activate(depthThrottledQueue,
31                             Arbiter.Receive(true,
32                                         intPort,
33                                         delegate(int i)    // 这里用了一个匿名方法,作为处理函数
34                                             {
35                                                 Thread.Sleep(2000);
36                                                 Console.WriteLine("[{0}] {1}", DateTime.Now.ToString("o"), i);
37                                             }
38                                         )
39                             );
40 
41             // step5: 快速的提交大量的任务
42             Console.WriteLine("[{0}] 开始提交大量的任务", DateTime.Now.ToString("o"));
43             for (int i = 0; i < maxiQueueDepth * 100000; i++)
44             {
45                 // 把数据Post到intPort内
46                 intPort.Post(i);
47             }
48             Console.WriteLine("[{0}] 大量任务提交完毕。", DateTime.Now.ToString("o"));
49 
50             Console.WriteLine("Press any key to exit");
51             Console.ReadKey();
52             dispatcher.Dispose();
53         }
54     }
55 }
56 
CCR
    

CCR任务调度的原理和策略

 

解答

1、数据乱序问题

    CCR内部创建了多线程池来执行这个匿名方法,而且执行的方式是并发、异步,因此改匿名方法打印出来的数字的顺序自然就应该是不可预知的,也就是说:顺序是乱的;

2、数据丢失问题

    这 个也是正常的,例子代码在瞬间提交的大量数据,提交的速度,远远超过匿名方法处理的速度(里面sleep了2秒),因此这意味着会有大量的任务堆积在 DispatcherQueue内,然而,改队列在创建的时候,已经指明了任务调度策略 为:TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,因此DispatcherQueue内 只会保存最新的任务,旧的就会丢失。

CCR的心脏Dispatcher。

    在我们使用Dispatcher的时候,最常用到的就是他的构造函数和资源释放函数了,因此本节,先介绍这2类函数,其他的函数因为涉及到DispatcherQueue等内部的运作机制,属于CCR内部的运作调度机制,留待前面这些基础知识交代完毕后再另起一篇细说。

Dispatcher内含的方法、属性预览
CCR
    

CCR任务调度的原理和策略
public sealed class Dispatcher : IDisposable
{
    
// 1、构造函数
    public Dispatcher();
    
public Dispatcher(int threadCount, string threadPoolName);
    
public Dispatcher(int threadCount, ThreadPriority priority,
            
bool useBackgroundThreads, string threadPoolName);
    
public Dispatcher(int threadCount, ThreadPriority priority,
            DispatcherOptions options, 
string threadPoolName);
    
public Dispatcher(int threadCount, ThreadPriority priority,
            DispatcherOptions options,
            ApartmentState threadApartmentState,
            
string threadPoolName);

    
// 2、资源释放函数
    public void Dispose();

    
// 3、属性
    public static ICollection<Causality> ActiveCausalities { get; }
    
public List<DispatcherQueue> DispatcherQueues { get; }
    
public string Name { getset; }
    
public DispatcherOptions Options { getset; }
    
public int PendingTaskCount { getset; }
    
public long ProcessedTaskCount { getset; }
    
public static int ThreadsPerCpu { getset; }
    
public int WorkerThreadCount { getset; }

    
// 4、因果关系函数
    public static void AddCausality(Causality causality);
    
public static void AddCausalityBreak();
    
public static void ClearCausalities();
    
public static bool RemoveCausality(Causality causality);
    
public static bool RemoveCausality(string name);
}
CCR
    

CCR任务调度的原理和策略

一、构造函数
     Dispatcher给我们提供了5个构造函数,其实是为了方便大家使用而对一个构造函数做的封装,我们看看具体的实现代码(我在关键的地方都加上了中文的注释):

/// Constructs a Dispatcher instance using the default number of threads and no friendly tag
  3CCR
    

CCR任务调度的原理和策略/// 使用默认线程数和空名作为参数构造一个Dispatcher实例
  4CCR
    

CCR任务调度的原理和策略/// </summary>

  5CCR
    

CCR任务调度的原理和策略public Dispatcher()
  6CCR
    

CCR任务调度的原理和策略    : this(0null)
  7}
  9CCR
    

CCR任务调度的原理和策略
 10/// Constructs a Dispatcher instance. 
 12CCR
    

CCR任务调度的原理和策略/// The instance is usable only after AddPort is called at least once
 13CCR
    

CCR任务调度的原理和策略/// 该实例仅在调用AddPort之后才生效
 14CCR
    

CCR任务调度的原理和策略/// </summary>
 15CCR
    

CCR任务调度的原理和策略/// <param name="threadCount">
 16CCR
    

CCR任务调度的原理和策略/// Number of OS threads to use for processing CCR Tasks
 17CCR
    

CCR任务调度的原理和策略/// 处理CCR任务的操作系统线程数
 18CCR
    

CCR任务调度的原理和策略/// </param>
 19CCR
    

CCR任务调度的原理和策略/// <param name="threadPoolName">
 20CCR
    

CCR任务调度的原理和策略/// Friendly name to use for the OS Threads and this dispatcher instance
 21CCR
    

CCR任务调度的原理和策略/// Dispatcher实例和系统线程的名称
 22CCR
    

CCR任务调度的原理和策略/// </param>
 23CCR
    

CCR任务调度的原理和策略public Dispatcher(int threadCount, string threadPoolName)
 24CCR
    

CCR任务调度的原理和策略    : this(threadCount, ThreadPriority.Normal, DispatcherOptions.None, threadPoolName)
 25}
 27CCR
    

CCR任务调度的原理和策略
 28/// Constructs a Dispatcher instance. 
 30CCR
    

CCR任务调度的原理和策略/// The instance is usable only after AddPort is called at least once
 31CCR
    

CCR任务调度的原理和策略/// </summary>
 32CCR
    

CCR任务调度的原理和策略/// <param name="threadCount">
 33CCR
    

CCR任务调度的原理和策略/// Number of OS threads to use for processing CCR Tasks
 34CCR
    

CCR任务调度的原理和策略/// </param>
 35CCR
    

CCR任务调度的原理和策略/// <param name="priority">
 36CCR
    

CCR任务调度的原理和策略/// OS Thread priority to use for threads exexuting CCR tasks
 37CCR
    

CCR任务调度的原理和策略/// 系统线程执行CCR任务所使用的优先级
 38CCR
    

CCR任务调度的原理和策略/// </param>
 39CCR
    

CCR任务调度的原理和策略/// <param name="options">
 40CCR
    

CCR任务调度的原理和策略/// Dispatcher scheduling options
 41CCR
    

CCR任务调度的原理和策略/// 调度选项
 42CCR
    

CCR任务调度的原理和策略/// </param>
 43CCR
    

CCR任务调度的原理和策略/// <param name="threadPoolName">
 44CCR
    

CCR任务调度的原理和策略/// Friendly name to use for the OS Threads and this dispatcher instance
 45CCR
    

CCR任务调度的原理和策略/// </param>
 46CCR
    

CCR任务调度的原理和策略public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options, string threadPoolName)
 47CCR
    

CCR任务调度的原理和策略    : this(threadCount, priority, options, ApartmentState.Unknown, threadPoolName)
 48}
 50CCR
    

CCR任务调度的原理和策略
 51/// Constructs a Dispatcher instance. 
 53CCR
    

CCR任务调度的原理和策略/// The instance is usable only after AddPort is called at least once
 54CCR
    

CCR任务调度的原理和策略/// </summary>
 55CCR
    

CCR任务调度的原理和策略/// <param name="threadCount">
 56CCR
    

CCR任务调度的原理和策略/// Number of OS threads to use for processing CCR Tasks
 57CCR
    

CCR任务调度的原理和策略/// </param>
 58CCR
    

CCR任务调度的原理和策略/// <param name="priority">
 59CCR
    

CCR任务调度的原理和策略/// OS Thread priority to use for threads exexuting CCR tasks
 60CCR
    

CCR任务调度的原理和策略/// </param>
 61CCR
    

CCR任务调度的原理和策略/// <param name="useBackgroundThreads">
 62CCR
    

CCR任务调度的原理和策略/// If true, background threads are used, which do not prevent application exit
 63CCR
    

CCR任务调度的原理和策略/// 是否使用后台线程,若是用,则应用程序可以自由的退出,而不用担心
 64CCR
    

CCR任务调度的原理和策略/// </param>
 65CCR
    

CCR任务调度的原理和策略/// <param name="threadPoolName">
 66CCR
    

CCR任务调度的原理和策略/// Friendly name to use for the OS Threads and this dispatcher instance
 67CCR
    

CCR任务调度的原理和策略/// </param>
 68CCR
    

CCR任务调度的原理和策略public Dispatcher(int threadCount, ThreadPriority priority, bool useBackgroundThreads, string threadPoolName)
 69CCR
    

CCR任务调度的原理和策略    : this(threadCount,
 70CCR
    

CCR任务调度的原理和策略            priority,
 71CCR
    

CCR任务调度的原理和策略            useBackgroundThreads ? DispatcherOptions.UseBackgroundThreads : DispatcherOptions.None,
 72CCR
    

CCR任务调度的原理和策略            threadPoolName)
 73}
 75CCR
    

CCR任务调度的原理和策略
 76/// Constructs a Dispatcher instance. 
 78CCR
    

CCR任务调度的原理和策略/// The instance is usable only after AddPort is called at least once
 79CCR
    

CCR任务调度的原理和策略/// </summary>
 80CCR
    

CCR任务调度的原理和策略/// <param name="threadCount">
 81CCR
    

CCR任务调度的原理和策略/// Number of OS threads to use for processing CCR Tasks
 82CCR
    

CCR任务调度的原理和策略/// </param>
 83CCR
    

CCR任务调度的原理和策略/// <param name="priority">
 84CCR
    

CCR任务调度的原理和策略/// OS Thread priority to use for threads exexuting CCR tasks
 85CCR
    

CCR任务调度的原理和策略/// </param>
 86CCR
    

CCR任务调度的原理和策略/// <param name="options">
 87CCR
    

CCR任务调度的原理和策略/// Dispatcher scheduling options
 88CCR
    

CCR任务调度的原理和策略/// </param>
 89CCR
    

CCR任务调度的原理和策略/// <param name="threadApartmentState">
 90CCR
    

CCR任务调度的原理和策略/// Thread apartment state. Use ApartmentState.Unknown when STA/MTA is not required for interop
 91CCR
    

CCR任务调度的原理和策略/// 线程单元状态,当COM interop不需要STA/MTA的时候可以使用ApartmentState.Unknown
 92CCR
    

CCR任务调度的原理和策略/// </param>
 93CCR
    

CCR任务调度的原理和策略/// <param name="threadPoolName">
 94CCR
    

CCR任务调度的原理和策略/// Friendly name to use for the OS Threads and this dispatcher instance
 95CCR
    

CCR任务调度的原理和策略/// </param>
 96CCR
    

CCR任务调度的原理和策略public Dispatcher(int threadCount, ThreadPriority priority, DispatcherOptions options,
 97CCR
    

CCR任务调度的原理和策略                    ApartmentState threadApartmentState, string threadPoolName)
 98    this._startupCompleteEvent = new ManualResetEvent(false);
100CCR
    

CCR任务调度的原理和策略    this._dispatcherQueues = new List<DispatcherQueue>();
101CCR
    

CCR任务调度的原理和策略    this._taskExecutionWorkers = new List<TaskExecutionWorker>();
102CCR
    

CCR任务调度的原理和策略    this._nameToQueueTable = new Dictionary<string, DispatcherQueue>();
103CCR
    

CCR任务调度的原理和策略
104CCR
    

CCR任务调度的原理和策略    // 线程数
105CCR
    

CCR任务调度的原理和策略    if (threadCount == 0)
106            threadCount = Math.Max(NumberOfProcessorsInternal, 2* ThreadsPerCpu;
108CCR
    

CCR任务调度的原理和策略    }
109CCR
    

CCR任务调度的原理和策略    else if (threadCount < 0)
110            throw new ArgumentException("Cannot create a negative number of threads. Pass 0 to use default.""threadCount");
112CCR
    

CCR任务调度的原理和策略    }
113CCR
    

CCR任务调度的原理和策略
114CCR
    

CCR任务调度的原理和策略    if (threadPoolName == null)
115            this._name = string.Empty;
117CCR
    

CCR任务调度的原理和策略    }
118CCR
    

CCR任务调度的原理和策略    else
119            this._name = threadPoolName;
121CCR
    

CCR任务调度的原理和策略    }
122CCR
    

CCR任务调度的原理和策略
123CCR
    

CCR任务调度的原理和策略    this._options = options;
124CCR
    

CCR任务调度的原理和策略
125CCR
    

CCR任务调度的原理和策略    for (int i = 0; i < threadCount; i++)
126            this.AddWorker(priority, threadApartmentState);
128CCR
    

CCR任务调度的原理和策略    }
129CCR
    

CCR任务调度的原理和策略
130CCR
    

CCR任务调度的原理和策略    this.StartWorkers();
131CCR
    

CCR任务调度的原理和策略}
132CCR
    

CCR任务调度的原理和策略
133/// Creates one TaskExecutionWorker instance associated with one OS thread
135CCR
    

CCR任务调度的原理和策略/// 创建一个与线程想关联的TaskExecutionWorker实例
136CCR
    

CCR任务调度的原理和策略/// </summary>
137CCR
    

CCR任务调度的原理和策略/// <remarks>
138CCR
    

CCR任务调度的原理和策略/// This routine should only be called once per dispatcher instance
139CCR
    

CCR任务调度的原理和策略/// 每个Dispatcher实例只能调用该函数一次
140CCR
    

CCR任务调度的原理和策略/// </remarks>
141CCR
    

CCR任务调度的原理和策略private void AddWorker(ThreadPriority priority, ApartmentState apartmentState)
142    TaskExecutionWorker item = new TaskExecutionWorker(this);
144CCR
    

CCR任务调度的原理和策略
145CCR
    

CCR任务调度的原理和策略    Thread thread = new Thread(new ThreadStart(item.ExecutionLoop));
146CCR
    

CCR任务调度的原理和策略    thread.SetApartmentState(apartmentState);
147CCR
    

CCR任务调度的原理和策略    thread.Name = this._name;
148CCR
    

CCR任务调度的原理和策略    thread.Priority = priority;
149CCR
    

CCR任务调度的原理和策略    thread.IsBackground = DispatcherOptions.None < (this._options & DispatcherOptions.UseBackgroundThreads);
150CCR
    

CCR任务调度的原理和策略
151CCR
    

CCR任务调度的原理和策略    item._thread = thread;
152CCR
    

CCR任务调度的原理和策略
153CCR
    

CCR任务调度的原理和策略    this._taskExecutionWorkers.Add(item);
154CCR
    

CCR任务调度的原理和策略    this._cachedWorkerListCount++;
155CCR
    

CCR任务调度的原理和策略}

二、Dispose函数
    使用该函数才能够释放Dispatcher内创建的线程池等资源,记得在退出程序之前,调用它,内部的实现细节很简单,详细请参考代码。

}

三、几个重要的枚举
1、 ThreadPriority指定Thread 的调度优先级。

CCR
    

CCR任务调度的原理和策略
public enum ThreadPriority
{
    
// 可以将 System.Threading.Thread 安排在具有任何其他优先级的线程之后。
    Lowest = 0,
    
// 可以将 System.Threading.Thread 安排在具有 Normal 优先级的线程之后,
    
// 在具有 Lowest 优先级的线程之前。
    BelowNormal = 1,
    
// 可以将 System.Threading.Thread 安排在具有 AboveNormal 优先级的线程之后,
    
// 在具有 BelowNormal 优先级的线程之前。默认情况下,线程具有
    
// Normal 优先级。
    Normal = 2,
    
// 可以将 System.Threading.Thread 安排在具有 Highest 优先级的线程之后,
    
// 在具有 Normal 优先级的线程之前。
    AboveNormal = 3,
    
// 可以将 System.Threading.Thread 安排在具有任何其他优先级的线程之前。
    Highest = 4,
}
CCR
    

CCR任务调度的原理和策略

2、DispatcherOptions: 线程池运行选项

CCR
    

CCR任务调度的原理和策略
public enum DispatcherOptions
{
    None,
    
/// <summary>
    
/// 时候后台线程
    
/// </summary>
    UseBackgroundThreads,
    
/// <summary>
    
/// 按照CPU来分配线程
    
/// </summary>
    UseProcessorAffinity
}
CCR
    

CCR任务调度的原理和策略

3、ApartmentState: 指定 System.Threading.Thread 的单元状态

CCR
    

CCR任务调度的原理和策略
public enum ApartmentState
{
    
// System.Threading.Thread 将创建并进入一个单线程单元。
    STA = 0,
    
// System.Threading.Thread 将创建并进入一个多线程单元。
    MTA = 1,
    
// 尚未设置 System.Threading.Thread.ApartmentState 属性。
    Unknown = 2,
}
DispatcherQueue是整个CCR中,唯一能让用户与Dispatcher交互的地方,所有外界想与Dispatcher交互都只能直接或间接通过DispatcherQueu来做,可见DispatcherQueue在整个CCR中处于“咽喉”之地。

二、预览
    DispatcherQueue类内,主要实现了5部分的功能:
1、构造实例对象;

CCR
    

CCR任务调度的原理和策略
public DispatcherQueue();
public DispatcherQueue(string name);
public DispatcherQueue(string name, Dispatcher dispatcher);
public DispatcherQueue(string name, Dispatcher dispatcher, 
            TaskExecutionPolicy policy, 
double schedulingRate);
public DispatcherQueue(string name, Dispatcher dispatcher, 
            TaskExecutionPolicy policy, 
int maximumQueueDepth);
CCR
    

CCR任务调度的原理和策略

2、销毁相关资源;

public void Dispose();
protected virtual void Dispose(bool disposing);

 

3、任务进队、出队操作;

public virtual bool Enqueue(ITask task);
public virtual bool TryDequeue(out ITask task);

4、队列状态控制;

public virtual void Suspend();
public virtual void Resume();

5、因果关系相关控制;

public virtual Timer EnqueueTimer(TimeSpan timeSpan, Port<DateTime> timerPort);


    这5个功能中,常用的是前面4个,因此本基础篇就只讲这4个功能相关的函数的使用和实现原理,最后一个功能不常用,计划与Dispatcher内的因果关系部分放在后面作为高级篇细说。

三、构造函数
    DispatcherQueue类内根据所使用的线程池的种类不同,而分为2类:
1、一类是使用CLR的线程池的构造函数:

CCR
    

CCR任务调度的原理和策略
/// <summary>
/// Default constructor
/// 默认构造函数
/// </summary>
public DispatcherQueue()
    : 
this("Unnamed queue using CLR Threadpool")
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the CLR thread pool for task execution
/// 构建一个不使用CCR的线程池,而是使用CLR线程池执行任务的实例
/// </summary>
/// <param name="name">
/// 名称
/// </param>
public DispatcherQueue(string name)
{
    
this._taskQueue = new Store<ITask>();
    
this._timescale = 1.0;
    
this._timerTable = new Dictionary<long, Timer>();
    
this._name = name;
}
CCR
    

CCR任务调度的原理和策略

2、一类是使用CCR线程池(也即操作系统线程池)的构造函数:

 name, Dispatcher dispatcher)
    : 
this(name, dispatcher, TaskExecutionPolicy.Unconstrained, 01.0)
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">Friendly name</param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// 执行任务的Dispatcher实例
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// 任务调度策略
/// </param>
/// <param name="schedulingRate">
/// Average desired scheduling rate, in tasks per second. 
/// 期望的任务平均调度速率(每秒执行几个任务)
/// Only valid when appropriate policy is specified
/// 仅当指定对应策略的时候才生效
/// </param>
public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, double schedulingRate)
    : 
this(name, dispatcher, policy, 0, schedulingRate)
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">
/// Friendly name
/// </param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// </param>
/// <param name="maximumQueueDepth">
/// Maximum number of pending tasks. 
/// 最大待处理任务数
/// Only valid when appropriate policy is specified
/// 仅当指定对应策略的时候才生效
/// </param>
public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth)
    : 
this(name, dispatcher, policy, maximumQueueDepth, 0.0)
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">
/// Friendly name
/// </param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// </param>
/// <param name="maximumQueueDepth">
/// Maximum number of pending tasks. 
/// Only used when appropriate policy is specified
/// </param>
/// <param name="schedulingRate">
/// Average desired scheduling rate, in tasks per second. 
/// Only used when appropriate policy is specified
/// </param>
private DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy,
                        
int maximumQueueDepth, double schedulingRate)
{
    
// 1.初始化 任务队列、时间刻度、定时器表
    this._taskQueue = new Store<ITask>();
    
this._timescale = 1.0;
    
this._timerTable = new Dictionary<long, Timer>();
    
// 2.初始化 任务调度策略、
    if (dispatcher == null)
    {
        
throw new ArgumentNullException("dispatcher");
    }
    
if (((policy == TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks) ||
        (policy 
== TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution)) &&
        (maximumQueueDepth 
<= 0))
    {
        
throw new ArgumentOutOfRangeException("maximumQueueDepth");
    }
    
if (((policy == TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks) ||
        (policy 
== TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution)) &&
        (schedulingRate 
<= 0.0))
    {
        
throw new ArgumentOutOfRangeException("schedulingRate");
    }
    
this._dispatcher = dispatcher;
    
this._name = name;
    
this._policy = policy;
    
this._maximumQueueDepth = maximumQueueDepth;
    
this._maximumSchedulingRate = schedulingRate;
    
// 3.把DispatcherQueue关联到指定的Dispatcher上
    dispatcher.AddQueue(name, this);
    
// 4.判断是否需要开启CCR秒表
    if (policy >= TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks)
    {
        
this._watch = CcrStopwatch.StartNew();
    }
}

四、资源释放函数
    DispatcherQueue内含任务队列,因此也需要做资源的释放,而且改函数的调用应该在Diapatcher的Dispose函数调用之前,详细原因看代码:

 Dispose()
{
    
this.Dispose(true);
    GC.SuppressFinalize(
this);
}

/// <summary>
/// Implementation of dispose
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
    
// 移除对应Dispatcher与当前DispatcherQueue的关联
    if ((disposing &&
        (
this._dispatcher != null)) &&
        
!this._dispatcher.RemoveQueue(this._name))
    {
        
// 释放任务队列内未处理的任务
        int elementCount = 0;
        
lock (this._taskQueue)
        {
            elementCount 
= this._taskQueue.ElementCount;
            
this._taskQueue = null;
        }
        
// 调整对应Dispatcher内的未处理任务数
        this._dispatcher.AdjustPendingCount(-elementCount);
    }
}

五、任务操作函数
    用户除了可以通过Port向DispatcherQueue推入任务外,还可以自己生产ITask任务,然后把它推入DiapatcherQueue内调度执行。为此DispatcherQueue具备了进队、出队2个功
能。而且前面第三篇所提到的任务调度策略,也是在进队这个函数内实现的。

/// Enqueue ITask instance
CCR
    

CCR任务调度的原理和策略
/// 任务实例进队
CCR
    

CCR任务调度的原理和策略
/// </summary>
CCR
    

CCR任务调度的原理和策略
/// <exception cref="T:Microsoft.Ccr.Core.PortNotFoundException">
CCR
    

CCR任务调度的原理和策略
/// Thrown if message type is not derived from ITask
CCR
    

CCR任务调度的原理和策略
/// 若消息没有实现ITask接口,则抛出异常
CCR
    

CCR任务调度的原理和策略
/// </exception>
CCR
    

CCR任务调度的原理和策略
/// <param name="task">
CCR
    

CCR任务调度的原理和策略
/// ITask instance
CCR
    

CCR任务调度的原理和策略
/// 任务实例
CCR
    

CCR任务调度的原理和策略
/// </param>

CCR
    

CCR任务调度的原理和策略public virtual bool Enqueue(ITask task)
    bool flag = true;
CCR
    

CCR任务调度的原理和策略    
// 1.空消息异常
CCR
    

CCR任务调度的原理和策略
    if (task == null)
    
        throw new ArgumentNullException("message");
CCR
    

CCR任务调度的原理和策略    }

CCR
    

CCR任务调度的原理和策略    task.TaskQueue 
= this;
CCR
    

CCR任务调度的原理和策略    
// 2.CCR线程池
CCR
    

CCR任务调度的原理和策略
    if (this._dispatcher == null)
    
        lock (this._taskQueue)
        
            this._scheduledTaskCount += 1L;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略        ThreadPool.QueueUserWorkItem(
new WaitCallback(TaskExecutionWorker.ThreadPoolExecute), task);
CCR
    

CCR任务调度的原理和策略        
return flag;
CCR
    

CCR任务调度的原理和策略    }

CCR
    

CCR任务调度的原理和策略    
// 3.任务进队列
CCR
    

CCR任务调度的原理和策略
    if (this._taskQueue == null)
    
        throw new ObjectDisposedException(typeof(DispatcherQueue).Name + ":" + this.Name);
CCR
    

CCR任务调度的原理和策略    }

CCR
    

CCR任务调度的原理和策略    
lock (this._taskQueue)
    
        // 4.根据不同的任务调度策略,来把任务插入队列不同的位置
CCR
    

CCR任务调度的原理和策略
        int num;
CCR
    

CCR任务调度的原理和策略        
switch (this._policy)
        
            case TaskExecutionPolicy.Unconstrained:
CCR
    

CCR任务调度的原理和策略                
this._taskQueue.ElementListAddLast(new PortElement<ITask>(task));
CCR
    

CCR任务调度的原理和策略                
goto Label_0285;
CCR
    

CCR任务调度的原理和策略
CCR
    

CCR任务调度的原理和策略            
case TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks:
CCR
    

CCR任务调度的原理和策略                
if (this._taskQueue.ElementCount >= this._maximumQueueDepth)
                
                    Dispatcher.LogInfo(
CCR
    

CCR任务调度的原理和策略                        
"DispatcherQueue.Enqueue: Discarding oldest task because queue depth limit reached");
CCR
    

CCR任务调度的原理和策略                    
this._taskQueue.ElementListRemoveFirst();
CCR
    

CCR任务调度的原理和策略                    Interlocked.Decrement(
ref this._dispatcher._pendingTaskCount);
CCR
    

CCR任务调度的原理和策略                    flag 
= false;
CCR
    

CCR任务调度的原理和策略                }

CCR
    

CCR任务调度的原理和策略                
this._taskQueue.ElementListAddLast(new PortElement<ITask>(task));
CCR
    

CCR任务调度的原理和策略                
goto Label_0285;
CCR
    

CCR任务调度的原理和策略
CCR
    

CCR任务调度的原理和策略            
case TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution:
CCR
    

CCR任务调度的原理和策略                
if (this._taskQueue.ElementCount < this._maximumQueueDepth)
                
                    goto Label_0172;
CCR
    

CCR任务调度的原理和策略                }

CCR
    

CCR任务调度的原理和策略                Microsoft.Ccr.Core.Dispatcher.LogInfo(
CCR
    

CCR任务调度的原理和策略                    
"DispatcherQueue.Enqueue: Forcing thread sleep because queue depth limit reached");
CCR
    

CCR任务调度的原理和策略                
goto Label_015F;
CCR
    

CCR任务调度的原理和策略
CCR
    

CCR任务调度的原理和策略            
case TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks:
CCR
    

CCR任务调度的原理和策略                
this.RecalculateSchedulingRate();
CCR
    

CCR任务调度的原理和策略                
if (this._currentSchedulingRate >= this._maximumSchedulingRate)
                
                    Microsoft.Ccr.Core.Dispatcher.LogInfo(
CCR
    

CCR任务调度的原理和策略                        
"DispatcherQueue.Enqueue: Discarding task because task scheduling rate exceeded");
CCR
    

CCR任务调度的原理和策略                    
this._taskQueue.ElementListRemoveFirst();
CCR
    

CCR任务调度的原理和策略                    Interlocked.Decrement(
ref this._dispatcher._pendingTaskCount);
CCR
    

CCR任务调度的原理和策略                    
this.RecalculateSchedulingRate();
CCR
    

CCR任务调度的原理和策略                    flag 
= false;
CCR
    

CCR任务调度的原理和策略                }

CCR
    

CCR任务调度的原理和策略                
this._scheduledItems++;
CCR
    

CCR任务调度的原理和策略                
this._taskQueue.ElementListAddLast(new PortElement<ITask>(task));
CCR
    

CCR任务调度的原理和策略                
goto Label_0285;
CCR
    

CCR任务调度的原理和策略
CCR
    

CCR任务调度的原理和策略            
case TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution:
CCR
    

CCR任务调度的原理和策略                
this.RecalculateSchedulingRate();
CCR
    

CCR任务调度的原理和策略                
if (this._currentSchedulingRate < this._maximumSchedulingRate)
                
                    goto Label_025E;
CCR
    

CCR任务调度的原理和策略                }

CCR
    

CCR任务调度的原理和策略                num 
= 1;
CCR
    

CCR任务调度的原理和策略                Microsoft.Ccr.Core.Dispatcher.LogInfo(
CCR
    

CCR任务调度的原理和策略                    
"DispatcherQueue.Enqueue: Forcing thread sleep because task scheduling rate exceeded");
CCR
    

CCR任务调度的原理和策略                
goto Label_0250;
CCR
    

CCR任务调度的原理和策略
CCR
    

CCR任务调度的原理和策略            
default:
CCR
    

CCR任务调度的原理和策略                
goto Label_0285;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略    Label_0142:
CCR
    

CCR任务调度的原理和策略        Monitor.Exit(
this._taskQueue);
CCR
    

CCR任务调度的原理和策略        Thread.Sleep(
100);
CCR
    

CCR任务调度的原理和策略        Monitor.Enter(
this._taskQueue);
CCR
    

CCR任务调度的原理和策略    Label_015F:
CCR
    

CCR任务调度的原理和策略        
if (this._taskQueue.ElementCount >= this._maximumQueueDepth)
        
            goto Label_0142;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略    Label_0172:
CCR
    

CCR任务调度的原理和策略        
this._taskQueue.ElementListAddLast(new PortElement<ITask>(task));
CCR
    

CCR任务调度的原理和策略        
goto Label_0285;
CCR
    

CCR任务调度的原理和策略    Label_0219:
CCR
    

CCR任务调度的原理和策略        Monitor.Exit(
this._taskQueue);
CCR
    

CCR任务调度的原理和策略        Thread.Sleep(
100 + num);
CCR
    

CCR任务调度的原理和策略        num 
*= 2;
CCR
    

CCR任务调度的原理和策略        
if (num > 0x3e8)
        
            num = 0x3e8;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略        Monitor.Enter(
this._taskQueue);
CCR
    

CCR任务调度的原理和策略        
this.RecalculateSchedulingRate();
CCR
    

CCR任务调度的原理和策略    Label_0250:
CCR
    

CCR任务调度的原理和策略        
if (this._currentSchedulingRate > this._maximumSchedulingRate)
        
            goto Label_0219;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略    Label_025E:
CCR
    

CCR任务调度的原理和策略        
this._scheduledItems++;
CCR
    

CCR任务调度的原理和策略        
this._taskQueue.ElementListAddLast(new PortElement<ITask>(task));
CCR
    

CCR任务调度的原理和策略    Label_0285:
CCR
    

CCR任务调度的原理和策略        
this._scheduledTaskCount += 1L;
CCR
    

CCR任务调度的原理和策略    }

CCR
    

CCR任务调度的原理和策略    
this._dispatcher.Signal();
CCR
    

CCR任务调度的原理和策略    
return flag;
CCR
    

CCR任务调度的原理和策略}

CCR
    

CCR任务调度的原理和策略
CCR
    

CCR任务调度的原理和策略
// 重新计算调度速率
CCR
    

CCR任务调度的原理和策略
private void RecalculateSchedulingRate()
    this._currentSchedulingRate = this._scheduledItems / this._watch.Elapsed.TotalSeconds;
CCR
    

CCR任务调度的原理和策略}

CCR
    

CCR任务调度的原理和策略
/// Atomically removes an ITask instance if the port is non empty
CCR
    

CCR任务调度的原理和策略
/// 若当前待执行任务队列不空,则原子地取走一个任务
CCR
    

CCR任务调度的原理和策略
/// </summary>
CCR
    

CCR任务调度的原理和策略
/// <param name="task">
CCR
    

CCR任务调度的原理和策略
/// ITask instance if port is not empty. Null otherwise
CCR
    

CCR任务调度的原理和策略
/// 任务实例
CCR
    

CCR任务调度的原理和策略
/// </param>
CCR
    

CCR任务调度的原理和策略
/// <returns>
CCR
    

CCR任务调度的原理和策略
/// True if port is not empty
CCR
    

CCR任务调度的原理和策略
/// 若队列不空则返回True
CCR
    

CCR任务调度的原理和策略
/// </returns>
CCR
    

CCR任务调度的原理和策略public virtual bool TryDequeue(out ITask task)
    // 1.异常判断
CCR
    

CCR任务调度的原理和策略
    if (this._dispatcher == null)
    
        // 不支持CCR线程池
CCR
    

CCR任务调度的原理和策略
        throw new InvalidOperationException(Resource1.DispatcherPortTestNotValidInThreadpoolMode);
CCR
    

CCR任务调度的原理和策略    }

CCR
    

CCR任务调度的原理和策略    
if (this._taskQueue == null)
    
        // 任务队列已经被释放了
CCR
    

CCR任务调度的原理和策略
        throw new ObjectDisposedException(typeof(DispatcherQueue).Name + ":" + this.Name);
CCR
    

CCR任务调度的原理和策略    }

CCR
    

CCR任务调度的原理和策略    
// 2.从任务队列的头部取一个任务
CCR
    

CCR任务调度的原理和策略
    lock (this._taskQueue)
    
        // 判断是否挂起
CCR
    

CCR任务调度的原理和策略
        if (this._isSuspended)
        
            task = null;
CCR
    

CCR任务调度的原理和策略            
return false;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略        
if (this._taskQueue.ElementCount > 0)
        
            task = this._taskQueue.ElementListRemoveFirst().TypedItem;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略        
else
        
            task = null;
CCR
    

CCR任务调度的原理和策略            
return false;
CCR
    

CCR任务调度的原理和策略        }

CCR
    

CCR任务调度的原理和策略    }

CCR
    

CCR任务调度的原理和策略    Interlocked.Decrement(
ref this._dispatcher._pendingTaskCount);
CCR
    

CCR任务调度的原理和策略    
return true;
CCR
    

CCR任务调度的原理和策略}

六、运行状态控制函数
    DispatcherQueue提供了挂起、恢复的操作,以调度线程池对任务的运行,不过要注意的是,挂起状态下,用户仍然可以向DispatcherQueue提交任务。

 Suspend()
{
    
lock (this._taskQueue)
    {
        
this._isSuspended = true;
    }
}

/// <summary>
/// Resumes execution of tasks, including any tasks queued while in paused state
/// 恢复任务的执行,包含所有处于暂停状态的排队任务
/// </summary>
public virtual void Resume()
{
    
lock (this._taskQueue)
    {
        
this._isSuspended = false;
    }
    
this._dispatcher.Signal();
}

相关文章: