readOnly模式

       在看MsgReadIndex类型的消息之前需要先对readOnly的模式有所了解,raft结构体中的readOnly作用是批量处理只读请求,只读请求有两种模式,分别是ReadOnlySafe和ReadOnlyLeaseBased,ReadOnlySafe是ETCD作者推荐的模式,因为这种模式不受节点之间时钟差异和网络分区的影响,我们主要看一下ReadOnlySafe使用的实现方式。

readOnly的模式的ReadOnlySafe模式会用到readOnly结构体,主要用于批量处理只读请求,readOnly的实现如下:

type readIndexStatus struct {
	req   pb.Message   //记录了对应的MsgReadIndex请求
	index uint64		//该MsgReadIndex请求到达时,对应的已提交位置
	acks  map[uint64]struct{} //记录了该MsgReadIndex相关的MsgHeartbeatResp响应的信息
}

type readOnly struct {
	option           ReadOnlyOption  //当前只读请求的处理模式,ReadOnlySafe ReadOnlyOpt 和	ReadOnlyLeaseBased两种模式
	/*
	在etcd服务端收到MsgReadIndex消息时,会为其创建一个唯一的消息ID,并作为MsgReadIndex消息的第一条Entry记录。
	在pendingReadIndex维护了消息ID与对应请求readIndexStatus实例的映射
	*/
	pendingReadIndex map[string]*readIndexStatus
	readIndexQueue   []string   //记录了MsgReadIndex请求对应的消息ID,这样可以保证MsgReadIndex的顺序
}

//初始化readOnly
func newReadOnly(option ReadOnlyOption) *readOnly {
	return &readOnly{
		option:           option,
		pendingReadIndex: make(map[string]*readIndexStatus),
	}
}

// addRequest adds a read only reuqest into readonly struct.
// `index` is the commit index of the raft state machine when it received
// the read only request.
// `m` is the original read only request message from the local or remote node.
//将已提交的位置(raftLog.committed)以及MsgReadIndex消息的相关信息存到readOnly中
/*
1.获取消息ID,在ReadIndex消息的第一个记录中记录了消息ID
2.判断该消息是否已经记录在pendingReadIndex中,如果已存在则直接返回
3.如果不存在,则维护到pendingReadIndex中,index是当前Leader已提交的位置,m是请求的消息
4.并将消息ID追加到readIndexQueue队列中
*/
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
	ctx := string(m.Entries[0].Data)	//在ReadIndex消息的第一个记录中,记录了消息ID
	if _, ok := ro.pendingReadIndex[ctx]; ok {	//如果存在,则不再记录该MsgReadIndex请求
		return
	}
	ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} //创建MsgReadIndex对应的readIndexStatus实例,并记录到pendingReadIndex
	ro.readIndexQueue = append(ro.readIndexQueue, ctx)  //记录消息ID
}

// recvAck notifies the readonly struct that the raft state machine received
// an acknowledgment of the heartbeat that attached with the read only request
// context.
/*
recvAck通知readonly结构,即raft状态机接受了对只读请求上下文附加的心跳的确认。
1.消息的Context即消息ID,根据消息id获取对应的readIndexStatus
2.如果获取不到则返回0
3.记录了该Follower节点返回的MsgHeartbeatResp响应的信息
4.返回Follower响应的数量
*/
func (ro *readOnly) recvAck(m pb.Message) int {
	rs, ok := ro.pendingReadIndex[string(m.Context)]
	if !ok {
		return 0
	}

	rs.acks[m.From] = struct{}{}
	// add one to include an ack from local node
	return len(rs.acks) + 1
}

// advance advances the read only request queue kept by the readonly struct.
// It dequeues the requests until it finds the read only request that has
// the same context as the given `m`.
//清空readOnly中指定消息ID及之前的所有记录
/*
1.遍历readIndexQueue队列,如果能找到该消息的Context,则返回该消息及之前的所有记录rss,
	并删除readIndexQueue队列和pendingReadIndex中对应的记录
2.如果没有Context对应的消息ID,则返回nil
*/
func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
	var (
		i     int
		found bool
	)

	ctx := string(m.Context)
	rss := []*readIndexStatus{}

	for _, okctx := range ro.readIndexQueue {
		i++
		rs, ok := ro.pendingReadIndex[okctx]
		if !ok {
			panic("cannot find corresponding read state from pending map")
		}
		rss = append(rss, rs)
		if okctx == ctx {
			found = true
			break
		}
	}

	if found {
		ro.readIndexQueue = ro.readIndexQueue[i:]
		for _, rs := range rss {
			delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
		}
		return rss
	}

	return nil
}

// lastPendingRequestCtx returns the context of the last pending read only
// request in readonly struct.
//返回记录中最后一个消息ID
func (ro *readOnly) lastPendingRequestCtx() string {
	if len(ro.readIndexQueue) == 0 {
		return ""
	}
	return ro.readIndexQueue[len(ro.readIndexQueue)-1]
}

MsgReadIndex消息

 etcd中raft协议的消息(五)—— 客户端只读相关的消息(MsgReadIndex和MsgReadIndexResp消息)

 

 客户端发往集群的只读请求使用MsgReadIndex消息表示,通过Node接口的ReadIndex方法请求到集群中

node节点实现了ReadIndex方法

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
	return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}

如果客户端发到的节点是Follower状态,则该Follower会向消息转发到当前集群的Leader

func stepFollower(r *raft, m pb.Message) error {
case pb.MsgReadIndex:
   if r.lead == None {
      r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
      return nil
   }
   m.To = r.lead
   r.send(m)
}

所以真正处理MsgReadIndex消息的是当前集群中的Leader

处理流程如下:

ETCD服务是集群模式

            1.日志中已提交的位置所在的任期号不等于当前任期号则返回nil

            2.如果当前节点处理readOnly的模式是ReadOnlySafe(ReadOnlySafe是ETCD作者推荐的模式,因为这种模式不受节点之间时钟差异和网络分区的影响)

                   (1)则将此次请求消息写入到readOnly中

                   (2)向集群中的其他节点发送心跳,心跳消息的上下文中携带了该消息ID

            3.如果当前节点处理readOnly的模式是ReadOnlyLeaseBased

                       (1)获取当前节点已提交的位置ri

                      (2)如果客户端直接请求到该Leader节点,则直接将只读消息追加到readStates队列中,等待其他goroutine处理

                      (3)如果该消息是其他Follower转发回来的消息,则向该Follower节点发送MsgReadIndexResp类型的消息

 

ETCD服务是单节点

                      单节点的情况直接将客户端请求的消息追加到readStates队列中,等待其他goroutine处理

func stepLeader(r *raft, m pb.Message) error {

   switch m.Type {
		//其他类型的消息处理省略
		case pb.MsgReadIndex:
		if r.quorum() > 1 { //集群场景
			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
				// Reject read only request when this leader has not committed any log entry at its term.
				return nil
			}

			// thinking: use an interally defined context instead of the user given context.
			// We can express this in terms of the term and index instead of a user-supplied value.
			// This would allow multiple reads to piggyback on the same message.
			/*
			Leader节点检测自身在当前任期中是否已提交Entry记录,如果没有,则无法进行读取操作
			*/
			switch r.readOnly.option {
			case ReadOnlySafe:
				//记录当前节点的raftLog.committed字段值,即已提交位置
				r.readOnly.addRequest(r.raftLog.committed, m)
				r.bcastHeartbeatWithCtx(m.Entries[0].Data)  //发送心跳
			case ReadOnlyLeaseBased:
				ri := r.raftLog.committed
				if m.From == None || m.From == r.id { // from local member
					r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
				} else {
					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
				}
			}
		} else {//单节点情况
			r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
		}

		return nil
	}

MsgReadIndexResp消息

        从上面的流程中我们看到,当客户端将只读请求发送到Follower节点,Follower节点会将只读请求发送到Leader节点,Leader节点进行一些列处理后会向该节点返回MsgReadIndexResp消息。下面我们主要看一下Follower接收到MsgReadIndexResp消息后的处理流程。

   Follower接收到MsgReadIndexResp消息会将该消息追加到该节点的readStates队列中

    

func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
			case pb.MsgReadIndexResp:
		if len(m.Entries) != 1 {
			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
			return nil
		}
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}

从以上可以看到客户端的只读请求最终会写入到客户端请求节点的readStates队列中,等待其他goroutine来处理,以上也是MsgReadIndex和MsgReadIndexResp类型消息的处理流程。

相关文章: