RMI

RPC

RPC 远程过程调用,一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源;对于客户端来说,传输层使用什么协议,序列化、反序列化都是透明的。

Java RMI与RPC的区别

RMI

RMI 全称是remote method invocation – 远程方法调用。一种用于远程过程调用的应用程序编程接口,是纯java的网络分布式应用系统的核心解决方案之一。

远程对象必须继承UnicastRemoteObject,这样才能保证客户端访问获得远程对象时,该远程对象会把自身的一个拷贝以Socket 形式传输给客户端,客户端获得的拷贝称为“stub” , 而服务器端本身已经存在的远程对象称为“skeleton”,此时客户端的stub 是客户端的一个代理,用于与服务器端进行通信,而skeleton 是服务端的一个代理,用于接收客户端的请求之后调用远程方法来响应客户端的请求

RMI远程调用步骤:

1,客户调用客户端辅助对象stub上的方法

2,客户端辅助对象stub打包调用信息(变量,方法名),通过网络发送给服务端辅助对象skeleton

3,服务端辅助对象skeleton将客户端辅助对象发送来的信息解包,找出真正被调用的方法以及该方法所在对象

4,调用真正服务对象上的真正方法,并将结果返回给服务端辅助对象skeleton

5,服务端辅助对象将结果打包,发送给客户端辅助对象stub

6,客户端辅助对象将返回值解包,返回给调用者

7,客户获得返回值

Zookeeper

是什么:Zookeeper 是一个开源的分布式协调服务,

节点特性:包含持久化、临时节点、有序节点,同级节点的唯一性、临时节点不能存在子节点

数据模型:znode ,类似文件夹目录

功能

  • 使用集群防止单点故障,

    • 具有三个角色:
      • leader:所有事务请求必须由一个全局唯的服器来协调处理
      • follower:参与事务请求的投票以及leader的选举投票
      • observer:只提供非事物 请求,在不影响集群事务处理能力的前提下提升非事务处理的能力,observer不需要参与投票的过程,但是必须要同步leader的数据,从而在处理请求的时候保证数据的一致性
  • 采用ZAB协议保证数据的一致性,

    ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议。

    在Zookeeper 中主要依赖ZAB协议来实现分布式数据的一致性。

    • 消息广播

      当整个集群启动或leader崩溃的时候,ZAB协议便会进入恢复模式,当选举出leader的时候,并与过半的follower节点完成数据同步以后,就会进入消息广播节点

      • 消息广播的过程实际上是一个简化版本的二阶段提交过程Zookeeper 概要 和 leader选举、watcher 机制分析
    • 崩溃恢复

      当leader节点崩溃或者leader服务器失去了过半的follower节点的联系,变户籍进入崩溃恢复阶段

      • 已经处理的消息不能丢失
      • 被丢弃的消息不能再次出现

      主要是通过zxid 和 epoch 来实现

  • ZAB 来实现选举

  • 2pc来做数据一致性

leader 选举(QuorumPeerMain)

###服务器启动时的leader选举

  • 每个节点启动的时候状态都是LOOKING ,处于观望状态,
  • 每个server发出一个投票,比如:(myid,zxid,epoch) 进行封装:server1(1,0,1),server2(2,0,1)会根据自己的epoch和zxid 及 myid进行比较票据,各自将这个票据发给集群中其他机器。
    • 先比较epoch 大者胜出
    • 如果epoch相等,则进行比较zxid(事务id) 大者胜出
    • 如果zxid也相等,则进行比较myid,大者胜出
  • 如果票据胜出者将进行修改票据并广播票据给其他机器,再次进行投票,直到选出leader
  • 统计票据,当过半的机器接受相同的投票信息,对server1,server2而言,都统计出集群中已经有两台机器接受了(2,0,1) 的投票信息,此时便认为已经选出了leader
  • 改变服务器状态。一旦确定了leader,每个服务器就会更新自己的状态,如果是follower 就会变更为following,如果是leader,就变更为leading。

运行过程中的leader选举

  • 当leader节点出现宕机或不可用时,会进入新一轮的leader选举
  • 变更状态,leader挂后,剩下非Observer服务器都会将自己的服务器状态变更为LOOKING状态,然后开始进入leader选举
  • 处理投票此时的(myid,zxid,epoch) 可能会不相同,此时投票过程与启动过程相同。
  • 统计投票,与启动过程相同。
  • 改变服务器状态,与启动过程相同。

【分布式】Zookeeper的Leader选举-选举过程介绍比较清晰

Leader 选举源码分析

QuorumPeerMain.main() 分析入口

public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
}
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(), 
        config.getDataLogDir(), 
        config.getSnapRetainCount(), 
        config.getPurgeInterval());
    purgeMgr.start();
    //如果是集群,则读取配置的信息
    if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                 + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                              config.getMaxClientCnxns());
	    // 从配置文件中加载配置信息
        quorumPeer = new QuorumPeer(config.getServers(),
                                    new File(config.getDataDir()),
                                    new File(config.getDataLogDir()),
                                    //将配置文件中的选举类型给quorumPeer对象,默认类型是3
                                    config.getElectionAlg(),
                                    config.getServerId(),
                                    config.getTickTime(),
                                    config.getInitLimit(),
                                    config.getSyncLimit(),
                                    config.getQuorumListenOnAllIPs(),
                                    cnxnFactory,
                                    config.getQuorumVerifier());
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if(quorumPeer.isQuorumSaslAuthEnabled()){
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }

        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();
	    // 开始启动
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}

QuorumPeer

public synchronized void start() {
    //加载数据
    loadDataBase();
    cnxnFactory.start();
    //开始选举 leader
    startLeaderElection();
    // 此时调用 run()
    super.start();
}
public void run() {


    try {
        while (running) {
            switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                        try {
                            roZkMgr.start();
                            setBCVote(null);
                            //todo leader选举核心内容
                            //设置当前的投票,通过策略模式来决定当前用哪个选举算法来进行领导选举,
                            //默认是FastLeaderElection
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    }
                    break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}

FastLeaderElection

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
            self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = System.currentTimeMillis();
    }
    try {
        /**
         * 封装接受的投票信息
         */
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        /**
         * 退出选举是的封装结果:存储选举的结果信息
         */	
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            //增加逻辑时钟
            logicalclock++;
            //将自己的myid,zxid,epoch封装到自己的对象中
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                 ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        //发送通知,将包含自己信息的实体添加到发送队列中
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        //如果是LOOKING状态,则开始选举
        while ((self.getPeerState() == ServerState.LOOKING) &&
               (!stop)){
            /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
            //从接受队列中拿到投屏消息,自己的投票也在这里面处理
            Notification n = recvqueue.poll(notTimeout,
                                            TimeUnit.MILLISECONDS);

            /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
            //未取到结果
            if(n == null){
                if(manager.haveDelivered()){
                    //如果空闲的情况下,消息发送完了,继续重新发送,直到选举中leader为止
                    sendNotifications();
                } else {
                    //消息还没投递出去,可能是其他server还没启动,尝试再连接
                    manager.connectAll();
                }

                /*
                     * Exponential backoff
                     */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                              tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            //如果取到结果,校验接受到消息是否属于这个集群中的
            else if(self.getVotingView().containsKey(n.sid)) {
                /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view.
                     */
                //选举leader
                switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        //如果通知的消息大于当前的消息,则表示当前时新一轮的选举,
                        //将通知消息的epoch号(朝代号)给当前的对象
                        // logicalclock 逻辑时钟,一开始是1,表示是新的开始,
                        //如果取出来的消息的epoch 比当前的逻辑时钟大的话,表示逻辑时钟已经是落后的朝代了,进行重新赋值
                        if (n.electionEpoch > logicalclock) {
                            logicalclock = n.electionEpoch;
                            // 启动接受的投票信息
                            recvset.clear();
                            //比较当前取出的消息是否可以胜出,
                            /**
                             比较规则:
                             首先比较epoch,大者获胜
                             其次,epoch相同时比较zxid,大者获胜
                             最后比较myid,大者获胜
                             */
                            //TODO 比较myid,zxid,epoch,将当前对象的票据改成胜者的票据
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                   getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //当前消息获胜的话,将投票的票据(leader、zxid、epoch)信息改成当前消息的票据
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                //否则,将当前的票据进行保持
                                updateProposal(getInitId(),
                                               getInitLastLoggedZxid(),
                                               getPeerEpoch());
                            }
                            // 继续广播,即将当前对象包含票据的信息添加到发送队列中,
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock) {
                        // 如果取出来的消息的 epoch 比当前的逻辑时钟小,说明该消息已经是以前的了。不做任何处理
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock."
                                		  + "n.electionEpoch = 0x"
                                          + Long.toHexString(n.electionEpoch)
                                          + ", logicalclock=0x" + Long.toHexString(logicalclock));
                            }
                            break;
                            // 这里表示 取出来的消息的 epoch 和逻辑时钟相等,就继续比较zxid 和 myid ,
                            // 如果胜出,则将该消息的票据进行修改,并且继续广播
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                       proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                      ", proposed leader=" + n.leader +
                                      ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                      ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }
                        //将当前胜出的最终投票结果添加到接受投票信息的集合中,用来做选举的最终结果的判断
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        //统计recvset集合,查看投某个id的票数是否超过一半
                        if (termPredicate(recvset,
                                          new Vote(proposedLeader, proposedZxid,
                                                   logicalclock, proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                                      TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                                       proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            //确定leader
                            if (n == null) {
                                //修改状态,LEADING or FOLLOWING or OBSERVEING
                                self.setPeerState((proposedLeader == self.getId()) ?
                                                  ServerState.LEADING: learningState());
                                //返回最终投票结果
                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock,
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                        //OBSERVING机器不参数选举
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                        //如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经正在运行的zk集群时
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock){
                            recvset.put(n.sid, new Vote(n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch));

                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                                  ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, 
                                                        n.zxid, 
                                                        n.electionEpoch, 
                                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.version,
                                                          n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch,
                                                          n.state));

                        if(ooePredicate(outofelection, outofelection, n)) {
                            synchronized(this){
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                                  ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                 n.state, n.sid);
                        break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                    self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}",
                  manager.getConnectionThreadCount());
    }
}

Zookeeper 概要 和 leader选举、watcher 机制分析

Watcher机制

事件机制

watcher监听机制是Zookeeper中非常重要的特性,我们基于zookeeper 上创建的节点,可以对这些节点绑定监听事件。比如可以监听节点数据的变更、节点删除、子节点状态变更等事件。通过这个事件机制,可以基于zookeeper实现分布式锁,集群管理等功能。

watcher 特性

当数据发生变化时,zookeeper 会产生一个watcher 事件,并且会发送到客户端,但是客户的只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。即watcher是一次性的操作。可以通过循环监听去达到永久监听的效果

注册事件机制

通过这三个操作来绑定事件:

  • getData
  • exists
  • getChildren

凡是事务类型的操作,都会触发监听事件。(create/delete/setData)

事件类型

事件类型 发生原因
None(-1) 客户端连接状态发生变化的时候,会收到none的事件
NodeCreated(1) 创建节点的事件。比如 create /demo
NodeDeleted(2) 删除节点的事件 。比如 delete /demo
NodeDataChanaged(3) 节点数据发生变更。
NodeChildrenChanged(4) 子节点被创建、删除会触发事件

事件操作类型对应事件机制

操作类型 /demo 监听事件 (exists/getData/getChild) /demo/chindren
create(/demo) NodeCreated (exists/getData)
delete(/demo) NodeDeleted (exists/getData)
setData(/demo) NodeDataChanaged (exists/getData)
create(/demo/children) NodeChildChangerend (getChild) NodeCreated
delete(/demo/children) NodeChildChangerend (getChild) NodeDeleted
setData(/demo/children) NodeDataChanaged

Watcher 事件源码分析

注册事件,发送数据到服务端

  1. 构造Zookeeper客户端,客户端注册Watcher 事件,并发送数据给服务端
//构造一个Zookeeper的方法,传入一个Watcher事件,
ZooKeeper zooKeeper = new ZooKeeper("192.168.45.131:2181", 4000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected==event.getState()){
            //如果是连接响应
            countDownLatch.countDown();
        }
    }
});
//绑定一个exists watcher事件
zooKeeper.exists("/demo", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println("内部watcher:"+event.getType()+":"+event.getPath());
    }
});

源码的实现

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly) throws IOException {
    //将构造时传过来的watcher作为watcherManage的默认watcher
    watchManager.defaultWatcher = watcher;
    
    ConnectStringParser connectStringParser = new ConnectStringParser(
        connectString);
    HostProvider hostProvider = new StaticHostProvider(
        connectStringParser.getServerAddresses());
    //初始化ClientCnxn,并实例化SendThread 和 EventThread 两个内部类线程
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                          hostProvider, sessionTimeout, this, watchManager,
                          getClientCnxnSocket(), canBeReadOnly);
    //调用启动方法,启动sendThread.start();eventThread.start(); 两个线程
    cnxn.start();
}

至此客户端进行 zookeeper 的构造结束。

  1. 下面是 exists 的实现
public Stat exists(final String path, Watcher watcher) {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        //对 watcher 进行绑定注册
        wcb = new ExistsWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

   	//封装请求头,声明 请求类型是 exists
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.exists);
    //封装 exists 的请求信息,将 节点path 和是否存在watcher 进行封装
    ExistsRequest request = new ExistsRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    // 设置服务端响应的接收类
    SetDataResponse response = new SetDataResponse();
    //将 requestHeader、existsRequest、setDataResponse、watchRegistration 提交到发送队列中去
    //此时的 cnxn 便是构造 zookeeper 时的 ClientCnxn 对象
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    //返回exists 得到的结果信息 stat 信息
    return response.getStat().getCzxid() == -1 ? null : response.getStat();
}

进入到 ClientCnxn 的 submitRequest 方法中

public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    //将消息添加到队列,并构造一个Packet 传输对象
    //组装数据包,packet 是客户端和服务端进行通信的最小单元
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                                null, watchRegistration);
    synchronized (packet) {
        while (!packet.finished) {
            //在数据包没有处理完成之前,一致阻塞
            packet.wait();
        }
    }
    return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration){
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        // 将相关传输对象装换成Packet
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            //添加到 队列中,
            // LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,
            //即可以从队列的两端插入和移除元素。
            // 双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
            // LinkedList类是双向列表
            outgoingQueue.add(packet);
        }
    }
    //此处是多路复用机制,唤醒Selector ,告诉他有数据包添加过来了
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
  1. SendThread 的发送过程

在初始化连接的时候,zookeeper初始化了两个线程并且启动了。接下来是分析SendThread 的发送过程,因为是一个线程,所以会调用run方法

//SendThread#run 方法中,这个调用最为重要,
//调用 clientCnxnSocket 发起传输,进入到ClientCnxnSocketNIO.doTransport 方法中
//其中 pendingQueue 是一个用来存放已经发送、等待回应的 packet 队列
//outgoingQueue 是 exists 中将事件通过封装成 Packet 添加到该队列中
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);

总结:client 调用 exists 注册监听以后,会做几个事情:

  • 将请求数据封装为 packet ,添加到 outgoingQueue
  • SendThread 线程会执行数据发送操作,主要是将 outgoingQueue 队列中的数据发送到服务端
  • 通过ClientCnxnSocket.doTransport ,其中ClientCnxnSocket 只是zookeeper 客户端和服务端连接通信的封装。
  • 最后在ClientCnxnSocketNIO.doIO()方法中通过 sock.write(p.bb); 将请求的数据包发送到服务端

服务端接受请求处理流程

  1. 从服务端NettyServerCnxn类 ,开始分析

    服务端有个NettyServerCnxn 类,用来处理客户端发送过来的请求

    //服务端接受客户端发来的数据 NettyServerCnxn 
    public void receiveMessage(ChannelBuffer message) {
        try {
            while(message.readable() && !throttled) {
                //byteBuffer 不为空
                if (bb != null) {
                    //已经读完
                    if (bb.remaining() == 0) {
                       if (initialized) {
                           	// 处理客户端传过来的数据包
                            zks.processPacket(this, bb);
                        }
                    }
                } 
            }
        } catch(IOException e) {
            LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
            close();
        }
    }
    
    //ZooKeeperServer 
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer)  {
        if (h.getType() == OpCode.auth) {
        } else {
            if (h.getType() == OpCode.sasl) {
            }else {
                //不是权限也不是sasl,组装请求参数
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                                         h.getType(), incomingBuffer, cnxn.getAuthInfo());
                si.setOwner(ServerCnxn.me);
                //提交请求
                submitRequest(si);
            }
        }
        cnxn.incrOutstandingRequests(h);
    }
    public void submitRequest(Request si) {
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                //访问调用链中的方法
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }
    
    

    此处的firstProcessor.processRequest(si); 是一个调用链,查看源码 找到实例化的过程

    /**
     * 封装了三个调用链:
     *  PrepRequestProcessor(SyncRequestProcessor(FinalRequestProcessor()))
     */
    protected void setupRequestProcessors() {
       
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
        			finalProcessor);
        //此处和下面启动了两个线程,
        //所以在processRequest 方法执行完后,会用到线程异步去执行
        ((SyncRequestProcessor)syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }
    

    所以了解了调用过程,接下来 firstProcessor.processRequest(si); 实际上是调用 PrepRequestProcessor.processRequest() 方法

    //PrepRequestProcessor
    public void processRequest(Request request) {
        // request.addRQRec(">prep="+zks.outstandingChanges.size());
        // 添加到阻塞队列里之后,会在 run 方法中进行处理
        submittedRequests.add(request);
    }
    public void run() {
        try {
            while (true) {
                Request request = submittedRequests.take();
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }
               	//去处理请求,处理请求中会按照request.type 进行类处理
                // 而watcher 此时的请求类型是在exists 方法中指定了 exists 类型
                // 最终都会执行 nextProcessor.processRequest(request); 方法
                pRequest(request);
            }
        } catch (RequestProcessorException e) {
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            handleException(this.getName(), e);
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }
    protected void pRequest(Request request) throws RequestProcessorException {
        //调用 SyncRequestProcessor.processRequest()
        nextProcessor.processRequest(request);
    }
    

    SyncRequestProcessor.processRequest() 方法

    public void processRequest(Request request) {
        // request.addRQRec(">sync");
        // 也是通过调用run 方法进行异步操作
        queuedRequests.add(request);
    }
    @Override
    public void run() {
        try {
            while (true) {
                if (si != null) {
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                       	//调用 FinalRequestProcessor.processRequest()
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
            }
        }
    } catch (Throwable t) {
    }
    }
    

    调用 FinalRequestProcessor.processRequest() 方法

    //调用 FinalRequestProcessor
    public void processRequest(Request request) {
        ServerCnxn cnxn = request.cnxn;
        try {
            switch (request.type) {
                case OpCode.exists: {
                    lastOp = "EXIS";
                    // TODO we need to figure out the security requirement for this!
                    ExistsRequest existsRequest = new ExistsRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                                                            existsRequest);
                    String path = existsRequest.getPath();
                    if (path.indexOf('\0') != -1) {
                        throw new KeeperException.BadArgumentsException();
                    }
                    //获得 节点信息 ,最后调用DataTree.statNode 将watch 事件进行存储
                    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                                                             .getWatch() ? cnxn : null);
                    rsp = new ExistsResponse(stat);+++
                    break;
    		} 
    	} catch (SessionMovedException e) { 
    	}
    }
    

Zookeeper 概要 和 leader选举、watcher 机制分析

相关文章: