garryfu

之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。

首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。

现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。以下详细讲解了数据库的交易同步锁和交易重试补偿锁的实现。

一、数据库的设计

数据库锁表的表结构如下:

 
field type comment
ID bigint 主键
OUTER_SERIAL_NO varchar 流水号
CUST_NO char 客户号
SOURCE_CODE varchar 锁操作
THREAD_NO varchar 线程号
STATUS char 锁状态
REMARK varchar 备注
CREATED_AT timestamp 创建时间
UPDATED_AT timestamp 更新时间

作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性

流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)

客户号:客户的唯一标识

锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作

锁状态:P处理中,F失败,Y成功

二、代码设计

代码的目录结构如下: 

主要贴一下锁操作的核心代码实现:

锁接口定义:DbLockManager.java

 1 /**
 2  * 锁接口 <br>
 3  * 
 4  * @Author fugaoyang
 5  *
 6  */
 7 public interface DbLockManager {
 8 
 9     /**
10      * 加锁
11      */
12     boolean lock(String outerSerialNo, String custNo, LockSource source);
13 
14     /**
15      * 解锁
16      */
17     void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus);
18 
19 }
View Code

锁接口实现类:DbLockManagerImpl.java

  1 /**
  2  * 
  3  * 数据库锁实现<br>
  4  * 
  5  * @author fugaoyang
  6  *
  7  */
  8 @Service
  9 public class DbLockManagerImpl implements DbLockManager {
 10 
 11     private final Logger LOG = LoggerFactory.getLogger(this.getClass());
 12 
 13     @Autowired
 14     private DbSyncLockMapper lockMapper;
 15 
 16     @Transactional
 17     public boolean lock(String outerSerialNo, String custNo, LockSource source) {
 18 
 19         boolean isLock = false;
 20         TradeSyncLock lock = null;
 21         try {
 22             lock = lockMapper.find(outerSerialNo, custNo, source.getCode());
 23 
 24             if (null == lock) {
 25                 lock = new TradeSyncLock();
 26                 createLock(lock, outerSerialNo, custNo, source);
 27 
 28                 int num = lockMapper.insert(lock);
 29                 if (num == 1) {
 30                     isLock = true;
 31                 }
 32 
 33                 LOG.info(ThreadLogUtils.getLogPrefix() + "加入锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
 34                 return isLock;
 35             }
 36 
 37             // 根据交易类型进行加锁
 38             isLock = switchSynsLock(lock, source);
 39             LOG.info(ThreadLogUtils.getLogPrefix() + "更新锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
 40 
 41         } catch (Exception e) {
 42             LOG.error(ThreadLogUtils.getLogPrefix() + "交易加锁异常, 客户号:" + custNo, e);
 43         }
 44         return isLock;
 45     }
 46 
 47     @Transactional
 48     public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) {
 49 
 50         try {
 51             TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode());
 52 
 53             if (null != lock) {
 54                 lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(),
 55                         ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid());
 56             }
 57 
 58             LOG.info(ThreadLogUtils.getLogPrefix() + "释放锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
 59         } catch (Exception e) {
 60             LOG.error(ThreadLogUtils.getLogPrefix() + "释放锁异常, 客户号:{}", custNo, e);
 61         }
 62     }
 63 
 64     /**
 65      * 匹配加锁
 66      */
 67     private boolean switchSynsLock(TradeSyncLock lock, LockSource source) {
 68         boolean isLock = false;
 69 
 70         switch (source) {
 71         case WITHDRAW:
 72             ;
 73             isLock = tradeSynsLock(lock);
 74             break;
 75         case WITHDRAW_RETRY:
 76             ;
 77             isLock = retrySynsLock(lock);
 78             break;
 79         default:
 80             ;
 81         }
 82         return isLock;
 83     }
 84 
 85     /**
 86      * 交易同步锁
 87      */
 88     private boolean tradeSynsLock(TradeSyncLock lock) {
 89         // 处理中的不加锁,即不执行交易操作
 90         if (LockStatus.P.getName().equals(lock.getStatus())) {
 91             return false;
 92         }
 93 
 94         int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(),
 95                 ThreadLogUtils.getCurrThreadUuid(), null);
 96         if (num == 1) {
 97             return true;
 98         }
 99         return false;
100     }
101 
102     /**
103      * 补偿同步锁
104      */
105     private boolean retrySynsLock(TradeSyncLock lock) {
106         // 处理中或处理完成的不加锁,即不执行补偿操作
107         if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) {
108             return false;
109         }
110 
111         int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(),
112                 ThreadLogUtils.getCurrThreadUuid(), null);
113         if (num == 1) {
114             return true;
115         }
116         return false;
117     }
118 
119     private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) {
120         lock.setOuterSerialNo(outerSerialNo);
121         lock.setCustNo(custNo);
122         lock.setSourceCode(source.getCode());
123         lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid());
124         lock.setStatus(LockStatus.P.getName());
125         lock.setRemark(source.getDesc());
126     }
127 
128 }
View Code

获取当前线程号以及打印uuid工具类ThreadLogUtils.Java

  1 /**
  2  * 
  3  * 线程处理<br>
  4  * @author fugaoyang
  5  *
  6  */
  7 
  8 public class ThreadLogUtils {
  9     
 10     private static ThreadLogUtils instance = null;
 11 
 12     private ThreadLogUtils() {
 13         setInstance(this);
 14     }
 15 
 16     // 初始化标志
 17     private static final Object __noop = new Object();
 18     private static ThreadLocal<Object> __flag = new InheritableThreadLocal<Object>() {
 19         @Override
 20         protected Object initialValue() {
 21             return null;
 22         }
 23     };
 24 
 25 
 26     // 当前线程的UUID信息,主要用于打印日志;
 27     private static ThreadLocal<String> currLogUuid = new InheritableThreadLocal<String>() {
 28         @Override
 29         protected String initialValue() {
 30             return UUID.randomUUID().toString()/* .toUpperCase() */;
 31         }
 32     };
 33 
 34     private static ThreadLocal<String> currThreadUuid = new ThreadLocal<String>() {
 35         @Override
 36         protected String initialValue() {
 37             return UUIDGenerator.getUuid();
 38         }
 39     };
 40     
 41 
 42     public static void clear(Boolean isNew) {
 43         if (isNew) {
 44 
 45             currLogUuid.remove();
 46             
 47             __flag.remove();
 48             
 49             currThreadUuid.remove();
 50             
 51         }
 52     }
 53 
 54     public static String getCurrLogUuid() {
 55         if (!isInitialized()) {
 56             throw new IllegalStateException("TLS未初始化");
 57         }
 58 
 59         return currLogUuid.get();
 60     }
 61 
 62     public static String getCurrThreadUuid() {
 63         return currThreadUuid.get();
 64     }
 65     
 66     public static void clearCurrThreadUuid() {
 67         currThreadUuid.remove();
 68     }
 69     
 70     public static String getLogPrefix() {
 71         if (!isInitialized()) {
 72             return "";
 73         }
 74 
 75         return "<uuid=" + getCurrLogUuid() + ">";
 76     }
 77 
 78 
 79     private static boolean isInitialized() {
 80         return __flag.get() != null;
 81     }
 82 
 83     /**
 84      * 初始化上下文,如果已经初始化则返回false,否则返回true<br/>
 85      *
 86      * @return
 87      */
 88     public static boolean initialize() {
 89         if (isInitialized()) {
 90             return false;
 91         }
 92 
 93         __flag.set(__noop);
 94         return true;
 95     }
 96 
 97     private static void setInstance(ThreadLogUtils instance) {
 98         ThreadLogUtils.instance = instance;
 99     }
100 
101     public static ThreadLogUtils getInstance() {
102         return instance;
103     }
104 
105 }
View Code

两种锁的实现的大致思路如下:

1.交易同步锁

当一个客户来取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;

当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;

当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;

2.交易重试补偿锁

1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;

2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。

上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。

后续可以优化的部分:

1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。

2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。

3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。

 

因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~

完整示例代码后续会更新到github。

 

相关文章: