Leaf的Github地址:

https://github.com/Meituan-Dianping/Leaf

Leaf美团技术团队博客地址:

https://tech.meituan.com/2017/04/21/mt-leaf.html

关于Leaf的使用手册、架构说明、Segment和Snowflake的特点和时钟回拨解决办法,参考上面的链接内容都能获得到答案。拒绝重复搬砖。

在本篇博客里我想就Leaf的Segment模式的源码实现做个简单的注释。代码分支:master。

1.Segment

Segment是SegmentBuffer的成员属性,cache中存储的是SegmentBuffer,Segment是双buffer的实现。

2.初始化Segment

在初始化Segment时,主要做两件事情。1是根据数据库表中配置的busi_tag更新缓存;2是添加定时任务,定时(一分钟间隔)更新缓存。

    @Override
    public boolean init() {
        logger.info("Init ...");
        // 确保加载到kv后才初始化成功
        updateCacheFromDb();
        initOK = true;
        updateCacheFromDbAtEveryMinute();
        return initOK;
    }

updateCacheFromDb():

private void updateCacheFromDb() {
        logger.info("update cache from db");
        StopWatch sw = new Slf4JStopWatch();
        try {
            //从配置的数据源中加载biz_tag
            List<String> dbTags = dao.getAllTags();
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
            //cache中的biz_tag.初始为空.
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
            //存储本次更新操作,要从DB中加载进cache的biz_tag.
            Set<String> insertTagsSet = new HashSet<>(dbTags);
            //存储失效的biz_tag:存在于cache,不存在于DB.
            Set<String> removeTagsSet = new HashSet<>(cacheTags);
            //过滤去重,得到需要存入进cache的biz_tag
            for(int i = 0; i < cacheTags.size(); i++){
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp)){
                    insertTagsSet.remove(tmp);
                }
            }
            //存储进cache
            for (String tag : insertTagsSet) {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                //这里的max、step均为0.所以这一步仅仅将biz_tag存储进了cache,并没有对SegmentBuffer执行初始化操作.
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
                logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
            }
            //cache中已失效的biz_tag从cache删除
            for(int i = 0; i < dbTags.size(); i++){
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp)){
                    removeTagsSet.remove(tmp);
                }
            }
            for (String tag : removeTagsSet) {
                cache.remove(tag);
                logger.info("Remove tag {} from IdCache", tag);
            }
        } catch (Exception e) {
            logger.warn("update cache from db exception", e);
        } finally {
            sw.stop("updateCacheFromDb");
        }
    }
View Code

相关文章:

  • 2022-12-23
  • 2022-02-03
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-01
猜你喜欢
  • 2021-05-17
  • 2022-12-23
  • 2021-05-28
  • 2021-09-01
  • 2021-08-07
相关资源
相似解决方案