通过CAS操作免锁设计:

  • CAS原子 操作(Compare & Set):包含三个操作数,内存值V、旧的预期值 oldval、要修改的新值newval,当且仅当内存V中的值和旧值oldval相同时,将内存V修改为newval。
  • 数组队列是一个循环数组,队列少用一个元素,当头等于尾标示队空,尾加1等于头标示队满。
  • 数组的元素用EMPTY(无数据,标示可以入队)和FULL(有数据,标示可以出队)标记指示,数组一开始全部初始化成 EMPTY标示空队列。
  • EnQue 操作:如果当前队尾位置为EMPTY,标示线程可以在当前位置入队,通过CAS原子操作把该位置设置为FULL,避免其它线程操作这个位置,操作完后修改队尾位置。各个线程竞争新的队尾位置。如下图所示:

无锁队列的实现-循环数组

  1. 线程T1/T2竞争队尾位置。
  2. T1竞争成功,首先设置FULL标记,然后对该位置进行操作。
  3. T2轮询该位置标识为FULL继续轮询。
  4. T1操作完成后将队尾位置后移。
  5. T1/T2又开始竞争新的队尾。
  • DeQue 操作:如果当前队头位置为FULL,标示线程可以在当前位置出队,通过CAS原子操作把该位置设置为EMPTY,避免其它线程操作这个位置,操作完后修改队头位置。各个线程竞争新的队头位置。
  • 操作没有加锁,每个线程都假设没有冲突的去完成操作,如果因为冲突失败就重试。
#include "stdlib.h"
#include "stdio.h"
#include <pthread.h>
#define MAXLEN 2
#define CAS __sync_bool_compare_and_swap

typedef struct
{
    int elem;
    int status;//用于状态监测
}node;

typedef struct
{
    node elePool[MAXLEN];
    int front;
    int rear;
}queue;

enum
{
    EMPTY =1,
    FULL,
};

queue g_que;

void initQue()
{
    int i = 0;
    g_que.front = 0;
    g_que.rear  = 0;
    
    for(i=0;i<MAXLEN;i++)
    {
        g_que.elePool[i].status = EMPTY;
    }
    return;
}

int enque(int elem)
{
    do
    {
        if((g_que.rear+1)%MAXLEN == g_que.front)
        {
            return -1;
        }
    }while(!CAS(&(g_que.elePool[g_que.rear].status),EMPTY,FULL));
    g_que.elePool[g_que.rear].elem = elem;
    printf("in--%d(%lu)\n",elem,pthread_self());
    CAS(&(g_que.rear),g_que.rear,(g_que.rear+1)%MAXLEN);
    
    return 0;
}

int deque(int* pElem)
{
    do
    {
        if(g_que.rear == g_que.front)
        {
            return -1;
        }
    }while(!CAS(&(g_que.elePool[g_que.front].status),FULL,EMPTY));
    *pElem = g_que.elePool[g_que.front].elem;
    printf("out--%d(%lu)\n",*pElem,pthread_self());
    CAS(&(g_que.front),g_que.front,(g_que.front+1)%MAXLEN);
    return 0;
}
View Code

相关文章: