RMI
RPC
RPC 远程过程调用,一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源;对于客户端来说,传输层使用什么协议,序列化、反序列化都是透明的。
Java RMI与RPC的区别
RMI
RMI 全称是remote method invocation – 远程方法调用。一种用于远程过程调用的应用程序编程接口,是纯java的网络分布式应用系统的核心解决方案之一。
远程对象必须继承UnicastRemoteObject,这样才能保证客户端访问获得远程对象时,该远程对象会把自身的一个拷贝以Socket 形式传输给客户端,客户端获得的拷贝称为“stub” , 而服务器端本身已经存在的远程对象称为“skeleton”,此时客户端的stub 是客户端的一个代理,用于与服务器端进行通信,而skeleton 是服务端的一个代理,用于接收客户端的请求之后调用远程方法来响应客户端的请求
RMI远程调用步骤:
1,客户调用客户端辅助对象stub上的方法
2,客户端辅助对象stub打包调用信息(变量,方法名),通过网络发送给服务端辅助对象skeleton
3,服务端辅助对象skeleton将客户端辅助对象发送来的信息解包,找出真正被调用的方法以及该方法所在对象
4,调用真正服务对象上的真正方法,并将结果返回给服务端辅助对象skeleton
5,服务端辅助对象将结果打包,发送给客户端辅助对象stub
6,客户端辅助对象将返回值解包,返回给调用者
7,客户获得返回值
Zookeeper
是什么:Zookeeper 是一个开源的分布式协调服务,
节点特性:包含持久化、临时节点、有序节点,同级节点的唯一性、临时节点不能存在子节点
数据模型:znode ,类似文件夹目录
功能
使用集群防止单点故障,
- 具有三个角色:
- leader:所有事务请求必须由一个全局唯的服器来协调处理
- follower:参与事务请求的投票以及leader的选举投票
- observer:只提供非事物 请求,在不影响集群事务处理能力的前提下提升非事务处理的能力,observer不需要参与投票的过程,但是必须要同步leader的数据,从而在处理请求的时候保证数据的一致性
采用ZAB协议保证数据的一致性,
ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议。
在Zookeeper 中主要依赖ZAB协议来实现分布式数据的一致性。
消息广播
当整个集群启动或leader崩溃的时候,ZAB协议便会进入恢复模式,当选举出leader的时候,并与过半的follower节点完成数据同步以后,就会进入消息广播节点
- 消息广播的过程实际上是一个简化版本的二阶段提交过程
![]()
崩溃恢复
当leader节点崩溃或者leader服务器失去了过半的follower节点的联系,变户籍进入崩溃恢复阶段
- 已经处理的消息不能丢失
- 被丢弃的消息不能再次出现
主要是通过zxid 和 epoch 来实现
ZAB 来实现选举
2pc来做数据一致性
leader 选举(QuorumPeerMain)
###服务器启动时的leader选举
- 每个节点启动的时候状态都是LOOKING ,处于观望状态,
- 每个server发出一个投票,比如:(myid,zxid,epoch) 进行封装:server1(1,0,1),server2(2,0,1)会根据自己的epoch和zxid 及 myid进行比较票据,各自将这个票据发给集群中其他机器。
- 先比较epoch 大者胜出
- 如果epoch相等,则进行比较zxid(事务id) 大者胜出
- 如果zxid也相等,则进行比较myid,大者胜出
- 如果票据胜出者将进行修改票据并广播票据给其他机器,再次进行投票,直到选出leader
- 统计票据,当过半的机器接受相同的投票信息,对server1,server2而言,都统计出集群中已经有两台机器接受了(2,0,1) 的投票信息,此时便认为已经选出了leader
- 改变服务器状态。一旦确定了leader,每个服务器就会更新自己的状态,如果是follower 就会变更为following,如果是leader,就变更为leading。
运行过程中的leader选举
- 当leader节点出现宕机或不可用时,会进入新一轮的leader选举
- 变更状态,leader挂后,剩下非Observer服务器都会将自己的服务器状态变更为LOOKING状态,然后开始进入leader选举
- 处理投票此时的(myid,zxid,epoch) 可能会不相同,此时投票过程与启动过程相同。
- 统计投票,与启动过程相同。
- 改变服务器状态,与启动过程相同。
【分布式】Zookeeper的Leader选举-选举过程介绍比较清晰
Leader 选举源码分析
QuorumPeerMain.main() 分析入口
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
}
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
config.getDataDir(),
config.getDataLogDir(),
config.getSnapRetainCount(),
config.getPurgeInterval());
purgeMgr.start();
//如果是集群,则读取配置的信息
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
public void runFromConfig(QuorumPeerConfig config) throws IOException {
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
// 从配置文件中加载配置信息
quorumPeer = new QuorumPeer(config.getServers(),
new File(config.getDataDir()),
new File(config.getDataLogDir()),
//将配置文件中的选举类型给quorumPeer对象,默认类型是3
config.getElectionAlg(),
config.getServerId(),
config.getTickTime(),
config.getInitLimit(),
config.getSyncLimit(),
config.getQuorumListenOnAllIPs(),
cnxnFactory,
config.getQuorumVerifier());
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
// 开始启动
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
QuorumPeer
public synchronized void start() {
//加载数据
loadDataBase();
cnxnFactory.start();
//开始选举 leader
startLeaderElection();
// 此时调用 run()
super.start();
}
public void run() {
try {
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
try {
roZkMgr.start();
setBCVote(null);
//todo leader选举核心内容
//设置当前的投票,通过策略模式来决定当前用哪个选举算法来进行领导选举,
//默认是FastLeaderElection
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}
FastLeaderElection
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = System.currentTimeMillis();
}
try {
/**
* 封装接受的投票信息
*/
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
/**
* 退出选举是的封装结果:存储选举的结果信息
*/
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//增加逻辑时钟
logicalclock++;
//将自己的myid,zxid,epoch封装到自己的对象中
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
//发送通知,将包含自己信息的实体添加到发送队列中
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
//如果是LOOKING状态,则开始选举
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//从接受队列中拿到投屏消息,自己的投票也在这里面处理
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
//未取到结果
if(n == null){
if(manager.haveDelivered()){
//如果空闲的情况下,消息发送完了,继续重新发送,直到选举中leader为止
sendNotifications();
} else {
//消息还没投递出去,可能是其他server还没启动,尝试再连接
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
//如果取到结果,校验接受到消息是否属于这个集群中的
else if(self.getVotingView().containsKey(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
//选举leader
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
//如果通知的消息大于当前的消息,则表示当前时新一轮的选举,
//将通知消息的epoch号(朝代号)给当前的对象
// logicalclock 逻辑时钟,一开始是1,表示是新的开始,
//如果取出来的消息的epoch 比当前的逻辑时钟大的话,表示逻辑时钟已经是落后的朝代了,进行重新赋值
if (n.electionEpoch > logicalclock) {
logicalclock = n.electionEpoch;
// 启动接受的投票信息
recvset.clear();
//比较当前取出的消息是否可以胜出,
/**
比较规则:
首先比较epoch,大者获胜
其次,epoch相同时比较zxid,大者获胜
最后比较myid,大者获胜
*/
//TODO 比较myid,zxid,epoch,将当前对象的票据改成胜者的票据
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
//当前消息获胜的话,将投票的票据(leader、zxid、epoch)信息改成当前消息的票据
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
//否则,将当前的票据进行保持
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// 继续广播,即将当前对象包含票据的信息添加到发送队列中,
sendNotifications();
} else if (n.electionEpoch < logicalclock) {
// 如果取出来的消息的 epoch 比当前的逻辑时钟小,说明该消息已经是以前的了。不做任何处理
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock."
+ "n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
// 这里表示 取出来的消息的 epoch 和逻辑时钟相等,就继续比较zxid 和 myid ,
// 如果胜出,则将该消息的票据进行修改,并且继续广播
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
//将当前胜出的最终投票结果添加到接受投票信息的集合中,用来做选举的最终结果的判断
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//统计recvset集合,查看投某个id的票数是否超过一半
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
//确定leader
if (n == null) {
//修改状态,LEADING or FOLLOWING or OBSERVEING
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
//返回最终投票结果
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
//OBSERVING机器不参数选举
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
//如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经正在运行的zk集群时
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
Watcher机制
事件机制
watcher监听机制是Zookeeper中非常重要的特性,我们基于zookeeper 上创建的节点,可以对这些节点绑定监听事件。比如可以监听节点数据的变更、节点删除、子节点状态变更等事件。通过这个事件机制,可以基于zookeeper实现分布式锁,集群管理等功能。
watcher 特性
当数据发生变化时,zookeeper 会产生一个watcher 事件,并且会发送到客户端,但是客户的只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。即watcher是一次性的操作。可以通过循环监听去达到永久监听的效果
注册事件机制
通过这三个操作来绑定事件:
- getData
- exists
- getChildren
凡是事务类型的操作,都会触发监听事件。(create/delete/setData)
事件类型
| 事件类型 | 发生原因 |
|---|---|
| None(-1) | 客户端连接状态发生变化的时候,会收到none的事件 |
| NodeCreated(1) | 创建节点的事件。比如 create /demo |
| NodeDeleted(2) | 删除节点的事件 。比如 delete /demo |
| NodeDataChanaged(3) | 节点数据发生变更。 |
| NodeChildrenChanged(4) | 子节点被创建、删除会触发事件 |
事件操作类型对应事件机制
| 操作类型 | /demo 监听事件 (exists/getData/getChild)
|
/demo/chindren |
|---|---|---|
| create(/demo) | NodeCreated (exists/getData)
|
无 |
| delete(/demo) | NodeDeleted (exists/getData)
|
无 |
| setData(/demo) | NodeDataChanaged (exists/getData)
|
无 |
| create(/demo/children) | NodeChildChangerend (getChild)
|
NodeCreated |
| delete(/demo/children) | NodeChildChangerend (getChild)
|
NodeDeleted |
| setData(/demo/children) | NodeDataChanaged |
Watcher 事件源码分析
注册事件,发送数据到服务端
- 构造Zookeeper客户端,客户端注册Watcher 事件,并发送数据给服务端
//构造一个Zookeeper的方法,传入一个Watcher事件,
ZooKeeper zooKeeper = new ZooKeeper("192.168.45.131:2181", 4000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected==event.getState()){
//如果是连接响应
countDownLatch.countDown();
}
}
});
//绑定一个exists watcher事件
zooKeeper.exists("/demo", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("内部watcher:"+event.getType()+":"+event.getPath());
}
});
源码的实现
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly) throws IOException {
//将构造时传过来的watcher作为watcherManage的默认watcher
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//初始化ClientCnxn,并实例化SendThread 和 EventThread 两个内部类线程
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//调用启动方法,启动sendThread.start();eventThread.start(); 两个线程
cnxn.start();
}
至此客户端进行 zookeeper 的构造结束。
- 下面是 exists 的实现
public Stat exists(final String path, Watcher watcher) {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
//对 watcher 进行绑定注册
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
//封装请求头,声明 请求类型是 exists
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
//封装 exists 的请求信息,将 节点path 和是否存在watcher 进行封装
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
// 设置服务端响应的接收类
SetDataResponse response = new SetDataResponse();
//将 requestHeader、existsRequest、setDataResponse、watchRegistration 提交到发送队列中去
//此时的 cnxn 便是构造 zookeeper 时的 ClientCnxn 对象
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
//返回exists 得到的结果信息 stat 信息
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
进入到 ClientCnxn 的 submitRequest 方法中
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
//将消息添加到队列,并构造一个Packet 传输对象
//组装数据包,packet 是客户端和服务端进行通信的最小单元
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
//在数据包没有处理完成之前,一致阻塞
packet.wait();
}
}
return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration){
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
// 将相关传输对象装换成Packet
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//添加到 队列中,
// LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,
//即可以从队列的两端插入和移除元素。
// 双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
// LinkedList类是双向列表
outgoingQueue.add(packet);
}
}
//此处是多路复用机制,唤醒Selector ,告诉他有数据包添加过来了
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
- SendThread 的发送过程
在初始化连接的时候,zookeeper初始化了两个线程并且启动了。接下来是分析SendThread 的发送过程,因为是一个线程,所以会调用run方法
//SendThread#run 方法中,这个调用最为重要,
//调用 clientCnxnSocket 发起传输,进入到ClientCnxnSocketNIO.doTransport 方法中
//其中 pendingQueue 是一个用来存放已经发送、等待回应的 packet 队列
//outgoingQueue 是 exists 中将事件通过封装成 Packet 添加到该队列中
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
总结:client 调用 exists 注册监听以后,会做几个事情:
- 将请求数据封装为 packet ,添加到 outgoingQueue
- SendThread 线程会执行数据发送操作,主要是将 outgoingQueue 队列中的数据发送到服务端
- 通过ClientCnxnSocket.doTransport ,其中ClientCnxnSocket 只是zookeeper 客户端和服务端连接通信的封装。
- 最后在ClientCnxnSocketNIO.doIO()方法中通过
sock.write(p.bb);将请求的数据包发送到服务端
服务端接受请求处理流程
-
从服务端NettyServerCnxn类 ,开始分析
服务端有个NettyServerCnxn 类,用来处理客户端发送过来的请求
//服务端接受客户端发来的数据 NettyServerCnxn public void receiveMessage(ChannelBuffer message) { try { while(message.readable() && !throttled) { //byteBuffer 不为空 if (bb != null) { //已经读完 if (bb.remaining() == 0) { if (initialized) { // 处理客户端传过来的数据包 zks.processPacket(this, bb); } } } } } catch(IOException e) { LOG.warn("Closing connection to " + getRemoteSocketAddress(), e); close(); } }//ZooKeeperServer public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) { if (h.getType() == OpCode.auth) { } else { if (h.getType() == OpCode.sasl) { }else { //不是权限也不是sasl,组装请求参数 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); //提交请求 submitRequest(si); } } cnxn.incrOutstandingRequests(h); } public void submitRequest(Request si) { try { touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { //访问调用链中的方法 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } }此处的firstProcessor.processRequest(si); 是一个调用链,查看源码 找到实例化的过程
/** * 封装了三个调用链: * PrepRequestProcessor(SyncRequestProcessor(FinalRequestProcessor())) */ protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); //此处和下面启动了两个线程, //所以在processRequest 方法执行完后,会用到线程异步去执行 ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }所以了解了调用过程,接下来
firstProcessor.processRequest(si);实际上是调用 PrepRequestProcessor.processRequest() 方法//PrepRequestProcessor public void processRequest(Request request) { // request.addRQRec(">prep="+zks.outstandingChanges.size()); // 添加到阻塞队列里之后,会在 run 方法中进行处理 submittedRequests.add(request); } public void run() { try { while (true) { Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } if (Request.requestOfDeath == request) { break; } //去处理请求,处理请求中会按照request.type 进行类处理 // 而watcher 此时的请求类型是在exists 方法中指定了 exists 类型 // 最终都会执行 nextProcessor.processRequest(request); 方法 pRequest(request); } } catch (RequestProcessorException e) { if (e.getCause() instanceof XidRolloverException) { LOG.info(e.getCause().getMessage()); } handleException(this.getName(), e); } catch (Exception e) { handleException(this.getName(), e); } LOG.info("PrepRequestProcessor exited loop!"); } protected void pRequest(Request request) throws RequestProcessorException { //调用 SyncRequestProcessor.processRequest() nextProcessor.processRequest(request); }SyncRequestProcessor.processRequest() 方法
public void processRequest(Request request) { // request.addRQRec(">sync"); // 也是通过调用run 方法进行异步操作 queuedRequests.add(request); } @Override public void run() { try { while (true) { if (si != null) { } else if (toFlush.isEmpty()) { // optimization for read heavy workloads // iff this is a read, and there are no pending // flushes (writes), then just pass this to the next // processor if (nextProcessor != null) { //调用 FinalRequestProcessor.processRequest() nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable)nextProcessor).flush(); } } continue; } } } } catch (Throwable t) { } }调用 FinalRequestProcessor.processRequest() 方法
//调用 FinalRequestProcessor public void processRequest(Request request) { ServerCnxn cnxn = request.cnxn; try { switch (request.type) { case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); String path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); } //获得 节点信息 ,最后调用DataTree.statNode 将watch 事件进行存储 Stat stat = zks.getZKDatabase().statNode(path, existsRequest .getWatch() ? cnxn : null); rsp = new ExistsResponse(stat);+++ break; } } catch (SessionMovedException e) { } }