Leaf的Github地址:
https://github.com/Meituan-Dianping/Leaf
Leaf美团技术团队博客地址:
https://tech.meituan.com/2017/04/21/mt-leaf.html
关于Leaf的使用手册、架构说明、Segment和Snowflake的特点和时钟回拨解决办法,参考上面的链接内容都能获得到答案。拒绝重复搬砖。
在本篇博客里我想就Leaf的Segment模式的源码实现做个简单的注释。代码分支:master。
1.Segment
Segment是SegmentBuffer的成员属性,cache中存储的是SegmentBuffer,Segment是双buffer的实现。
2.初始化Segment
在初始化Segment时,主要做两件事情。1是根据数据库表中配置的busi_tag更新缓存;2是添加定时任务,定时(一分钟间隔)更新缓存。
@Override public boolean init() { logger.info("Init ..."); // 确保加载到kv后才初始化成功 updateCacheFromDb(); initOK = true; updateCacheFromDbAtEveryMinute(); return initOK; }
updateCacheFromDb():
private void updateCacheFromDb() { logger.info("update cache from db"); StopWatch sw = new Slf4JStopWatch(); try { //从配置的数据源中加载biz_tag List<String> dbTags = dao.getAllTags(); if (dbTags == null || dbTags.isEmpty()) { return; } //cache中的biz_tag.初始为空. List<String> cacheTags = new ArrayList<String>(cache.keySet()); //存储本次更新操作,要从DB中加载进cache的biz_tag. Set<String> insertTagsSet = new HashSet<>(dbTags); //存储失效的biz_tag:存在于cache,不存在于DB. Set<String> removeTagsSet = new HashSet<>(cacheTags); //过滤去重,得到需要存入进cache的biz_tag for(int i = 0; i < cacheTags.size(); i++){ String tmp = cacheTags.get(i); if(insertTagsSet.contains(tmp)){ insertTagsSet.remove(tmp); } } //存储进cache for (String tag : insertTagsSet) { SegmentBuffer buffer = new SegmentBuffer(); buffer.setKey(tag); Segment segment = buffer.getCurrent(); segment.setValue(new AtomicLong(0)); //这里的max、step均为0.所以这一步仅仅将biz_tag存储进了cache,并没有对SegmentBuffer执行初始化操作. segment.setMax(0); segment.setStep(0); cache.put(tag, buffer); logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); } //cache中已失效的biz_tag从cache删除 for(int i = 0; i < dbTags.size(); i++){ String tmp = dbTags.get(i); if(removeTagsSet.contains(tmp)){ removeTagsSet.remove(tmp); } } for (String tag : removeTagsSet) { cache.remove(tag); logger.info("Remove tag {} from IdCache", tag); } } catch (Exception e) { logger.warn("update cache from db exception", e); } finally { sw.stop("updateCacheFromDb"); } }