private boolean checkAndResolveConflict(final List<LogEntry> entries, final StableClosure done) {
    final LogEntry firstLogEntry = ArrayDeque.peekFirst(entries);
    // Leader node: assign logIndex values based on lastLogIndex
    if (firstLogEntry.getId().getIndex() == 0) {
        // Node is currently the leader and |entries| are from the user who
        // don't know the correct indexes the logs should assign to.
        // So we have to assign indexes to the appending entries
        for (int i = 0; i < entries.size(); i++) {
            entries.get(i).getId().setIndex(++this.lastLogIndex);
        }
        return true;
    }
    // Follower node
    else {
        // Node is currently a follower and |entries| are from the leader.
        // We should check and resolve the conflicts between the local logs and |entries|
        if (firstLogEntry.getId().getIndex() > this.lastLogIndex + 1) {
            // There is a gap between the entries to append and the existing local logs
            Utils.runClosureInThread(done,
                new Status(RaftError.EINVAL, "There's gap between first_index=%d and last_log_index=%d",
                    firstLogEntry.getId().getIndex(), this.lastLogIndex));
            return false;
        }
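Note that ArrayDeque here is jraft's own list-based utility class (in com.alipay.sofa.jraft.util), not java.util.ArrayDeque, which is why the static calls on a List compile. Its peekFirst/peekLast helpers presumably reduce to something like this hypothetical sketch:

import java.util.List;

// Hypothetical equivalents of the two static helpers used above; the bodies
// are an assumption, the real class lives in com.alipay.sofa.jraft.util.
final class ListPeekHelpers {
    static <E> E peekFirst(final List<E> list) {
        return list.get(0);                   // first entry of the pending batch
    }

    static <E> E peekLast(final List<E> list) {
        return list.get(list.size() - 1);     // last entry of the pending batch
    }
}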
        // Every entry to append has a logIndex no greater than the largest applied
        // logIndex; return immediately
        final long appliedIndex = this.appliedId.getIndex();
        final LogEntry lastLogEntry = ArrayDeque.peekLast(entries);
        if (lastLogEntry.getId().getIndex() <= appliedIndex) {
            LOG.warn(
                "Received entries of which the lastLog={} is not greater than appliedIndex={}, return immediately with nothing changed.",
                lastLogEntry.getId().getIndex(), appliedIndex);
            // Replicating old logs before appliedIndex should be considered successful, respond OK.
            Utils.runClosureInThread(done);
            return false;
        }
        // The entries to append connect exactly to the existing local logs;
        // simply advance lastLogIndex
        if (firstLogEntry.getId().getIndex() == this.lastLogIndex + 1) {
            // fast path
            this.lastLogIndex = lastLogEntry.getId().getIndex();
        }
        // Otherwise the entries to append overlap the existing local logs
        else {
            // Appending entries overlap the local ones. We should find if there
            // is a conflicting index from which we should truncate the local ones.
            int conflictingIndex = 0;
            // Scan from the head for the first logIndex whose term does not match
            for (; conflictingIndex < entries.size(); conflictingIndex++) {
                if (unsafeGetTerm(entries.get(conflictingIndex).getId().getIndex())
                        != entries.get(conflictingIndex).getId().getTerm()) {
                    break;
                }
            }
            // A conflict exists; truncate the local logs from the conflicting index onwards
            if (conflictingIndex != entries.size()) {
                if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
                    // Truncate all the conflicting entries to make local logs consistent with the leader.
                    unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1);
                }
                this.lastLogIndex = lastLogEntry.getId().getIndex();
            } // else this is a duplicated AppendEntriesRequest, we have nothing to do besides releasing all the entries
            // Remove the entries that are already stored locally from the request
            if (conflictingIndex > 0) {
                // Remove duplication
                entries.subList(0, conflictingIndex).clear();
            }
        }
        return true;
    }
}
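To make the conflict scan concrete, here is a self-contained worked example (all indexes and terms hypothetical) of how conflictingIndex is located when the incoming batch overlaps the local log:

import java.util.HashMap;
import java.util.Map;

public class ConflictScanSketch {
    public static void main(String[] args) {
        // Local log: logIndex -> term, with lastLogIndex = 5.
        Map<Long, Long> localTerm = new HashMap<>();
        localTerm.put(3L, 2L);
        localTerm.put(4L, 2L);
        localTerm.put(5L, 2L);
        // Incoming entries cover [3, 6]; a new leader wrote term 3 from index 5.
        long[][] incoming = { { 3, 2 }, { 4, 2 }, { 5, 3 }, { 6, 3 } };
        int conflictingIndex = 0;
        for (; conflictingIndex < incoming.length; conflictingIndex++) {
            Long term = localTerm.get(incoming[conflictingIndex][0]);
            if (term == null || term != incoming[conflictingIndex][1]) {
                break; // first position whose term differs from (or is absent in) the local log
            }
        }
        // Prints 2: local entries from index 5 onwards are truncated,
        // incoming[0..1] are duplicates to drop, incoming[2..] are appended.
        System.out.println("conflictingIndex = " + conflictingIndex);
    }
}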
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
    // TODO use lock-free algorithm here?
    final long stamp = this.stampedLock.writeLock();
    long lastCommittedIndex = 0;
    try {
        if (this.pendingIndex == 0) {
            return false;
        }
        if (lastLogIndex < this.pendingIndex) {
            return true;
        }
        if (lastLogIndex >= this.pendingIndex + this.pendingMetaQueue.size()) {
            throw new ArrayIndexOutOfBoundsException();
        }
        final long startAt = Math.max(this.pendingIndex, firstLogIndex);
        Ballot.PosHint hint = new Ballot.PosHint();
        // Check whether any LogEntry in this batch has been replicated to a majority of nodes
        for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
            final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
            hint = bl.grant(peer, hint);
            // This LogEntry has reached a majority of nodes; record lastCommittedIndex
            if (bl.isGranted()) {
                lastCommittedIndex = logIndex;
            }
        }
        // No entry has been replicated to a majority of nodes yet; return for now
        if (lastCommittedIndex == 0) {
            return true;
        }
        // When removing a peer off the raft group which contains even number of peers,
        // the quorum would decrease by 1, e.g. 3 of 4 changes to 2 of 3. In this case,
        // the log after removal may be committed before some previous logs,
        // since we use the new configuration to deal the quorum of the removal request,
        // we think it's safe to commit all the uncommitted previous logs, which is not well proved right now
        // Remove the ballots of the log indexes that have reached a majority;
        // Raft guarantees that once a LogEntry is committed, every entry before it is committed as well
        this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
        LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex);
        this.pendingIndex = lastCommittedIndex + 1;
        // Update the cluster's lastCommittedIndex
        this.lastCommittedIndex = lastCommittedIndex;
    } finally {
        this.stampedLock.unlockWrite(stamp);
    }
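The Ballot class itself is not quoted in this section; a simplified, hypothetical stand-in that captures the majority-quorum semantics commitAt relies on could look like the following:

import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for jraft's Ballot (not the real class): one ballot per
// log index, granted once a majority of the voting peers have acknowledged it.
class SimpleBallot {
    private final Set<String> peers;                     // all voting peers, leader included
    private final Set<String> granted = new HashSet<>(); // peers that have replicated the entry

    SimpleBallot(final Set<String> peers) {
        this.peers = peers;
    }

    void grant(final String peer) {
        if (this.peers.contains(peer)) {
            this.granted.add(peer);
        }
    }

    boolean isGranted() {
        // Majority quorum: more than half of the peers
        return this.granted.size() >= this.peers.size() / 2 + 1;
    }
}

With a 3-peer group, isGranted() flips as soon as two peers have granted the ballot (typically the leader grants itself once its local write succeeds, so a single follower ack completes the quorum), which is exactly the moment commitAt records lastCommittedIndex.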
// isGood() means there are still logs available for processing
while (iterImpl.isGood()) {
    // Fetch the LogEntry currently being processed
    final LogEntry logEntry = iterImpl.entry();
    // An internal (non-business) LogEntry
    if (logEntry.getType() != EnumOutter.EntryType.ENTRY_TYPE_DATA) {
        if (logEntry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
            if (logEntry.getOldPeers() != null && !logEntry.getOldPeers().isEmpty()) {
                // Joint stage is not supposed to be noticeable by end users.
                this.fsm.onConfigurationCommitted(new Configuration(iterImpl.entry().getPeers()));
            }
        }
        if (iterImpl.done() != null) {
            // For other entries, we have nothing to do besides flush the
            // pending tasks and run this closure to notify the caller that the
            // entries before this one were successfully committed and applied.
            iterImpl.done().run(Status.OK());
        }
        iterImpl.next();
        continue;
    }
private void doApplyTasks(final IteratorImpl iterImpl) {
    final IteratorWrapper iter = new IteratorWrapper(iterImpl);
    final long startApplyMs = Utils.monotonicMs();
    final long startIndex = iter.getIndex();
    try {
        // Invoke the business StateMachine#onApply method
        this.fsm.onApply(iter);
    } finally {
        this.nodeMetrics.recordLatency("fsm-apply-tasks", Utils.monotonicMs() - startApplyMs);
        this.nodeMetrics.recordSize("fsm-apply-tasks-count", iter.getIndex() - startIndex);
    }
    // The iterator still has unprocessed logs, but the business code has already
    // returned from onApply
    if (iter.hasNext()) {
        LOG.error("Iterator is still valid, did you return before iterator reached the end?");
    }
    // Try move to next in case that we pass the same log twice.
    iter.next();
}
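For reference, here is a minimal sketch of a well-behaved StateMachine#onApply on the business side (the counter semantics are invented for illustration): it drains the iterator fully, which is exactly what the hasNext() check above guards against.

import java.nio.ByteBuffer;

import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;

// Hypothetical state machine: each entry's payload is interpreted as a long delta.
public class CounterStateMachine extends StateMachineAdapter {

    private long value = 0;

    @Override
    public void onApply(final Iterator iter) {
        // Drain the iterator completely; returning early is what triggers the
        // "Iterator is still valid" error in doApplyTasks.
        while (iter.hasNext()) {
            final ByteBuffer data = iter.getData();
            if (data != null && data.remaining() >= Long.BYTES) {
                this.value += data.getLong();
            }
            if (iter.done() != null) {
                // Notify the task's closure on the leader side.
                iter.done().run(Status.OK());
            }
            iter.next();
        }
    }
}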
static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status,
                          final Message request, final Message response, final int seq,
                          final int stateVersion, final long rpcSendTime) {
    if (id == null) {
        return;
    }
    final long startTimeMs = Utils.nowMs();
    Replicator r;
    // Acquire the non-reentrant lock bound to this Replicator
    if ((r = (Replicator) id.lock()) == null) {
        return;
    }
    // The state version has changed, meaning the inflight queue has been reset;
    // ignore responses to requests sent before the reset
    if (stateVersion != r.version) {
        LOG.debug(
            "Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.",
            r, stateVersion, r.version, request, response, status);
        id.unlock();
        return;
    }
    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses, ");
    }
    try {
        // Number of responses processed so far
        int processed = 0;
        // Process the queued responses in order
        while (!holdingQueue.isEmpty()) {
            // Take the queued response with the smallest request sequence
            final RpcResponse queuedPipelinedResponse = holdingQueue.peek();
            // Sequence mismatch, waiting for next response.
            // Responses arrived out of order; keep waiting for the expected sequence
            if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
                if (processed > 0) {
                    if (isLogDebugEnabled) {
                        sb.append("has processed ").append(processed).append(" responses, ");
                    }
                    break;
                } else {
                    // Do not process any responses, UNLOCK id and return.
                    continueSendEntries = false;
                    id.unlock();
                    return;
                }
            }
            /* Start processing the response that matches the expected request */
            holdingQueue.remove();
            processed++;
            // Poll the corresponding inflight request
            final Inflight inflight = r.pollInflight();
            if (inflight == null) {
                // The request this response belongs to has been cleared; ignore the response
                if (isLogDebugEnabled) {
                    sb.append("ignore response because request not found: ")
                        .append(queuedPipelinedResponse)
                        .append(",\n");
                }
                continue;
            }
            // The inflight sequence does not match the sequence recorded in the response;
            // reset the inflight queue, block for a while, then send a probe request again
            if (inflight.seq != queuedPipelinedResponse.seq) {
                // reset state
                LOG.warn(
                    "Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
                    r, inflight.seq, queuedPipelinedResponse.seq);
                r.resetInflights();
                r.state = State.Probe;
                continueSendEntries = false;
                r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
                return;
            }
            // Dispatch on the response type
            try {
                switch (queuedPipelinedResponse.requestType) {
                    // Handle the response to an AppendEntries request
                    case AppendEntries:
                        continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
                            (AppendEntriesRequest) queuedPipelinedResponse.request,
                            (AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
                        break;
                    // Handle the response to an InstallSnapshot request
                    case Snapshot:
                        continueSendEntries = onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status,
                            (InstallSnapshotRequest) queuedPipelinedResponse.request,
                            (InstallSnapshotResponse) queuedPipelinedResponse.response);
                        break;
                }
            } finally {
                if (continueSendEntries) {
                    // Success, increase the response sequence.
                    r.getAndIncrementRequiredNextSeq();
                } else {
                    // The id is already unlocked in onAppendEntriesReturned/onInstallSnapshotReturned,
                    // we SHOULD break out.
                    break;
                }
            }
        }
    } finally {
        if (isLogDebugEnabled) {
            sb.append("after processed, continue to send entries: ").append(continueSendEntries);
            LOG.debug(sb.toString());
        }
        // Keep sending AppendEntries requests
        if (continueSendEntries) {
            // unlock in sendEntries.
            r.sendEntries();
        }
    }
}
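Taken together, holdingQueue plus requiredNextSeq form a reorder buffer for pipelined responses. A stripped-down, self-contained sketch of that idea (class and field names here are illustrative, not jraft's API):

import java.util.Comparator;
import java.util.PriorityQueue;

// Minimal reorder buffer: responses may arrive out of order, but are consumed
// strictly by ascending sequence number, mirroring onRpcReturned above.
class ReorderBuffer {
    static final class Resp {
        final int seq;
        final Object payload;
        Resp(final int seq, final Object payload) {
            this.seq = seq;
            this.payload = payload;
        }
    }

    private final PriorityQueue<Resp> holding =
        new PriorityQueue<>(Comparator.comparingInt((Resp resp) -> resp.seq));
    private int requiredNextSeq = 0;

    void onResponse(final Resp resp) {
        this.holding.offer(resp);
        // Drain only while the head carries the exact next expected sequence;
        // a gap means an earlier response is still in flight, so we wait.
        while (!this.holding.isEmpty() && this.holding.peek().seq == this.requiredNextSeq) {
            final Resp next = this.holding.poll();
            this.requiredNextSeq++;
            // ... process next.payload ...
        }
    }
}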
public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raftOptions) {
    if (opts.getLogManager() == null || opts.getBallotBox() == null || opts.getNode() == null) {
        throw new IllegalArgumentException("Invalid ReplicatorOptions.");
    }
    // Create the Replicator instance
    final Replicator r = new Replicator(opts, raftOptions);
    // Check connectivity to the target node
    if (!r.rpcService.connect(opts.getPeerId().getEndpoint())) {
        LOG.error("Fail to init sending channel to {}.", opts.getPeerId());
        // Return and it will be retried later.
        return null;
    }
public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
    boolean doUnlock = true;
    final long startMs = Utils.monotonicMs();
    this.writeLock.lock();
    final int entriesCount = request.getEntriesCount();
    try {
        // The current node is inactive; respond with an error
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
        }
        // Parse the ID of the node that sent the request
        final PeerId serverId = new PeerId();
        if (!serverId.parse(request.getServerId())) {
            // Parsing failed; respond with an error
            LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Parse serverId failed: %s.", request.getServerId());
        }
        // Check the term in the request; if it is smaller than the current node's term,
        // reject the request and return the current term
        if (request.getTerm() < this.currTerm) {
            LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", getNodeId(),
                request.getServerId(), request.getTerm(), this.currTerm);
            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(this.currTerm) //
                .build();
        }
        // The request does not come from the leader this node knows of, which may
        // indicate a network partition; bump the term by 1 to make the leader step down
        if (!serverId.equals(this.leaderId)) {
            LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
                serverId, this.currTerm, this.leaderId);
            // Increase the term by 1 and make both leaders step down to minimize the
            // loss of split brain
            stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
                "More than one leader in the same term."));
            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(request.getTerm() + 1) //
                .build();
        }
private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status,
                                               final AppendEntriesRequest request,
                                               final AppendEntriesResponse response, final long rpcSendTime,
                                               final long startTimeMs, final Replicator r) {
    // The inflight request does not match the logIndex recorded in the response's
    // request; reset the inflight queue and send a probe request again
    if (inflight.startIndex != request.getPrevLogIndex() + 1) {
        LOG.warn(
            "Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
            r, inflight.startIndex, request.getPrevLogIndex());
        r.resetInflights();
        r.state = State.Probe;
        // unlock id in sendEmptyEntries
        r.sendEmptyEntries(false);
        return false;
    }
// ... metrics
    // The target follower reported an error; reset the inflight queue and send a
    // probe request again
    if (!status.isOk()) {
        // If the follower crashes, any RPC to the follower fails immediately,
        // so we need to block the follower for a while instead of looping until
        // it comes back or be removed
        // dummy_id is unlock in block
        notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
        if (++r.consecutiveErrorTimes % 10 == 0) {
            LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
                r.consecutiveErrorTimes, status);
        }
        // Reset the inflight queue, block for a while, then probe again
        r.resetInflights();
        r.state = State.Probe;
        // unlock in block
        r.block(startTimeMs, status.getCode());
        return false;
    }
    /* The target follower is running normally */
    r.consecutiveErrorTimes = 0;
    // The target follower rejected the request
    if (!response.getSuccess()) {
        // The follower has a larger term
        if (response.getTerm() > r.options.getTerm()) {
            final NodeImpl node = r.options.getNode();
            r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
            // Destroy this Replicator
            r.destroy();
            // Raise the current node's term and step down
            node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
                "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
            return false;
        }
        // Update the timestamp of the last RPC sent to the target node
        if (rpcSendTime > r.lastRpcSendTimestamp) {
            r.lastRpcSendTimestamp = rpcSendTime;
        }
        // Reset the inflight queue, adjust nextIndex, then send a probe request again
        r.resetInflights();
        // prev_log_index and prev_log_term doesn't match
        if (response.getLastLogIndex() + 1 < r.nextIndex) {
            LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.getLastLogIndex());
            // The peer contains less logs than leader
            r.nextIndex = response.getLastLogIndex() + 1;
        }
        // The follower's local logIndex is larger; it may hold logs replicated by an
        // old leader. Decrement nextIndex and retry until the point where the two
        // nodes' logs converge is found
        else {
            // The peer contains logs from old term which should be truncated,
            // decrease _last_log_at_peer by one to test the right index to keep
            if (r.nextIndex > 1) {
                LOG.debug("logIndex={} dismatch", r.nextIndex);
                r.nextIndex--;
            } else {
                LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen",
                    r.options.getPeerId());
            }
        }
        // dummy_id is unlock in _send_heartbeat
        // Send a probe request again
        r.sendEmptyEntries(false);
        return false;
    }
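A worked sketch of the two rejection branches above, with hypothetical numbers (adjustNextIndex is an illustrative helper, not a jraft method):

// Hypothetical nextIndex adjustment after a rejected AppendEntries.
static long adjustNextIndex(final long nextIndex, final long followerLastLogIndex) {
    if (followerLastLogIndex + 1 < nextIndex) {
        // Follower is simply behind: jump straight back to its log tail,
        // e.g. nextIndex 10 -> 7 when the follower reports lastLogIndex 6.
        return followerLastLogIndex + 1;
    }
    if (nextIndex > 1) {
        // Follower holds stale-term entries: probe backwards one index at a
        // time until prev_log_index/prev_log_term match.
        return nextIndex - 1;
    }
    return nextIndex; // an index-0 mismatch is not supposed to happen
}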
    /* The target follower responded successfully */
    // The term changed while the request was in flight; this node may no longer be
    // the leader, so clear the inflight queue
    if (response.getTerm() != r.options.getTerm()) {
        r.resetInflights();
        r.state = State.Probe;
        LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
        id.unlock();
        return false;
    }
    // Update the timestamp of the last RPC sent to the target node
    if (rpcSendTime > r.lastRpcSendTimestamp) {
        r.lastRpcSendTimestamp = rpcSendTime;
    }
    final int entriesSize = request.getEntriesCount();
    // For a log replication request, once the follower replicates successfully we call
    // BallotBox#commitAt to check whether the entries have reached a majority of nodes
    if (entriesSize > 0) {
        if (r.options.getReplicatorType().isFollower()) {
            // Only commit index when the response is from follower.
            r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
        }
    }
static boolean continueSending(final ThreadId id, final int errCode) {
    // This Replicator has been destroyed
    if (id == null) {
        // It was destroyed already
        return true;
    }
    final Replicator r = (Replicator) id.lock();
    if (r == null) {
        return false;
    }
    r.waitId = -1;
    // Timed out; send a probe request again
    if (errCode == RaftError.ETIMEDOUT.getNumber()) {
        r.blockTimer = null;
        // Send empty entries after block timeout to check the correct
        // _next_index, otherwise the replicator is likely to wait in
        // _wait_more_entries and no further logs would be replicated even if the
        // last_index of this follower is less than |next_index - 1|
        r.sendEmptyEntries(false);
    }
    // The LogManager is running normally; keep sending data to the target follower
    else if (errCode != RaftError.ESTOP.getNumber()) {
        // id is unlock in _send_entries
        r.sendEntries();
    }
    // The LogManager has been stopped; stop sending log data to the target node
    else {
        LOG.warn("Replicator {} stops sending entries.", id);
        id.unlock();
    }
    return true;
}
// Parse request
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
ByteBuffer allData = null;
if (request.hasData()) {
    allData = request.getData().asReadOnlyByteBuffer();
}
final List<RaftOutter.EntryMeta> entriesList = request.getEntriesList();
// Parse each LogEntry in the request one by one and collect them into the entries list
for (int i = 0; i < entriesCount; i++) {
    index++;
    // The LogEntry metadata
    final RaftOutter.EntryMeta entry = entriesList.get(i);
    if (entry.getPeersCount() > 0) {
        if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
            throw new IllegalStateException(
                "Invalid log entry that contains peers but is not ENTRY_TYPE_CONFIGURATION type: "
                        + entry.getType());
        }
        // Fill in the cluster configuration
        fillLogEntryPeers(entry, logEntry);
    } else if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
        throw new IllegalStateException(
            "Invalid log entry that contains zero peers but is ENTRY_TYPE_CONFIGURATION type");
    }
    return logEntry;
}
return null;
}
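Not shown above is how each entry's payload is recovered: the request carries one contiguous data buffer, and each EntryMeta records its own data_len, so decoding peels payloads off the shared buffer sequentially. A hypothetical helper sketching that idea (sliceEntryData and the dataLens array are illustrative stand-ins for iterating meta.getDataLen()):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch: peel each entry's payload off the shared read-only buffer in order.
static List<ByteBuffer> sliceEntryData(final ByteBuffer allData, final int[] dataLens) {
    final List<ByteBuffer> payloads = new ArrayList<>(dataLens.length);
    for (final int dataLen : dataLens) {
        final byte[] bytes = new byte[dataLen];
        allData.get(bytes);                     // advances the shared buffer's position
        payloads.add(ByteBuffer.wrap(bytes));   // this entry's own payload
    }
    return payloads;
}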
// For a log replication request, once the follower replicates successfully we call
// BallotBox#commitAt to check whether the entries have reached a majority of nodes
if (entriesSize > 0) {
    if (r.options.getReplicatorType().isFollower()) {
        // Only commit index when the response is from follower.
        r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1,
            r.options.getPeerId());
    }
}