之前知道可以通过redis的setnx来进行加锁,但是,获取锁的顺序是没有保障的。刚好之前有写过一个AQS的博客,所以基于那个队列的原理写一个非公平分布式锁。
主要原理是,每个想获取锁的节点需要通过以下步骤(若获取锁user):
1.拿到***(user:seq)
2.入等待队列(队列为user:list)。
3.轮询,若队列头部为自己,则去获取锁。
主要接口为以下,使用方法与ReentreenLock类似,但是需要传入加锁的key:
public class TestLock {
@Autowired
private FenbushiLock lock;//分布式锁
public void get() throws InterruptedException{
lock.lock("user");
}
public void unlock() throws InterruptedException{
lock.unlock("user");
}
}
锁内部实现代码为以下:
public void lock(String key) throws InterruptedException {
// Boolean result = redisUtil.setNx(key);
final String listKey = key+":list";//该锁配合使用的队列
final String seqKey = key+":seq";//该锁配合使用的唯一序列。
redisTemplate.watch(seqKey);//监控序列key,防止被人改变。
Object key2 = redisTemplate.boundValueOps(seqKey).get();
if(key2==null){//若为空,说明队列中无数据。
key2 = 0;
}
long trySeq = Long.parseLong(key2.toString())+1;//试图设置序列为当前序列+1
boolean same = false;
while(!same){
redisTemplate.multi();//开始事务,这个事务可以保证,拿到的***马上和如队列排队的号为一致的,防止拿到了序列3,但是因为非原子性,导致排在了序列4的后面。
redisTemplate.boundValueOps(seqKey).increment(1);//序列加1
redisTemplate.boundListOps(listKey).rightPush(trySeq);//将序列入队
List<Object> l= redisTemplate.exec();
if(l.size()!=0){//说明执行成功
break;//当break时,说明成功拿到序号,并入队列等待。
}else{
redisTemplate.watch(seqKey);//否则说明已经有人先拿了序列,并且入队。
trySeq = (Long) redisTemplate.boundValueOps(seqKey).get()+1;//重新试图设置序列为当前序列+1。
}
}
long top = redisUtil.listGet(listKey);//当前排队的最前面的节点。
for(;;){
if(top!=trySeq){//如果最前面的节点不是当前节点
try {
Thread.sleep(1);//sleep一下,然后继续循环
} catch (InterruptedException e) {
e.printStackTrace();
}
top = redisUtil.listGet(listKey);//重新获取最前面的节点。
}else{//如果当前节点已经是最top的节点,则试着setnx拿锁。
boolean result = redisUtil.setNx(key);
if(result){//如果拿锁成功,则打印日志。
System.out.println("序号"+trySeq+"获取到锁:");
return ;
}
}
}
}
public void unlock(String key) {
redisUtil.listLPop(key+":list");//释放锁时,将最top元素pop出队列。
redisUtil.removeKey(key);//并移除锁。
}
test函数如下:
@SpringBootApplication
@EnableTransactionManagement
public class ServerStart {
public static void main(String args[]){
ApplicationContext a =SpringApplication.run(ServerStart.class, args);
final TestLock t = (TestLock) a.getBean(TestLock.class);
for(int i=0;i<100;i++){//100个线程获取锁,按顺序获取和释放。
new Thread(new Runnable() {
public void run() {
try {
t.get();
t.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
结果(此为单机结果,可以多节点使用):