readOnly模式
在看MsgReadIndex类型的消息之前需要先对readOnly的模式有所了解,raft结构体中的readOnly作用是批量处理只读请求,只读请求有两种模式,分别是ReadOnlySafe和ReadOnlyLeaseBased,ReadOnlySafe是ETCD作者推荐的模式,因为这种模式不受节点之间时钟差异和网络分区的影响,我们主要看一下ReadOnlySafe使用的实现方式。
readOnly的模式的ReadOnlySafe模式会用到readOnly结构体,主要用于批量处理只读请求,readOnly的实现如下:
type readIndexStatus struct {
req pb.Message //记录了对应的MsgReadIndex请求
index uint64 //该MsgReadIndex请求到达时,对应的已提交位置
acks map[uint64]struct{} //记录了该MsgReadIndex相关的MsgHeartbeatResp响应的信息
}
type readOnly struct {
option ReadOnlyOption //当前只读请求的处理模式,ReadOnlySafe ReadOnlyOpt 和 ReadOnlyLeaseBased两种模式
/*
在etcd服务端收到MsgReadIndex消息时,会为其创建一个唯一的消息ID,并作为MsgReadIndex消息的第一条Entry记录。
在pendingReadIndex维护了消息ID与对应请求readIndexStatus实例的映射
*/
pendingReadIndex map[string]*readIndexStatus
readIndexQueue []string //记录了MsgReadIndex请求对应的消息ID,这样可以保证MsgReadIndex的顺序
}
//初始化readOnly
func newReadOnly(option ReadOnlyOption) *readOnly {
return &readOnly{
option: option,
pendingReadIndex: make(map[string]*readIndexStatus),
}
}
// addRequest adds a read only reuqest into readonly struct.
// `index` is the commit index of the raft state machine when it received
// the read only request.
// `m` is the original read only request message from the local or remote node.
//将已提交的位置(raftLog.committed)以及MsgReadIndex消息的相关信息存到readOnly中
/*
1.获取消息ID,在ReadIndex消息的第一个记录中记录了消息ID
2.判断该消息是否已经记录在pendingReadIndex中,如果已存在则直接返回
3.如果不存在,则维护到pendingReadIndex中,index是当前Leader已提交的位置,m是请求的消息
4.并将消息ID追加到readIndexQueue队列中
*/
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
ctx := string(m.Entries[0].Data) //在ReadIndex消息的第一个记录中,记录了消息ID
if _, ok := ro.pendingReadIndex[ctx]; ok { //如果存在,则不再记录该MsgReadIndex请求
return
}
ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} //创建MsgReadIndex对应的readIndexStatus实例,并记录到pendingReadIndex
ro.readIndexQueue = append(ro.readIndexQueue, ctx) //记录消息ID
}
// recvAck notifies the readonly struct that the raft state machine received
// an acknowledgment of the heartbeat that attached with the read only request
// context.
/*
recvAck通知readonly结构,即raft状态机接受了对只读请求上下文附加的心跳的确认。
1.消息的Context即消息ID,根据消息id获取对应的readIndexStatus
2.如果获取不到则返回0
3.记录了该Follower节点返回的MsgHeartbeatResp响应的信息
4.返回Follower响应的数量
*/
func (ro *readOnly) recvAck(m pb.Message) int {
rs, ok := ro.pendingReadIndex[string(m.Context)]
if !ok {
return 0
}
rs.acks[m.From] = struct{}{}
// add one to include an ack from local node
return len(rs.acks) + 1
}
// advance advances the read only request queue kept by the readonly struct.
// It dequeues the requests until it finds the read only request that has
// the same context as the given `m`.
//清空readOnly中指定消息ID及之前的所有记录
/*
1.遍历readIndexQueue队列,如果能找到该消息的Context,则返回该消息及之前的所有记录rss,
并删除readIndexQueue队列和pendingReadIndex中对应的记录
2.如果没有Context对应的消息ID,则返回nil
*/
func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
var (
i int
found bool
)
ctx := string(m.Context)
rss := []*readIndexStatus{}
for _, okctx := range ro.readIndexQueue {
i++
rs, ok := ro.pendingReadIndex[okctx]
if !ok {
panic("cannot find corresponding read state from pending map")
}
rss = append(rss, rs)
if okctx == ctx {
found = true
break
}
}
if found {
ro.readIndexQueue = ro.readIndexQueue[i:]
for _, rs := range rss {
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
}
return rss
}
return nil
}
// lastPendingRequestCtx returns the context of the last pending read only
// request in readonly struct.
//返回记录中最后一个消息ID
func (ro *readOnly) lastPendingRequestCtx() string {
if len(ro.readIndexQueue) == 0 {
return ""
}
return ro.readIndexQueue[len(ro.readIndexQueue)-1]
}
MsgReadIndex消息
客户端发往集群的只读请求使用MsgReadIndex消息表示,通过Node接口的ReadIndex方法请求到集群中
node节点实现了ReadIndex方法
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}
如果客户端发到的节点是Follower状态,则该Follower会向消息转发到当前集群的Leader
func stepFollower(r *raft, m pb.Message) error {
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
}
m.To = r.lead
r.send(m)
}
所以真正处理MsgReadIndex消息的是当前集群中的Leader
处理流程如下:
ETCD服务是集群模式
1.日志中已提交的位置所在的任期号不等于当前任期号则返回nil
2.如果当前节点处理readOnly的模式是ReadOnlySafe(ReadOnlySafe是ETCD作者推荐的模式,因为这种模式不受节点之间时钟差异和网络分区的影响)
(1)则将此次请求消息写入到readOnly中
(2)向集群中的其他节点发送心跳,心跳消息的上下文中携带了该消息ID
3.如果当前节点处理readOnly的模式是ReadOnlyLeaseBased
(1)获取当前节点已提交的位置ri
(2)如果客户端直接请求到该Leader节点,则直接将只读消息追加到readStates队列中,等待其他goroutine处理
(3)如果该消息是其他Follower转发回来的消息,则向该Follower节点发送MsgReadIndexResp类型的消息
ETCD服务是单节点
单节点的情况直接将客户端请求的消息追加到readStates队列中,等待其他goroutine处理
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
//其他类型的消息处理省略
case pb.MsgReadIndex:
if r.quorum() > 1 { //集群场景
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
}
// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
/*
Leader节点检测自身在当前任期中是否已提交Entry记录,如果没有,则无法进行读取操作
*/
switch r.readOnly.option {
case ReadOnlySafe:
//记录当前节点的raftLog.committed字段值,即已提交位置
r.readOnly.addRequest(r.raftLog.committed, m)
r.bcastHeartbeatWithCtx(m.Entries[0].Data) //发送心跳
case ReadOnlyLeaseBased:
ri := r.raftLog.committed
if m.From == None || m.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
} else {//单节点情况
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
}
return nil
}
MsgReadIndexResp消息
从上面的流程中我们看到,当客户端将只读请求发送到Follower节点,Follower节点会将只读请求发送到Leader节点,Leader节点进行一些列处理后会向该节点返回MsgReadIndexResp消息。下面我们主要看一下Follower接收到MsgReadIndexResp消息后的处理流程。
Follower接收到MsgReadIndexResp消息会将该消息追加到该节点的readStates队列中
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
从以上可以看到客户端的只读请求最终会写入到客户端请求节点的readStates队列中,等待其他goroutine来处理,以上也是MsgReadIndex和MsgReadIndexResp类型消息的处理流程。