MsgBeat和MsgHeartbeat消息
Leader推动心跳计时器(heartbeatElapsed),而Follower推动选举计时器(electionElapsed),选举计时器的流程前面已经提到,这里主要介绍心跳计时器。当上层模块调用Tick()时,Leader会推动心跳计时器,在tickHeartbeat函数中有详细介绍。有两种情况会是Leader向集群中其他的节点发送心跳消息分别是心跳计时器超时和在ReadOnly模式下Leader接收到客户端请求的只读消息,这里我们主要介绍心跳计时器超时的情况。
MsgBeat和MsgHeartbeat消息的主要区别是MsgBeat是本地消息,而发给集群中其他节点的心跳消息是使用MsgHeartbeat消息
Leader调用tickHeartbeat推动心跳计时器
当心跳计时器超时时会向其他节点发送心跳,如果raft的配置checkQuorum为true,则也会发送MsgCheckQuorum消息。tickHeartbeat的主要流程:
1.递增心跳计时器和选举计时器
2.如果选举计时器大于等于选举超时时间
重置选举计时器,Leader节点不会主动发起选举
检测当前节点是否与集群中的大多数节点连通,如果超过半数以上的节点连通则发送MsgCheckQuorum确认
如果有节点正在转移,则禁止节点转移
3.如果选举计时器没有超时
如果当前节点不是Leader,则直接返回
心跳计时器超时,则发送MsgBeat消息,继续维持向其余节点发送心跳消息,并重置心跳计时器
func (r *raft) tickHeartbeat() {
r.heartbeatElapsed++ //递增心跳计时器
r.electionElapsed++ //递增选举计时器
if r.electionElapsed >= r.electionTimeout { //当选举计时器大于等于选举超时时间
r.electionElapsed = 0 //重置选举计时器,Leader节点不会主动发起选举
if r.checkQuorum { //检测当前节点是否与集群中的大多数节点连通
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r.state == StateLeader && r.leadTransferee != None { //禁止节点转移
r.abortLeaderTransfer() //清空raft.leadTransferee字段,放弃转移
}
}
if r.state != StateLeader { //检测当前节点是否是Leader,不是Leader直接返回
return
}
if r.heartbeatElapsed >= r.heartbeatTimeout { //心跳计时器超时,则发生MsgBeat消息,继续维持向其余节点发送心跳
r.heartbeatElapsed = 0 //重置心跳计时器
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}
Leader发送MsgBeat类型的消息流程
Leader发送MsgBeat类型的消息主要是通过bcastHeartbeat方法向集群中的其他节点广播发送,如果readOnly中有只读消息,会在消息的上下文中携带readOnly中最后一条消息的消息ID,Follower节点收到后会在上下文中携带该消息ID响应。
//广播发送心跳
func (r *raft) bcastHeartbeat() {
//获取readOnly最新一次的上下文
lastCtx := r.readOnly.lastPendingRequestCtx()
if len(lastCtx) == 0 {
r.bcastHeartbeatWithCtx(nil)
} else {
r.bcastHeartbeatWithCtx([]byte(lastCtx))
}
}
//遍历向集群中的其他节点发送心跳消息
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
r.forEachProgress(func(id uint64, _ *Progress) {
if id == r.id { //过滤当前节点
return
}
r.sendHeartbeat(id, ctx) //向指定的节点发送MsgBeat消息
})
}
//向指定节点发送消息
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
//由于集群中的节点不一定都收到全部已提交的Entry记录,所以心跳消息携带commit提交位置
消息类型为MsgHeartbeat
commit := min(r.getProgress(to).Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}
r.send(m)
}
Follower节点接收到MsgHeartbeat消息的处理流程
Follower收到心跳计时器会重置选举计时器,并且指定Leader,并且调用handleHeartbeat方法处理消息。
func stepFollower(r *raft,m pb.Message) error {
switch m.Type {
//其他类型消息的处理略
......
case pb.MsgHeartbeat: //如果是心跳消息
r.electionElapsed = 0 //重置选举计时器
r.lead = m.From //指定leader
r.handleHeartbeat(m) //处理心跳消息
}
}
处理心跳消息比较简单,节点尝试更新raftLog中已提交的位置,并且发送心跳回复消息MsgHeartbeatResp
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit) //更新raftLog中已提交的位置
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) //心跳回复
}
MsgHeartbeatResp消息
1.设置该节点的RecentActive字段为true,表示该节点存活
2.将Paused设为false,表示可以继续向该Follower节点发送消息
3.如果消息队列满了,则释放inflights中第一个消息,这样就可以开始后续消息的发送
4.当Leader节点收到Follower节点的MsgHeartbeat消息之后,会比较对应的Match值与Leader节点的raftLog,从而判断Follower节点是否已拥有了全部的Entry记录
如果该Follower缺少Entry,则Leader会发送缺少的Entry记录
5.如果该节点的只读模式不是ReadOnlySafe,则返回
6.统计目前为止响应上述携带消息ID的MsgHeartbeat消息的节点个数,如果没有超过半数则退出
7.如果响应心跳的节点数超过半数,则会清空readOnly中指定消息ID及其之前的所有相关记录
实现如下
根据MsgReadIndex消息的From字段,判断该MsgReadIndex消息是否为Follower节点转发到Leader节点的消息
如果是客户端直接发送到Leader节点的消息,
则将MsgReadIndex消息对应的已提交位置以及其消息ID封装成ReadState实例,添加到raft.readStates中保存。
后续会有其他goroutine读取该数组,并对相应的MsgReadIndex消息进行响应
如果其他Follower节点转发到Leader节点的MsgReadIndex消息,
则Leader会向Follower节点返回相应的MsgReadIndex消息,并由Follower节点响应Client。
func stepLeader(r *raft, m pb.Message) error {
//其他类型的消息略
case pb.MsgHeartbeatResp: //处理心跳回复消息
pr.RecentActive = true //设置RecentActive字段
pr.resume() //设置Paused为false,即可以重新发送消息
// free one slot for the full inflights window to allow progress.
if pr.State == ProgressStateReplicate && pr.ins.full() { //队列已满
pr.ins.freeFirstOne() //释放第一个消息,这样就可以开始后续消息的发送
}
//判断Follower是否已拥有全部的Entry记录
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From) //通过向指定节点发送MsgApp消息完成Entry记录的复制
}
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return nil
}
rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id {
/*
根据MsgReadIndex消息的From字段,判断该MsgReadIndex消息是否为Follower节点转发到Leader节点的消息
如果是客户端直接发送到Leader节点的消息,
则将MsgReadIndex消息对应的已提交位置以及其消息ID封装成ReadState实例,添加到raft.readStates中保存。
后续会有其他goroutine读取该数组,并对相应的MsgReadIndex消息进行响应
*/
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
//如果其他Follower节点转发到Leader节点的MsgReadIndex消息,
// 则Leader会向Follower节点返回相应的MsgReadIndex消息,并由Follower节点响应。
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}
}
MsgCheckQuorum消息
MsgCheckQuorum消息和MsgBeat消息的Term字段都为0,因为它们都属于本地消息。当Leader 的心跳计时器超时,并且开启了checkQuorum模式(raft的checkQuorum字段为true)。该Leader节点就会发送MsgCheckQuorum消息检测与集群中其他节点是否保持半数以上的连接,如果没有则变成Follower节点。
MsgCheckQuorum类型的消息比较简单,这里就不在累述。
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
//其他类型的消息省略
case pb.MsgCheckQuorum:
if !r.checkQuorumActive() { //检测当前节点是否与集群中大多数节点连通,如果不连通则切换成Follower
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
return nil
}