JRaft 在设计层面将选举过程拆分为预选举(pre-vote)和正式选举两个阶段,之所以这样设计,是为了避免无效的选举进程递增 term 值而造成浪费,同时避免正常运行的 Leader 节点被迫执行角色降级。Raft 算法要求节点在接收到 term 值更大的请求时,将本地的 term 值更新为该更大的值,以此实现集群中 term 值的同步。对于 Leader 节点而言,当收到 term 值更大的请求时,该节点会认为集群中有新的 Leader 节点生成,于是需要执行角色降级。这一机制能够保证在出现网络分区等问题后,一旦网络恢复,term 值较小的 Leader 节点就会退位为 Follower 节点,从而让集群重新达到平稳状态。然而,如果集群中某个 Follower 节点因为某些原因(例如自身与 Leader 之间的网络中断)未能接收到 Leader 节点的主权宣示指令(即心跳),就会不断尝试发动新一轮的选举革命并持续递增 term 值;当该节点重新与集群连通后,它携带的更大 term 值会导致 Leader 节点执行角色降级,最终影响整个集群的正常运行。
预选举的引入则能够很好地解决此类问题,当一个 Follower 节点尝试发起一轮新的选举革命时,该节点不会立即递增本地的 term 值,而是以当前 term 值加 1 作为请求中的 term 值,试探性地征集选票;只有在集群中过半数节点同意投票的前提下,才会进入正式投票环节。这样一来,无效的选举一般只会停留在预选举阶段,不会改变任何节点的 term 值,也就不会对集群的正常运行造成影响。
privatevoidpreVote(){ long oldTerm; try { LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm); // 当前节点正在安装快照,则放弃预选举 if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) { LOG.warn("Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.", getNodeId(), this.currTerm); return; } // 当前节点不是一个有效的节点 if (!this.conf.contains(this.serverId)) { LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf); return; } oldTerm = this.currTerm; } finally { this.writeLock.unlock(); }
// 从本地磁盘获取最新的 LogId final LogId lastLogId = this.logManager.getLastLogId(true);
boolean doUnlock = true; this.writeLock.lock(); try { // pre_vote need defense ABA after unlock&writeLock if (oldTerm != this.currTerm) { LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm); return; }
上一篇曾介绍过 RocksDBLogStorage 设置了两个 column family,即 conf family 和 data family,其中后者复用了 RocksDB 提供的默认 column family。由上述实现可以看到,JRaft 针对配置类型的 LogEntry 会同时写入这两个 family 中,而其它类型的 LogEntry 仅会写入到 data family 中。
public Message handlePreVoteRequest(final RequestVoteRequest request){ boolean doUnlock = true; this.writeLock.lock(); try { // 当前节点处于非活跃状态,响应错误 if (!this.state.isActive()) { LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm); return RpcFactoryHelper // .responseFactory() // .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL, "Node %s is not in active state, state %s.", getNodeId(), this.state.name()); } // 解析发起投票的节点 ID final PeerId candidateId = new PeerId(); if (!candidateId.parse(request.getServerId())) { // 解析错误,响应错误 LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId()); return RpcFactoryHelper // .responseFactory() // .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL, "Parse candidateId failed: %s.", request.getServerId()); } boolean granted = false; // noinspection ConstantConditions do { // 当前节点与对应 leader 节点之间的租约仍然有效,拒绝投票 if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) { LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId); break; } // 发起投票节点的 term 值小于当前节点,拒绝投票 if (request.getTerm() < this.currTerm) { LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm); // A follower replicator may not be started when this node become leader, so we must check it. // 如果当前节点是 leader 节点,检查与发起投票节点之间的复制关系 checkReplicator(candidateId); break; } elseif (request.getTerm() == this.currTerm + 1) { // A follower replicator may not be started when this node become leader, so we must check it. // check replicator state checkReplicator(candidateId); } doUnlock = false; this.writeLock.unlock();
// 获取本地最新的 LogId final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true; this.writeLock.lock(); // 封装请求中的 logIndex 和 term 值 final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm()); // 如果请求的 term 值更大,或者在 term 值相等的前提下,请求的 logIndex 不小于当前节点的 logIndex 值, // 则投上自己的一票 granted = requestLastLogId.compareTo(lastLogId) >= 0;
LOG.info( "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId, lastLogId); } while (false);
public Message handleRequestVoteRequest(final RequestVoteRequest request){ boolean doUnlock = true; this.writeLock.lock(); try { // 节点处于非活跃状态,响应错误 if (!this.state.isActive()) { LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm); return RpcFactoryHelper // .responseFactory() // .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL, "Node %s is not in active state, state %s.", getNodeId(), this.state.name()); } // 解析发起正式选举的节点 ID final PeerId candidateId = new PeerId(); // 解析失败,响应错误 if (!candidateId.parse(request.getServerId())) { LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId()); return RpcFactoryHelper // .responseFactory() // .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL, "Parse candidateId failed: %s.", request.getServerId()); }
// noinspection ConstantConditions do { // check term if (request.getTerm() >= this.currTerm) { LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm); // 候选节点的 term 值大于当前节点,执行 stepdown if (request.getTerm() > this.currTerm) { // increase current term, change state to follower stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term RequestVoteRequest.")); } } // 候选节点的 term 值小于当前节点,拒绝投票 else { // ignore older term LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm); break; } doUnlock = false; this.writeLock.unlock();
// 从本地获取最新的 logIndex 和对应的 term 值 final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true; this.writeLock.lock(); // vote need ABA check after unlock&writeLock if (request.getTerm() != this.currTerm) { LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm); break; }
// 如果 logIsOk,则说明候选节点的 term 值大于当前节点,或者 term 相同,但是候选节点的 logIndex 不比当前节点小 finalboolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm()) .compareTo(lastLogId) >= 0;
// 如果 logIsOk,且当前节点目前没有投票给其它节点 if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) { stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE, "Raft node votes for some candidate, step down to restart election_timer.")); this.votedId = candidateId.copy(); // 更新本地元数据信息 this.metaStorage.setVotedFor(candidateId); } } while (false);
publicvoidhandleRequestVoteResponse(final PeerId peerId, finallong term, final RequestVoteResponse response){ this.writeLock.lock(); try { // 当前节点已经不是 CANDIDATE 角色,可能以及竞选成功,或者被打回 FOLLOWER 角色了,忽略响应 if (this.state != State.STATE_CANDIDATE) { LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.", getNodeId(), peerId, this.state); return; }
// 期间 term 值已经发生变化,忽略响应 if (term != this.currTerm) { LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(), peerId, term, this.currTerm); return; }
// 目标节点的 term 值比当前节点大,需要执行 stepdown if (response.getTerm() > this.currTerm) { LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(), peerId, response.getTerm(), this.currTerm); stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term request_vote_response.")); return; } // check granted quorum? if (response.getGranted()) { this.voteCtx.grant(peerId); // 如果票数过半,则竞选成功 if (this.voteCtx.isGranted()) { becomeLeader(); } } } finally { this.writeLock.unlock(); } }