异步的同步构造
任何使用了内核模式的线程同步构造,我都不是特别喜欢。因为所有这些基元都会阻塞一个线程的运行。创建线程的代价很大。创建了不用,这于情于理说不通。
创建了reader-writer锁的情况,如果写锁被长时间占有,那么其他的读请求线程都会被阻塞,随着越来越多客户端请求到达,服务器创建了更多的线程,而他们被创建出来的目的就是让他们在锁上停止运行。更糟糕的是,一旦writer锁释放,所有读线程都同时解除阻塞并开始执行。现在,又变成大量的线程试图在相对数量很少的cpu上运行。所以,windows开始在线程之间不同的进行上下文切换,而真正的工作时间却很少。
锁很流行,但长时间拥有会带来巨大的伸缩性问题。如果代码能通过异步的同步构造指出他想要一个锁,那么会非常有用。在这种情况下,如果线程得不到锁,可直接返回并执行其他工作,而不必在那里傻傻地阻塞。
SemaphoreSlim通过waitAsync实现了这个思路
public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken);
使用await asynclock.WaitAsync()就可以实现刚才说的情境。
但如果是reader-writer呢?.net framework提供了concurrentExclusiveSchedulerPair类。实例代码如下:
private static void ConcurrentExclusiveSchedulerDemo() { var cesp = new ConcurrentExclusiveSchedulerPair(); var tfExclusive = new TaskFactory(cesp.ExclusiveScheduler); var tfConcurrent = new TaskFactory(cesp.ConcurrentScheduler); for (int i = 0; i < 5; i++) { var exclusive = i < 2; (exclusive ? tfExclusive : tfConcurrent).StartNew(() => { Console.WriteLine("{0} access",exclusive?"exclusive":"concurrent"); //这里进行独占写入或者并发读取操作 }); } }
遗憾的是,framework没有提供鞠咏reader-writer语义的异步锁。所以我们可以自己构建一个,如下:
public sealed class AsyncOneManyLock { #region 锁的代码 //自旋锁不要用readonly private SpinLock m_lock = new SpinLock(true); private void Lock() { bool taken = false;m_lock.Enter(ref taken); } private void Unlock() { m_lock.Exit(); } #endregion #region 锁的状态和辅助方法 private Int32 m_state = 0; private bool IsFree { get { return m_state == 0; } } private bool IsOwnedByWriter { get { return m_state == -1; } } private bool IsOwnedByReader { get { return m_state > 0; } } private Int32 AddReaders(Int32 count) { return m_state += count; } private Int32 SubtractReader() { return --m_state; } private void MakeWriter() { m_state = -1; } private void MakeFree() { m_state = 0; } #endregion //目的实在非竞态条件时增强性能和减少内存消耗 private readonly Task m_noContentionAccessGranter; //每个等待的writer都通过他们在这里排队的TaskCompletionSource来唤醒 private readonly Queue<TaskCompletionSource<Object>> m_qWaitingWriters = new Queue<TaskCompletionSource<object>>(); //一个TaskCompletionSource收到信号,所有等待的reader都唤醒 private TaskCompletionSource<Object> m_waitingReaderSignal = new TaskCompletionSource<object>(); private Int32 m_numWaitingReaders = 0; public AsyncOneManyLock() { //创建一个返回null的任务 m_noContentionAccessGranter = Task.FromResult<Object>(null); } public Task WaitAsync(OneManyMode mode) { Task accressGranter = m_noContentionAccessGranter;//假定无竞争 Lock () ; switch (mode) { case OneManyMode.Exclusive: if (IsFree) { MakeWriter();//无竞争 } else { //有竞争 var tcs = new TaskCompletionSource<Object>(); m_qWaitingWriters.Enqueue(tcs); accressGranter = tcs.Task; } break; case OneManyMode.Shared: if (IsFree||(IsOwnedByReader&&m_qWaitingWriters.Count==0)) { AddReaders(1);//无竞争 } else { //有竞争,递增等待的reader数量,并返回reader任务使reader等待。 m_numWaitingReaders++; accressGranter = m_waitingReaderSignal.Task.ContinueWith(t => t.Result); } break; } Unlock(); return accressGranter; } public void Release() { //嘉定没有代码被释放 TaskCompletionSource<Object> accessGranter = null; Lock () ; if (IsOwnedByWriter) { MakeFree(); } else { SubtractReader(); } if (IsFree) { //如果自由,唤醒一个等待的writer或所有等待的readers if (m_qWaitingWriters.Count>0) { MakeWriter(); accessGranter = m_qWaitingWriters.Dequeue(); } else if (m_numWaitingReaders>0) { AddReaders(m_numWaitingReaders); m_numWaitingReaders = 0; accessGranter = m_waitingReaderSignal; //为将来需要等待的readers创建一个新的tcs m_waitingReaderSignal = new TaskCompletionSource<object>(); } } Unlock(); //唤醒锁外面的writer/reader,减少竞争几率以提高性能 if (accessGranter!=null) { accessGranter.SetResult(null); } } }