Go实现Raft第三篇:命令和日志复制

女主宣言

今天小编为大家分享一篇关于Golang实现Raft的文章,本篇文章为系列中的第三篇,对Raft中的命令和日志复制进行介绍并使用go进行实现。希望能对大家有所帮助。

PS:丰富的一线技术、多元化的表现形式,尽在“ 3 60云计算 ”,点关注哦!

文章为Raft系列文章中的第三篇,命令和日志复制。 该篇中 我们将增强Raft的相关功能实现,实际的处理客户端提交的命令并在Raft集群中进行复制。

1

客户端交互

在第一篇文章中我们简要讨论了客户端交互,如果不清晰建议可以再回顾一下。 这里我们先不关注客户端如何找到领导者, 将重点讨论当找到一个领导者时会发生什么。

  1. 首先,客户端将命令提交给领导者。在Raft集群中,命令通常只提交给单个节点。

  2. 领导者将命令复制到其跟随者。

  3. 最后,如果大多数集群节点都承认在其日志中有该命令,该命令将被提交,并向所有客户端通知新的提交。

注意提交和提交命令之间的不对称性 - 在检查我们即将讨论的实现决策时,这一点很重要。命令被提交到单个Raft节点,但是多个节点(特别是所有已连接/活动的节点啊)会在一段时间后将其提交并通知其客户端。

回顾此图:

状态机代表使用Raft进行复制的任意服务。

然后我们在Raft ConsensusModule模块的上下文中讨论客户端,我们通常指的是此服务,因为这是将提交报告到的地方。换句话说,从Consensus模块到服务状态机的黑色箭头就是该通知。

2

实现:提交管道

在我们的实现中,当一个 ConsensusModule 被创建时,它接受一个提交管道 - 一个用来向调用者发送提交命令的通道:commitChan chan<-CommitEntry。 定义如下:

// CommitEntry is the data reported by Raft to the commit channel. Each commit
// entry notifies the client that consensus was reached on a command and it can
// be applied to the client's state machine.
type CommitEntry struct {
  // Command is the client command being committed.
  Command interface{}

  // Index is the log index at which the client command is committed.
  Index int

  // Term is the Raft term at which the client command is committed.
  Term int
}

使用通道是一种设计选择,但不是唯一方式。也可以改用回调。创建ConsensusModule时,调用者将注册一个回调函数,只要有要提交的命令,就会调用该回调函数。

在实现通道上发送条目的功能之前。我们需要先讨论Raft服务器如何复制命令并确定命令是否已提交。

3

Raft日志

文章中多次提到Raft日志,但还没有详细介绍。 日志只是应该应用于状态机的线性命令序列; 如果有需要,日志应该足以从某个开始状态“重放”状态机。 在正常运行期间,所有Raft节点的日志都是相同的; 当领导者收到新命令时,将其存放在自己的日志中,然后复制到跟随者。 跟随者将命令放在日志中,并确认给领导者,领导者将保留已安全复制到群集中大多数服务器的最新日志索引的计数。

每个框都是一个日志条目;框顶部的数字是将其添加到日志中的任期。底部是此日志包含的键值命令。每个日志条目都有一个线性索引。框的颜色是任期的另一种表示形式。

如果将此日志应用于空键值存储,则最终结果将具有值x = 4,y = 7。

在我们的实现中,日志条目由以下形式表示:

type LogEntry struct {
  Command interface{}
  Term    int
}

每个ConsensusModule的日志都只是log []LogEntry。用户端通常不在乎任期。任期对Raft的正确性至关重要,在阅读代码时务必牢记。

4

提交新的命令

新的Submit方法,使客户端可以提交新命令:

func (cm *ConsensusModule) Submit(command interface{}) bool {
  cm.mu.Lock()
  defer cm.mu.Unlock()

  cm.dlog("Submit received by %v: %v", cm.state, command)
  if cm.state == Leader {
    cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
    cm.dlog("... log=%v", cm.log)
    return true
  }
  return false
}

很简单,如果此CM是领导者,则将新命令附加到日志中并返回true。否则,将被忽略并返回false。

问:“提交”返回的真实值是否表明客户端已向领导者提交了命令?

答:在极少数情况下,领导者可能会与其他Raft服务器分开,而后者在一段时间后会继续选举新的领导者。但是,客户可能仍在与旧的领导者通信。客户端应等待一段合理的时间,以使其提交的命令出现在提交通道上;如果不是,则表示它联系了错误的领导者,应与其他领导者重试。

5

复制日志条目

我们看到,提交给领导者的新命令被添加到日志的末尾。这个新命令如何到达跟随者? 领导者遵循的步骤在Raft论文中进行了精确描述。 我们在 leaderSendHeartbeats 中完成实现。

func (cm *ConsensusModule) leaderSendHeartbeats() {
  cm.mu.Lock()
  savedCurrentTerm := cm.currentTerm
  cm.mu.Unlock()

  for _, peerId := range cm.peerIds {
    go func(peerId int) {
      cm.mu.Lock()
      ni := cm.nextIndex[peerId]
      prevLogIndex := ni - 1
      prevLogTerm := -1
      if prevLogIndex >= 0 {
        prevLogTerm = cm.log[prevLogIndex].Term
      }
      entries := cm.log[ni:]

      args := AppendEntriesArgs{
        Term:         savedCurrentTerm,
        LeaderId:     cm.id,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        Entries:      entries,
        LeaderCommit: cm.commitIndex,
      }
      cm.mu.Unlock()
      cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
      var reply AppendEntriesReply
      if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in heartbeat reply")
          cm.becomeFollower(reply.Term)
          return
        }

        if cm.state == Leader && savedCurrentTerm == reply.Term {
          if reply.Success {
            cm.nextIndex[peerId] = ni + len(entries)
            cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
            cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)

            savedCommitIndex := cm.commitIndex
            for i := cm.commitIndex + 1; i < len(cm.log); i++ {
              if cm.log[i].Term == cm.currentTerm {
                matchCount := 1
                for _, peerId := range cm.peerIds {
                  if cm.matchIndex[peerId] >= i {
                    matchCount++
                  }
                }
                if matchCount*2 > len(cm.peerIds)+1 {
                  cm.commitIndex = i
                }
              }
            }
            if cm.commitIndex != savedCommitIndex {
              cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
              cm.newCommitReadyChan <- struct{}{}
            }
          } else {
            cm.nextIndex[peerId] = ni - 1
            cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
          }
        }
      }
    }(peerId)
  }
}

这比我们在上一部分中所做的要复杂得多,但实际上它仅遵循本文的图2。关于此代码的一些注意事项:

  • 现在已完全填充了AE RPC的字段:有关其含义,请参见本文中的图2。

  • AE响应有一个 success 字段,该字段告诉领导者跟随者是否看到prevLogIndex 和 prevLogTerm 匹配。领导者基于此字段更新此跟随者的nextIndex。

  • commitIndex 根据复制特定日志索引的关注者的数量进行更新。如果索引被多数复制,则 commitIndex 前进到该索引。

与我们之前讨论的用户端交互有关,这部分代码特别重要:

if cm.commitIndex != savedCommitIndex {
  cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
  cm.newCommitReadyChan <- struct{}{}
}

newCommitReadyChan 是CM内部使用的通道,用于指示已准备好将新条目通过提交通道发送到客户端。它由在CM启动时在goroutine中运行的以下方法起作用:

func (cm *ConsensusModule) commitChanSender() {
  for range cm.newCommitReadyChan {
    // Find which entries we have to apply.
    cm.mu.Lock()
    savedTerm := cm.currentTerm
    savedLastApplied := cm.lastApplied
    var entries []LogEntry
    if cm.commitIndex > cm.lastApplied {
      entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
      cm.lastApplied = cm.commitIndex
    }
    cm.mu.Unlock()
    cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)

    for i, entry := range entries {
      cm.commitChan <- CommitEntry{
        Command: entry.Command,
        Index:   savedLastApplied + i + 1,
        Term:    savedTerm,
      }
    }
  }
  cm.dlog("commitChanSender done")
}

此方法更新 lastApplied 状态变量以确定哪些条目已经发送到客户端,并且仅发送新条目。

6

更新跟随者的日志

我们已经看到了领导者如何处理新的日志条目。 现在介绍跟随者的代码实现。 特别是 AppendEntries RPC。

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  cm.dlog("AppendEntries: %+v", args)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in AppendEntries")
    cm.becomeFollower(args.Term)
  }

  reply.Success = false
  if args.Term == cm.currentTerm {
    if cm.state != Follower {
      cm.becomeFollower(args.Term)
    }
    cm.electionResetEvent = time.Now()

    // Does our log contain an entry at PrevLogIndex whose term matches
    // PrevLogTerm? Note that in the extreme case of PrevLogIndex=-1 this is
    // vacuously true.
    if args.PrevLogIndex == -1 ||
      (args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
      reply.Success = true

      // Find an insertion point - where there's a term mismatch between
      // the existing log starting at PrevLogIndex+1 and the new entries sent
      // in the RPC.
      logInsertIndex := args.PrevLogIndex + 1
      newEntriesIndex := 0

      for {
        if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
          break
        }
        if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
          break
        }
        logInsertIndex++
        newEntriesIndex++
      }
      // At the end of this loop:
      // - logInsertIndex points at the end of the log, or an index where the
      //   term mismatches with an entry from the leader
      // - newEntriesIndex points at the end of Entries, or an index where the
      //   term mismatches with the corresponding log entry
      if newEntriesIndex < len(args.Entries) {
        cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
        cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
        cm.dlog("... log is now: %v", cm.log)
      }

      // Set commit index.
      if args.LeaderCommit > cm.commitIndex {
        cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
        cm.dlog("... setting commitIndex=%d", cm.commitIndex)
        cm.newCommitReadyChan <- struct{}{}
      }
    }
  }

  reply.Term = cm.currentTerm
  cm.dlog("AppendEntries reply: %+v", *reply)
  return nil
}

当注意到领导者的 LeaderCommit 大于其自己的 cm.commitIndex 时,跟随者知道领导者考虑提交额外的条目时,将在 ch.newCommitReadyChan 上发送。

当领导者使用AE发送新的日志条目时,将发生以下情况:

  • 跟随者将新条目追加到其日志中,并向领导者回复success = true。

  • 结果,领导者为此跟随者更新其matchIndex。当足够的跟随者的下一个索引具有matchIndex时,领导者将更新commitIndex并将其发送给下一个AE中的所有跟随者(在leaderCommit字段中)。

  • 当跟随者收到新的LeaderCommit消息时,已经知道提交了新的日志条目,并且可以通过提交通道将其发送给其用户端。

7

选举安全

目前为止,我们已经研究了添加的新代码以支持日志复制。 但是,日志也会影响Raft的选举。 Raft使用选举程序来防止候选人赢得选举,除非其日志至少与集群中大多数节点的日志一样。

因此,RV包含lastLogIndex和lastLogTerm字段。当候选人发出RV时,将使用有关其最后一个日志条目的信息填充这些RV。跟随者将这些字段与自己的字段进行比较,并确定候选人是否是最新的才可以被选举。

func (cm *ConsensusModule) startElection() {
  cm.state = Candidate
  cm.currentTerm += 1
  savedCurrentTerm := cm.currentTerm
  cm.electionResetEvent = time.Now()
  cm.votedFor = cm.id
  cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)

  var votesReceived int32 = 1

  // Send RequestVote RPCs to all other servers concurrently.
  for _, peerId := range cm.peerIds {
    go func(peerId int) {
      cm.mu.Lock()
      savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()
      cm.mu.Unlock()

      args := RequestVoteArgs{
        Term:         savedCurrentTerm,
        CandidateId:  cm.id,
        LastLogIndex: savedLastLogIndex,
        LastLogTerm:  savedLastLogTerm,
      }

      cm.dlog("sending RequestVote to %d: %+v", peerId, args)
      var reply RequestVoteReply
      if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        cm.dlog("received RequestVoteReply %+v", reply)

        if cm.state != Candidate {
          cm.dlog("while waiting for reply, state = %v", cm.state)
          return
        }

        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in RequestVoteReply")
          cm.becomeFollower(reply.Term)
          return
        } else if reply.Term == savedCurrentTerm {
          if reply.VoteGranted {
            votes := int(atomic.AddInt32(&votesReceived, 1))
            if votes*2 > len(cm.peerIds)+1 {
              // Won the election!
              cm.dlog("wins election with %d votes", votes)
              cm.startLeader()
              return
            }
          }
        }
      }
    }(peerId)
  }

  // Run another election timer, in case this election is not successful.
  go cm.runElectionTimer()
}

lastLogIndexAndTerm是一个新的帮助器方法:

// lastLogIndexAndTerm returns the last log index and the last log entry's term
// (or -1 if there's no log) for this server.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {
  if len(cm.log) > 0 {
    lastIndex := len(cm.log) - 1
    return lastIndex, cm.log[lastIndex].Term
  } else {
    return -1, -1
  }
}

我们的实现是基于0的索引,而不是基于1的Raft索引。因此-1经常作为一个标记值。

这是一个更新的RV处理程序,实现选举安全检查:

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
  cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)

  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in RequestVote")
    cm.becomeFollower(args.Term)
  }

  if cm.currentTerm == args.Term &&
    (cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
    (args.LastLogTerm > lastLogTerm ||
      (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
    reply.VoteGranted = true
    cm.votedFor = args.CandidateId
    cm.electionResetEvent = time.Now()
  } else {
    reply.VoteGranted = false
  }
  reply.Term = cm.currentTerm
  cm.dlog("... RequestVote reply: %+v", reply)
  return nil
}

8

下一步

目前的Raft实现中, 有一个问题是没有进行持久 化操作 如果服务器故障重启,将会造成 信息丢失。 为此,我们将在下一部分增加持久化操作,以及对本篇中 部分 功能进行 优化。 敬请关注!

Raft参考:https://raft.github.io/raft.pdf

代码参考:https://github.com/eliben/raft

360云计算

由360云平台团队打造的技术分享公众号,内容涉及 数据库、大数据、微服务、容器、AIOps、IoT 等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章