异步的同步构造

  任何使用了内核模式的线程同步构造,我都不是特别喜欢。因为所有这些基元都会阻塞一个线程的运行。创建线程的代价很大。创建了不用,这于情于理说不通。

  创建了reader-writer锁的情况,如果写锁被长时间占有,那么其他的读请求线程都会被阻塞,随着越来越多客户端请求到达,服务器创建了更多的线程,而他们被创建出来的目的就是让他们在锁上停止运行。更糟糕的是,一旦writer锁释放,所有读线程都同时解除阻塞并开始执行。现在,又变成大量的线程试图在相对数量很少的cpu上运行。所以,windows开始在线程之间不同的进行上下文切换,而真正的工作时间却很少。

       锁很流行,但长时间拥有会带来巨大的伸缩性问题。如果代码能通过异步的同步构造指出他想要一个锁,那么会非常有用。在这种情况下,如果线程得不到锁,可直接返回并执行其他工作,而不必在那里傻傻地阻塞。

       SemaphoreSlim通过waitAsync实现了这个思路

public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken);

使用await asynclock.WaitAsync()就可以实现刚才说的情境。

       但如果是reader-writer呢?.net framework提供了concurrentExclusiveSchedulerPair类。实例代码如下:

private static void ConcurrentExclusiveSchedulerDemo()
{
    var cesp = new ConcurrentExclusiveSchedulerPair();
    var tfExclusive = new TaskFactory(cesp.ExclusiveScheduler);
    var tfConcurrent = new TaskFactory(cesp.ConcurrentScheduler);

    for (int i = 0; i < 5; i++)
    {
        var exclusive = i < 2;
        (exclusive ? tfExclusive : tfConcurrent).StartNew(() =>
        {
            Console.WriteLine("{0} access",exclusive?"exclusive":"concurrent");
            //这里进行独占写入或者并发读取操作
        });
    }
}

       遗憾的是,framework没有提供鞠咏reader-writer语义的异步锁。所以我们可以自己构建一个,如下:

public sealed class AsyncOneManyLock
{
    #region 锁的代码
    //自旋锁不要用readonly
    private SpinLock m_lock = new SpinLock(true);

    private void Lock()
    {
        bool taken = false;m_lock.Enter(ref taken);
    }
    private void Unlock()
    {
        m_lock.Exit();
    }

    #endregion

    #region 锁的状态和辅助方法

    private Int32 m_state = 0;
    private bool IsFree { get { return m_state == 0; } }
    private bool IsOwnedByWriter { get { return m_state == -1; } }
    private bool IsOwnedByReader { get { return m_state > 0; } }
    private Int32 AddReaders(Int32 count) { return m_state += count; }
    private Int32 SubtractReader() { return --m_state; }
    private void MakeWriter() { m_state = -1; }
    private void MakeFree() { m_state = 0; }

    #endregion

    //目的实在非竞态条件时增强性能和减少内存消耗
    private readonly Task m_noContentionAccessGranter;
    //每个等待的writer都通过他们在这里排队的TaskCompletionSource来唤醒
    private readonly Queue<TaskCompletionSource<Object>> m_qWaitingWriters = new Queue<TaskCompletionSource<object>>();
    //一个TaskCompletionSource收到信号,所有等待的reader都唤醒
    private TaskCompletionSource<Object> m_waitingReaderSignal = new TaskCompletionSource<object>();
    private Int32 m_numWaitingReaders = 0;
    public AsyncOneManyLock()
    {
        //创建一个返回null的任务
        m_noContentionAccessGranter = Task.FromResult<Object>(null);
    }
    public Task WaitAsync(OneManyMode mode)
    {
        Task accressGranter = m_noContentionAccessGranter;//假定无竞争
        Lock () ;
        switch (mode)
        {
            case OneManyMode.Exclusive:
                if (IsFree)
                {
                    MakeWriter();//无竞争
                }
                else
                {
                    //有竞争
                    var tcs = new TaskCompletionSource<Object>();
                    m_qWaitingWriters.Enqueue(tcs);
                    accressGranter = tcs.Task;
                }
                break;
            case OneManyMode.Shared:
                if (IsFree||(IsOwnedByReader&&m_qWaitingWriters.Count==0))
                {
                    AddReaders(1);//无竞争
                }
                else
                {
                    //有竞争,递增等待的reader数量,并返回reader任务使reader等待。
                    m_numWaitingReaders++;
                    accressGranter = m_waitingReaderSignal.Task.ContinueWith(t => t.Result);
                }
                break;
        }
        Unlock();
        return accressGranter;
    }

    public void Release()
    {
        //嘉定没有代码被释放
        TaskCompletionSource<Object> accessGranter = null;
        Lock () ;
        if (IsOwnedByWriter)
        {
            MakeFree();
        }
        else
        {
            SubtractReader();
        }
        if (IsFree)
        {
            //如果自由,唤醒一个等待的writer或所有等待的readers
            if (m_qWaitingWriters.Count>0)
            {
                MakeWriter();
                accessGranter = m_qWaitingWriters.Dequeue();
            }
            else if (m_numWaitingReaders>0)
            {
                AddReaders(m_numWaitingReaders);
                m_numWaitingReaders = 0;
                accessGranter = m_waitingReaderSignal;
                //为将来需要等待的readers创建一个新的tcs
                m_waitingReaderSignal = new TaskCompletionSource<object>();
            }
        }
        Unlock();
        //唤醒锁外面的writer/reader,减少竞争几率以提高性能
        if (accessGranter!=null)
        {
            accessGranter.SetResult(null);
        }
    }
}
AsyncOneManyLock

相关文章: