之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。
首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。
现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。以下详细讲解了数据库的交易同步锁和交易重试补偿锁的实现。
一、数据库的设计
数据库锁表的表结构如下:
| field | type | comment |
| ID | bigint | 主键 |
| OUTER_SERIAL_NO | varchar | 流水号 |
| CUST_NO | char | 客户号 |
| SOURCE_CODE | varchar | 锁操作 |
| THREAD_NO | varchar | 线程号 |
| STATUS | char | 锁状态 |
| REMARK | varchar | 备注 |
| CREATED_AT | timestamp | 创建时间 |
| UPDATED_AT | timestamp | 更新时间 |
作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性
流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)
客户号:客户的唯一标识
锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作
锁状态:P处理中,F失败,Y成功
二、代码设计
代码的目录结构如下:
主要贴一下锁操作的核心代码实现:
锁接口定义:DbLockManager.java
1 /** 2 * 锁接口 <br> 3 * 4 * @Author fugaoyang 5 * 6 */ 7 public interface DbLockManager { 8 9 /** 10 * 加锁 11 */ 12 boolean lock(String outerSerialNo, String custNo, LockSource source); 13 14 /** 15 * 解锁 16 */ 17 void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus); 18 19 }
锁接口实现类:DbLockManagerImpl.java
1 /** 2 * 3 * 数据库锁实现<br> 4 * 5 * @author fugaoyang 6 * 7 */ 8 @Service 9 public class DbLockManagerImpl implements DbLockManager { 10 11 private final Logger LOG = LoggerFactory.getLogger(this.getClass()); 12 13 @Autowired 14 private DbSyncLockMapper lockMapper; 15 16 @Transactional 17 public boolean lock(String outerSerialNo, String custNo, LockSource source) { 18 19 boolean isLock = false; 20 TradeSyncLock lock = null; 21 try { 22 lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); 23 24 if (null == lock) { 25 lock = new TradeSyncLock(); 26 createLock(lock, outerSerialNo, custNo, source); 27 28 int num = lockMapper.insert(lock); 29 if (num == 1) { 30 isLock = true; 31 } 32 33 LOG.info(ThreadLogUtils.getLogPrefix() + "加入锁,客户号[{}],锁类型[{}]", custNo, source.getCode()); 34 return isLock; 35 } 36 37 // 根据交易类型进行加锁 38 isLock = switchSynsLock(lock, source); 39 LOG.info(ThreadLogUtils.getLogPrefix() + "更新锁,客户号[{}],锁类型[{}]", custNo, source.getCode()); 40 41 } catch (Exception e) { 42 LOG.error(ThreadLogUtils.getLogPrefix() + "交易加锁异常, 客户号:" + custNo, e); 43 } 44 return isLock; 45 } 46 47 @Transactional 48 public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) { 49 50 try { 51 TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); 52 53 if (null != lock) { 54 lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(), 55 ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid()); 56 } 57 58 LOG.info(ThreadLogUtils.getLogPrefix() + "释放锁,客户号[{}],锁类型[{}]", custNo, source.getCode()); 59 } catch (Exception e) { 60 LOG.error(ThreadLogUtils.getLogPrefix() + "释放锁异常, 客户号:{}", custNo, e); 61 } 62 } 63 64 /** 65 * 匹配加锁 66 */ 67 private boolean switchSynsLock(TradeSyncLock lock, LockSource source) { 68 boolean isLock = false; 69 70 switch (source) { 71 case WITHDRAW: 72 ; 73 isLock = tradeSynsLock(lock); 74 break; 75 case WITHDRAW_RETRY: 76 ; 77 isLock = retrySynsLock(lock); 78 break; 79 default: 80 ; 81 } 82 return isLock; 83 } 84 85 /** 86 * 交易同步锁 87 */ 88 private boolean tradeSynsLock(TradeSyncLock lock) { 89 // 处理中的不加锁,即不执行交易操作 90 if (LockStatus.P.getName().equals(lock.getStatus())) { 91 return false; 92 } 93 94 int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(), 95 ThreadLogUtils.getCurrThreadUuid(), null); 96 if (num == 1) { 97 return true; 98 } 99 return false; 100 } 101 102 /** 103 * 补偿同步锁 104 */ 105 private boolean retrySynsLock(TradeSyncLock lock) { 106 // 处理中或处理完成的不加锁,即不执行补偿操作 107 if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) { 108 return false; 109 } 110 111 int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(), 112 ThreadLogUtils.getCurrThreadUuid(), null); 113 if (num == 1) { 114 return true; 115 } 116 return false; 117 } 118 119 private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) { 120 lock.setOuterSerialNo(outerSerialNo); 121 lock.setCustNo(custNo); 122 lock.setSourceCode(source.getCode()); 123 lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid()); 124 lock.setStatus(LockStatus.P.getName()); 125 lock.setRemark(source.getDesc()); 126 } 127 128 }
获取当前线程号以及打印uuid工具类ThreadLogUtils.Java
1 /** 2 * 3 * 线程处理<br> 4 * @author fugaoyang 5 * 6 */ 7 8 public class ThreadLogUtils { 9 10 private static ThreadLogUtils instance = null; 11 12 private ThreadLogUtils() { 13 setInstance(this); 14 } 15 16 // 初始化标志 17 private static final Object __noop = new Object(); 18 private static ThreadLocal<Object> __flag = new InheritableThreadLocal<Object>() { 19 @Override 20 protected Object initialValue() { 21 return null; 22 } 23 }; 24 25 26 // 当前线程的UUID信息,主要用于打印日志; 27 private static ThreadLocal<String> currLogUuid = new InheritableThreadLocal<String>() { 28 @Override 29 protected String initialValue() { 30 return UUID.randomUUID().toString()/* .toUpperCase() */; 31 } 32 }; 33 34 private static ThreadLocal<String> currThreadUuid = new ThreadLocal<String>() { 35 @Override 36 protected String initialValue() { 37 return UUIDGenerator.getUuid(); 38 } 39 }; 40 41 42 public static void clear(Boolean isNew) { 43 if (isNew) { 44 45 currLogUuid.remove(); 46 47 __flag.remove(); 48 49 currThreadUuid.remove(); 50 51 } 52 } 53 54 public static String getCurrLogUuid() { 55 if (!isInitialized()) { 56 throw new IllegalStateException("TLS未初始化"); 57 } 58 59 return currLogUuid.get(); 60 } 61 62 public static String getCurrThreadUuid() { 63 return currThreadUuid.get(); 64 } 65 66 public static void clearCurrThreadUuid() { 67 currThreadUuid.remove(); 68 } 69 70 public static String getLogPrefix() { 71 if (!isInitialized()) { 72 return ""; 73 } 74 75 return "<uuid=" + getCurrLogUuid() + ">"; 76 } 77 78 79 private static boolean isInitialized() { 80 return __flag.get() != null; 81 } 82 83 /** 84 * 初始化上下文,如果已经初始化则返回false,否则返回true<br/> 85 * 86 * @return 87 */ 88 public static boolean initialize() { 89 if (isInitialized()) { 90 return false; 91 } 92 93 __flag.set(__noop); 94 return true; 95 } 96 97 private static void setInstance(ThreadLogUtils instance) { 98 ThreadLogUtils.instance = instance; 99 } 100 101 public static ThreadLogUtils getInstance() { 102 return instance; 103 } 104 105 }
两种锁的实现的大致思路如下:
1.交易同步锁
当一个客户来取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;
当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;
当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;
2.交易重试补偿锁
1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;
2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。
上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。
后续可以优化的部分:
1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。
2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。
3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。
因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~
完整示例代码后续会更新到github。