MsgBeat和MsgHeartbeat消息

      Leader推动心跳计时器(heartbeatElapsed),而Follower推动选举计时器(electionElapsed),选举计时器的流程前面已经提到,这里主要介绍心跳计时器。当上层模块调用Tick()时,Leader会推动心跳计时器,在tickHeartbeat函数中有详细介绍。有两种情况会是Leader向集群中其他的节点发送心跳消息分别是心跳计时器超时和在ReadOnly模式下Leader接收到客户端请求的只读消息,这里我们主要介绍心跳计时器超时的情况。

 MsgBeat和MsgHeartbeat消息的主要区别是MsgBeat是本地消息,而发给集群中其他节点的心跳消息是使用MsgHeartbeat消息

etcd中raft协议的消息(四) —— 心跳相关的消息(MsgBeat、MsgHeartbeat、MsgHeartbeatResp和MsgCheckQuorum)

Leader调用tickHeartbeat推动心跳计时器

    当心跳计时器超时时会向其他节点发送心跳,如果raft的配置checkQuorum为true,则也会发送MsgCheckQuorum消息。tickHeartbeat的主要流程:

1.递增心跳计时器和选举计时器

2.如果选举计时器大于等于选举超时时间

                重置选举计时器,Leader节点不会主动发起选举

                 检测当前节点是否与集群中的大多数节点连通,如果超过半数以上的节点连通则发送MsgCheckQuorum确认

                 如果有节点正在转移,则禁止节点转移

3.如果选举计时器没有超时

              如果当前节点不是Leader,则直接返回

              心跳计时器超时,则发送MsgBeat消息,继续维持向其余节点发送心跳消息,并重置心跳计时器

func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++	//递增心跳计时器
	r.electionElapsed++	//递增选举计时器

	if r.electionElapsed >= r.electionTimeout {  //当选举计时器大于等于选举超时时间
		r.electionElapsed = 0	//重置选举计时器,Leader节点不会主动发起选举
		if r.checkQuorum { //检测当前节点是否与集群中的大多数节点连通
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		if r.state == StateLeader && r.leadTransferee != None { //禁止节点转移
			r.abortLeaderTransfer()		//清空raft.leadTransferee字段,放弃转移
		}
	}

	if r.state != StateLeader {	//检测当前节点是否是Leader,不是Leader直接返回
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout { //心跳计时器超时,则发生MsgBeat消息,继续维持向其余节点发送心跳
		r.heartbeatElapsed = 0 //重置心跳计时器
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

Leader发送MsgBeat类型的消息流程

     Leader发送MsgBeat类型的消息主要是通过bcastHeartbeat方法向集群中的其他节点广播发送,如果readOnly中有只读消息,会在消息的上下文中携带readOnly中最后一条消息的消息ID,Follower节点收到后会在上下文中携带该消息ID响应。

//广播发送心跳
func (r *raft) bcastHeartbeat() {
	//获取readOnly最新一次的上下文
	lastCtx := r.readOnly.lastPendingRequestCtx()
	if len(lastCtx) == 0 {
		r.bcastHeartbeatWithCtx(nil)
	} else {
		r.bcastHeartbeatWithCtx([]byte(lastCtx))
	}
}

//遍历向集群中的其他节点发送心跳消息
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
	r.forEachProgress(func(id uint64, _ *Progress) {
		if id == r.id {   //过滤当前节点
			return
		}
		r.sendHeartbeat(id, ctx)   //向指定的节点发送MsgBeat消息
	})
}

//向指定节点发送消息
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	// Attach the commit as min(to.matched, r.committed).
	// When the leader sends out heartbeat message,
	// the receiver(follower) might not be matched with the leader
	// or it might not have all the committed entries.
	// The leader MUST NOT forward the follower's commit to
	// an unmatched index.
	//由于集群中的节点不一定都收到全部已提交的Entry记录,所以心跳消息携带commit提交位置
   消息类型为MsgHeartbeat
	commit := min(r.getProgress(to).Match, r.raftLog.committed)
	m := pb.Message{
		To:      to,
		Type:    pb.MsgHeartbeat,
		Commit:  commit,
		Context: ctx,
	}

	r.send(m)
}

Follower节点接收到MsgHeartbeat消息的处理流程

    

Follower收到心跳计时器会重置选举计时器,并且指定Leader,并且调用handleHeartbeat方法处理消息。

func  stepFollower(r *raft,m pb.Message) error {
	switch m.Type {
//其他类型消息的处理略
......

   	case pb.MsgHeartbeat:  //如果是心跳消息
		r.electionElapsed = 0 //重置选举计时器
		r.lead = m.From	//指定leader
		r.handleHeartbeat(m)	//处理心跳消息
        }
}

处理心跳消息比较简单,节点尝试更新raftLog中已提交的位置,并且发送心跳回复消息MsgHeartbeatResp

 func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)  //更新raftLog中已提交的位置
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})  //心跳回复
}

MsgHeartbeatResp消息

    1.设置该节点的RecentActive字段为true,表示该节点存活

    2.将Paused设为false,表示可以继续向该Follower节点发送消息

    3.如果消息队列满了,则释放inflights中第一个消息,这样就可以开始后续消息的发送

    4.当Leader节点收到Follower节点的MsgHeartbeat消息之后,会比较对应的Match值与Leader节点的raftLog,从而判断Follower节点是否已拥有了全部的Entry记录

    如果该Follower缺少Entry,则Leader会发送缺少的Entry记录

    5.如果该节点的只读模式不是ReadOnlySafe,则返回

    6.统计目前为止响应上述携带消息ID的MsgHeartbeat消息的节点个数,如果没有超过半数则退出

    7.如果响应心跳的节点数超过半数,则会清空readOnly中指定消息ID及其之前的所有相关记录

        实现如下

                    根据MsgReadIndex消息的From字段,判断该MsgReadIndex消息是否为Follower节点转发到Leader节点的消息

                          如果是客户端直接发送到Leader节点的消息,

                          则将MsgReadIndex消息对应的已提交位置以及其消息ID封装成ReadState实例,添加到raft.readStates中保存。

                          后续会有其他goroutine读取该数组,并对相应的MsgReadIndex消息进行响应

                          如果其他Follower节点转发到Leader节点的MsgReadIndex消息,

                           则Leader会向Follower节点返回相应的MsgReadIndex消息,并由Follower节点响应Client。

func stepLeader(r *raft, m pb.Message) error {
	//其他类型的消息略
  case pb.MsgHeartbeatResp:  //处理心跳回复消息
		pr.RecentActive = true //设置RecentActive字段
		pr.resume()	//设置Paused为false,即可以重新发送消息

		// free one slot for the full inflights window to allow progress.
		if pr.State == ProgressStateReplicate && pr.ins.full() { //队列已满
			pr.ins.freeFirstOne() //释放第一个消息,这样就可以开始后续消息的发送
		}
		//判断Follower是否已拥有全部的Entry记录
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)	//通过向指定节点发送MsgApp消息完成Entry记录的复制
		}


		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		ackCount := r.readOnly.recvAck(m)
		if ackCount < r.quorum() {
			return nil
		}

		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			req := rs.req
			if req.From == None || req.From == r.id {
				/*
				根据MsgReadIndex消息的From字段,判断该MsgReadIndex消息是否为Follower节点转发到Leader节点的消息
				如果是客户端直接发送到Leader节点的消息,
				则将MsgReadIndex消息对应的已提交位置以及其消息ID封装成ReadState实例,添加到raft.readStates中保存。
				后续会有其他goroutine读取该数组,并对相应的MsgReadIndex消息进行响应
				*/
				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
			} else {
				//如果其他Follower节点转发到Leader节点的MsgReadIndex消息,
				// 则Leader会向Follower节点返回相应的MsgReadIndex消息,并由Follower节点响应。
				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
			}
		}
}

  MsgCheckQuorum消息

       MsgCheckQuorum消息和MsgBeat消息的Term字段都为0,因为它们都属于本地消息。当Leader 的心跳计时器超时,并且开启了checkQuorum模式(raft的checkQuorum字段为true)。该Leader节点就会发送MsgCheckQuorum消息检测与集群中其他节点是否保持半数以上的连接,如果没有则变成Follower节点。

      MsgCheckQuorum类型的消息比较简单,这里就不在累述。

func stepLeader(r *raft, m pb.Message) error {
	switch m.Type {
	//其他类型的消息省略
	case pb.MsgCheckQuorum:
		if !r.checkQuorumActive() {  //检测当前节点是否与集群中大多数节点连通,如果不连通则切换成Follower
			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
			r.becomeFollower(r.Term, None)
		}
		return nil
}

   

 

相关文章:

  • 2021-07-23
  • 2021-10-03
  • 2021-11-26
  • 2021-12-26
  • 2020-04-20
  • 2022-01-02
  • 2021-07-14
  • 2022-01-23
猜你喜欢
  • 2021-09-19
  • 2021-08-30
  • 2021-11-26
  • 2022-01-10
  • 2021-04-10
  • 2021-08-23
  • 2021-07-07
相关资源
相似解决方案