本文讲一些常见的分布式应用层面的技术,其中大部分都依赖于Zookeeper,所以对zookeeper不熟悉的同学可以先看我之前写的两往篇博客Zookeeper编程(二)Zookeeper编程(一)

我们的推荐程序部署在多台服务器上,每天凌晨会去重建信息索引(索引存在Redis上)。建索引这件事情只能让一台服务器去做,其他服务器歇着,这种应用场景自然想到了分布式锁,谁抢到锁谁来建索引。我们在线上分别使用过3种分布式锁:基于zookeeper的锁,基于MySQL的锁,基于Redis的锁。MySQL lock最稳定;Redis lock使用起来最灵活,同时实时起来也最方便。所以现在我们线上的分布式锁全换成了Redis Lock。

zookeeper lock的实现原理参见链接,不再赘述。

MySQL lock的基本思想是大家都去写同一条数据库记录,谁先写上谁获取锁,删除这条记录就相当于释放了锁。整个流程看起来像这样:

begin;
select count(*) from table where lockname='xxx' for update;
if count==0:
    insert into table (lockname) values ('xxx);
    do the task which must be done once
    delete from table where lockname='xxx'
commit;

如果2个进程同时执行第一步,发现lockname不存在,于是都去添加一行记录,还都添加成功了,那岂不是2个进程都获得了锁?莫急,看见select语句后面有个"for update"吗?当where条件中不包含主键时,select ... for update将会锁表,事务提交后才释放表上的锁。(for update仅适用于InnoDB)

Redis lock的实现思想跟mysql相同,不过操作起来更简单。看下面的代码

Long i = jedis.setnx(lockName, lockName);    // 若key不存在,则存储 ,并返回1
if (i == 1L) {
    // 设置key的过期时间
    if (live < 0) {
        live = DEFAULT_EXPIRE_TIME;
    }
    jedis.expire(lockName, live);
    logger.info("get redis lock " + lockName + " ,live " + live
            + " seconds");

    rect = true;        //获得锁返回true
} else { // 已存在锁
    logger.info("lockName: " + lockName
            + " locked by other business");
    rect = false;        //没有获得锁返回false
}

核心操作是redis提供的setnx()方法,它来保证并发情况下中有1个进程能写成功。另外我们还为redis的key设置了超时时间,即使你获得锁后忘记了释放锁,或者在释放锁之前进程死掉了,不用担心,在达到超时时间后该锁也是会自动释放的。

Barrier

接着上面的应用场景讲。在推荐系统中,建完信息索引后就要开始为每个用户进行推荐了。推荐任务要分发到每台服务器上去执行,我们没有做单独的任务分发器,而是每台服务器都去同一个数据表里读取所有的用户ID,userid % n == 自己的编号时(n是服务器总数),该服务器就计算这个用户的推荐。计算推荐的过程也伴随着计算用户兴趣,所有用户的推荐计算完毕后,兴趣也就计算完毕了,此时又需要建立兴趣索引。建立兴趣索引又是只能由一台服务器来做的事情。这里有2个关键节点,即必须建完兴趣索引后所有服务器才能开始计算推荐,所有服务器计算完推荐后才能开始建兴趣索引。分布式环境下各服务器之间要想达成这种默契就需要借助于DoubleBarrier。

Barrier是指:

1)所有的线程都到达barrier后才能进行后续的计算

或者

2)所有的线程都完成自己的计算后才能离开barrier

Double Barrier是指同时具有上述两点。

Double Barrier的实现:

分布式应用技术
enter barrier:
1.建一个根节点"/root"
2.想进入barrier的线程在"/root"下建立一个子节点"/root/c_i"
3.循环监听"/root"孩子节点数目的变化,当其达到size时就说明有size个线程都已经barrier点了

leave barrier:
1.想离开barrier的线程删除其在"/root"下建立的子节点
2.循环监听"/root"孩子节点数目的变化,当size减到0时它就可以离开barrier了

服务注册

 继续研究上面的应用场景,我们提到每台服务器遇到“userid % n == 自己的编号时(n是服务器总数)”这样的用户时才为其计算推荐,这里有两个问题:

  1. 集群中服务器的总数如何获取?如果直接设置成上线的服务器的个数会存在2个问题:将来服务器数目增加了n还得跟着改;如果哪天某台服务器进程挂了,那就造成1/n的用户没有推荐数据。
  2. 本服务器在集群中的编号如何获得?

解决办法是:

每台服务器进程启动时在特定的zookeeper路径下添加一个EPHEMERAL节点,节点是存储的数据为自己的IP(或者其他能唯一标识一台服务器的东西)。之所以要求是EPHEMERAL类型,是因为当进程死掉后该zookeeper节点会自动被删除掉。每天凌晨每台服务器去获取特定zookeeper路径下所有的子节点,子节点数目即为集群中服务器总数。根据IP每台服务器就可以知道自己在所有的孩子节点中排名第几。

ServerCluster.java

  1 import java.util.List;
  2 import java.util.concurrent.Executors;
  3 import java.util.concurrent.ScheduledExecutorService;
  4 import java.util.concurrent.TimeUnit;
  5 
  6 import org.apache.commons.logging.Log;
  7 import org.apache.commons.logging.LogFactory;
  8 import org.apache.curator.framework.CuratorFramework;
  9 import org.apache.curator.framework.api.CreateBuilder;
 10 import org.apache.zookeeper.CreateMode;
 11 
 12 
 13 /**
 14  * 
 15  * @Author:orisun
 16  * @Since:2016-4-7
 17  * @Version:1.0
 18  */
 19 public class ServerCluster {
 20 
 21     private static Log logger = LogFactory.getLog(ServerCluster.class);
 22     private static final String BASE_PATH = ZkClient.getInstance().getBasePath() + "/cluster";
 23     private static ScheduledExecutorService exec = Executors
 24             .newSingleThreadScheduledExecutor();
 25 
 26     /**
 27      * 向集群上报自己的存在,即把自己的IP写到特定的zk节点(EPHEMERAL节点)上去
 28      */
 29     public static void reportServer() {
 30         String selfIP = NIC.getLocalIP();
 31         CuratorFramework zkClient = ZkClient.getInstance().getZkClient();
 32         boolean exists = false;
 33         try {
 34             CreateBuilder cb = zkClient.create();
 35             if (zkClient.checkExists().forPath(BASE_PATH) == null) {
 36                 cb.creatingParentsIfNeeded().forPath(BASE_PATH,
 37                         new byte[] { 0 });
 38             }
 39             List<String> children = zkClient.getChildren().forPath(BASE_PATH);
 40             if (children != null && children.indexOf(selfIP) >= 0) {
 41                 exists = true;
 42             }
 43             if (!exists) {
 44                 // EPHEMERAL节点,进程终止时zookeeper连接断开,节点自动被删除
 45                 cb.creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
 46                         .forPath(BASE_PATH + "/" + selfIP, new byte[] { 0 });
 47                 logger.info(selfIP + " add to cluster");
 48             } else {
 49                 // 如果发现cluster上已存在该IP,则5秒后再确认一下
 50                 logger.info(selfIP + " is already in cluster");
 51                 Thread.sleep(1000 * 5);
 52                 children = zkClient.getChildren().forPath(BASE_PATH);
 53                 exists = false;
 54                 if (children != null && children.indexOf(selfIP) >= 0) {
 55                     exists = true;
 56                 }
 57                 if (!exists) {
 58                     // EPHEMERAL节点,进程终止时zookeeper连接断开,节点自动被删除
 59                     cb.creatingParentsIfNeeded()
 60                             .withMode(CreateMode.EPHEMERAL)
 61                             .forPath(BASE_PATH + "/" + selfIP, new byte[] { 0 });
 62                     logger.info(selfIP + " add to cluster");
 63                 }
 64             }
 65         } catch (Exception e) {
 66             logger.fatal("report to cluster failed", e);
 67         }
 68     }
 69 
 70     /**
 71      * 向集群上报自己的存在,即把自己的IP写到特定的zk节点(EPHEMERAL节点)上去<br>
 72      * 为防止zookeeper会话断开而造成节点被删除,每隔10分钟就去写一次
 73      */
 74     public static void report() {
 75         exec.scheduleAtFixedRate(new Runnable() {
 76             @Override
 77             public void run() {
 78                 reportServer();
 79             }
 80         }, 0, 10, TimeUnit.MINUTES);
 81     }
 82 
 83     /**
 84      * 获取集群中有多少台机器
 85      * 
 86      * @return
 87      */
 88     public static int getClusterSize() {
 89         int total = 0;
 90         List<String> children = null;
 91         try {
 92             CuratorFramework zkClient = ZkClient.getInstance().getZkClient();
 93             children = zkClient.getChildren().forPath(BASE_PATH);
 94         } catch (Exception e) {
 95             logger.error("get children of " + BASE_PATH + " failed", e);
 96         }
 97         if (children != null) {
 98             total = children.size();
 99         }
100         logger.info("cluster size is " + total);
101         return total;
102     }
103 
104     /**
105      * 获取自己在集群中的编码(从0开始)
106      * 
107      * @return
108      */
109     public static int getIndexInCluster() {
110         int index = -1;
111         CuratorFramework zkClient = ZkClient.getInstance().getZkClient();
112         try {
113             List<String> children = zkClient.getChildren().forPath(BASE_PATH);
114             String selfIP = NIC.getLocalIP();
115             index = children.indexOf(selfIP);
116         } catch (Exception e) {
117             logger.fatal("get cluster info failed", e);
118         }
119         logger.info("this server's index is " + index);
120         return index;
121     }
122 }
View Code

相关文章: