在《理解 Raft 分布式共识算法 》一文中,我们对于 Raft 算法的理论进行了介绍。过去几年,围绕 Raft 算法涌现出了一系列各类语言的实现(参考 Raft 算法官网 ),这也充分证明了该算法相对于 Paxos 算法在理解和实现层面的友好性。从本文开始,我就以 SOFA-JRaft 为例,用多篇文章来分析一个生产级别的 Raft 算法应该如何实现。
SOFA-JRaft 是一个基于 Raft 算法的 java 语言实现算法库,提供生产级别的稳定性、容错性,以及高性能,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。
本系列文章将从源码层面剖析 SOFA-JRaft 的设计与实现,区别于 SOFA:JRaftLab/
项目中的一系列文章侧重于介绍 SOFA-JRaft 各模块的设计,本系列文章更加侧重于 SOFA-JRaft 各模块的实现,从实现层面再反观 SOFA-JRaft 的架构设计。
注:本系列文章如不做特殊说明,均使用 JRaft 指代 SOFA-JRaft,使用 Raft 指代 Raft 算法。
Leader 选举示例
在正式开始之前还是先引用一个 JRaft 的官方示例,演示如何基于 JRaft 实现分布式场景下的主节点选举。ElectionBootstrap 是整个示例的驱动类,对应的 main 函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public static void main (final String[] args) { if (args.length < 4 ) { System.out.println( "Useage : java com.alipay.sofa.jraft.example.election.ElectionBootstrap {dataPath} {groupId} {serverId} {initConf}" ); System.out.println( "Example: java com.alipay.sofa.jraft.example.election.ElectionBootstrap /tmp/server1 election_test 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083" ); System.exit(1 ); } final String dataPath = args[0 ]; final String groupId = args[1 ]; final String serverIdStr = args[2 ]; final String initialConfStr = args[3 ]; final ElectionNodeOptions electionOpts = new ElectionNodeOptions(); electionOpts.setDataPath(dataPath); electionOpts.setGroupId(groupId); electionOpts.setServerAddress(serverIdStr); electionOpts.setInitialServerAddressList(initialConfStr); final ElectionNode node = new ElectionNode(); node.addLeaderStateListener(new LeaderStateListener() { final PeerId serverId = node.getNode().getLeaderId(); final String ip = serverId.getIp(); final int port = serverId.getPort(); @Override public void onLeaderStart (long leaderTerm) { System.out.println("[ElectionBootstrap] Leader's ip is: " + ip + ", port: " + port); System.out.println("[ElectionBootstrap] Leader start on term: " + leaderTerm); } @Override public void onLeaderStop (long leaderTerm) { System.out.println("[ElectionBootstrap] Leader stop on term: " + leaderTerm); } }); node.init(electionOpts); }
由上述实现可以看出在 Leader 选举场景下启动一个 JRaft 节点需要指定 4 个参数,包括:
数据存储根路径,用于存储日志、元数据,以及快照数据。
组 ID,一个组可以看做是一个独立的 Raft 集群,JRaft 支持 MULTI-RAFT-GROUP。
节点地址,即当前节点的 IP 和端口号。
初始集群节点列表,即初始构成 JRaft 集群的节点列表。
ElectionNode 是整个启动示例的核心实现类,方法 ElectionNode#init
实现了初始化和启动单个 JRaft 节点的逻辑,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public boolean init (final ElectionNodeOptions opts) { if (this .started) { LOG.info("[ElectionNode: {}] already started." , opts.getServerAddress()); return true ; } NodeOptions nodeOpts = opts.getNodeOptions(); if (nodeOpts == null ) { nodeOpts = new NodeOptions(); } this .fsm = new ElectionOnlyStateMachine(this .listeners); nodeOpts.setFsm(this .fsm); final Configuration initialConf = new Configuration(); if (!initialConf.parse(opts.getInitialServerAddressList())) { throw new IllegalArgumentException("Fail to parse initConf: " + opts.getInitialServerAddressList()); } nodeOpts.setInitialConf(initialConf); final String dataPath = opts.getDataPath(); try { FileUtils.forceMkdir(new File(dataPath)); } catch (final IOException e) { LOG.error("Fail to make dir for dataPath {}." , dataPath); return false ; } nodeOpts.setLogUri(Paths.get(dataPath, "log" ).toString()); nodeOpts.setRaftMetaUri(Paths.get(dataPath, "meta" ).toString()); final String groupId = opts.getGroupId(); final PeerId serverId = new PeerId(); if (!serverId.parse(opts.getServerAddress())) { throw new IllegalArgumentException("Fail to parse serverId: " + opts.getServerAddress()); } final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); this .raftGroupService = new RaftGroupService(groupId, serverId, nodeOpts, rpcServer); this .node = this .raftGroupService.start(); if (this .node != null ) { this .started = true ; } return this .started; }
实现一个 JRaft 节点的初始化和启动过程主要分为两个步骤:
构造 Raft 节点配置 NodeOptions 对象;
初始化并启动节点,以 RPC 服务的方式运行。
构建 NodeOptions 配置对象的过程比较简单,这里主要说明一下状态机 StateMachine 选项。以典型的基于 JRaft 实现的 KV 数据库为例,当我们往数据库中写入一条数据时,对于 JRaft 而言就像是我们往 Leader 节点发送了一条指令。Leader 节点会将该指令封装成一条日志条目 LogEntry 记录到本地,并复制给集群中的其它 Follower 节点。当集群中过半数节点都完成了对该 LogEntry 的复制之后,Leader 节点认为可以提交该条目(即将该 LogEntry 的状态修改为 committed),并在未来的某个时刻将该 LogEntry 中的指令应用到本地存储介质中。
整个流程中封装指令为 LogEntry 对象,接着将 LogEntry 复制到大部分 Follower 节点,并提交该 LogEntry 的过程都是通用的,由 JRaft 负责实现。然而,对于指令的解析和应用则需要结合具体的应用场景,以 KV 数据库场景为例,需要解析出 LogEntry 中的指令,并依据指令类型决定对相应的 key 做下一步具体操作,是更新还是删除。JRaft 定义了一个 StateMachine 接口,并通过 StateMachine#onApply
方法将已经成功同步给集群中过半数节点的 LogEntry 对应的指令透传给用户,由用户去实现对指令的处理逻辑。
继续回到本示例,Leader 选举是 Raft 算法内置的功能,可以不涉及用户指令,所以上述示例对于 StateMachine 接口的实现类 ElectionOnlyStateMachine 也仅仅是简单打印了些日志,这里不再展开。
JRaft 节点的初始化和启动过程由 RaftGroupService 类封装,JRaft 将其定义为一个框架类,用于封装 JRaft 节点的创建和启动过程。方法 RaftGroupService#start
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public synchronized Node start (final boolean startRpcServer) { if (this .started) { return this .node; } if (this .serverId == null || this .serverId.getEndpoint() == null || this .serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0 ))) { throw new IllegalArgumentException("Blank serverId:" + this .serverId); } if (StringUtils.isBlank(this .groupId)) { throw new IllegalArgumentException("Blank group id" + this .groupId); } NodeManager.getInstance().addAddress(this .serverId.getEndpoint()); this .node = RaftServiceFactory.createAndInitRaftNode(this .groupId, this .serverId, this .nodeOptions); if (startRpcServer) { this .rpcServer.init(null ); } else { LOG.warn("RPC server is not started in RaftGroupService." ); } this .started = true ; LOG.info("Start the RaftGroupService successfully." ); return this .node; }
其中创建并初始化 JRaft 节点的过程实际上是委托给了 Node#init
方法执行。Node 接口用于描绘一个 JRaft 节点,是整个 JRaft 算法库中最核心的类,本系列文章对于 JRaft 设计和实现的分析基本上都围绕着 Node 接口展开。我们将在本文的后面小节对 JRaft 节点的初始化过程进行展开分析。
整体架构设计
作为本系列文章的第一篇,在开始剖析源码之前,我打算用一小节的篇幅对 JRaft 的整体架构设计进行一个简单的介绍,这样能够让读者对 JRaft 的实现有一个整体层面的认知。
上图描绘了 JRaft 的整体架构设计,可以大致将 JRaft 的实现分为以下几个模块:
数据存储模块 :包括日志数据存储、元数据存储,以及快照数据存储。
日志复制模块 :JRaft 针对每个 Follower 节点都会为其创建并绑定一个复制器 Replicator 实例,Replicator 主要负责向对应的 Follower 节点复制数据、安装快照,以及维持心跳。ReplicatorGroup 负责管理一个 group 维度下的所有复制器 Replicator 实例。
周期性任务计时器 :计时器在整个 Raft 算法的选主过程中起着至关重要的作用,JRaft 默认借鉴 Netty 的单层时间轮算法实现了一个高效的计时器,并应用到预选举、正式选举、周期性生成快照,以及 Leader 节点角色降级等多个计时场景。
选票箱模块 :Raft 算法的共识决策采用多数投票机制,所以选票和选票箱是对这一机制的直观实现。
集群配置管理模块 :Raft 作为一个服务于分布式应用的协议,免不了会遇到节点的上下线、网络分区等问题,所以需要对集群中的节点进行管理,以保证整个协议的正常运行。
状态机调度模块 :前面我们已经对状态机 StateMachine 进行了一个简单的介绍,而状态机调度器 FSMCaller 相当于在 JRaft 集群与业务 StateMachine 实现之间建立了一座桥梁,用于调度业务 StateMachine 的运行。
CLI 模块 :CLI 即 Client CommandLine Service,是在 JRaft 节点提供的 RPC 服务中暴露的一系列用于管理 JRaft 集群的服务接口,例如增删节点、变更节点配置、重置节点配置,以及转移 Leader 节点等功能。
作为基于 JRaft 算法库实现的应用程序,我们通常可以基于 CLI 服务管理 JRaft 集群,实现 StateMachine 接口以感知 JRaft 集群的运行状态,以及调用 Node#apply
方法向 JRaft 集群提交指令。这些指令在被 JRaft 成功复制到过半数的节点上之后,最终会通过调用 StateMachine#onApply
方法透传给业务,由业务负责处理这些指令。
节点初始化
在对 JRaft 的整体架构设计有一个基本的认知之后,本文的最后我们来分析一下 JRaft 节点的初始化启动过程。前面我们通过 Leader 选举的例子演示了 JRaft 的基本使用,JRaft 的使用可以简单概括为以下四个步骤:
实现 StateMachine 接口,并创建状态机实例;
初始化 NodeOptions 配置对象,用于设置节点的运行参数;
基于 NodeOptions 配置对象创建并初始化 Node 实例;
启动节点,以 RPC 服务的方式运行。
其中步骤 1 和 2 都比较简单,通过前面的示例可以一目了然,本小节我们重点分析一下步骤 3 和 4,看看 JRaft 是如何初始化和启动一个 JRaft 参与节点的。我们从 Node#init
方法切入,该方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 public boolean init (final NodeOptions opts) { Requires.requireNonNull(opts, "Null node options" ); Requires.requireNonNull(opts.getRaftOptions(), "Null raft options" ); Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory" ); this .serviceFactory = opts.getServiceFactory(); this .options = opts; this .raftOptions = opts.getRaftOptions(); this .metrics = new NodeMetrics(opts.isEnableMetrics()); this .serverId.setPriority(opts.getElectionPriority()); this .electionTimeoutCounter = 0 ; if (this .serverId.getIp().equals(Utils.IP_ANY)) { LOG.error("Node can't started from IP_ANY." ); return false ; } if (!NodeManager.getInstance().serverExists(this .serverId.getEndpoint())) { LOG.error("No RPC server attached to, did you forget to call addService?" ); return false ; } this .timerManager = TIMER_FACTORY.getRaftScheduler( this .options.isSharedTimerPool(), this .options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool" ); final String suffix = getNodeId().toString(); String name = "JRaft-VoteTimer-" + suffix; this .voteTimer = new RepeatedTimer(name, this .options.getElectionTimeoutMs(), TIMER_FACTORY.getVoteTimer(this .options.isSharedVoteTimer(), name)) { @Override protected void onTrigger () { handleVoteTimeout(); } @Override protected int adjustTimeout (final int timeoutMs) { return randomTimeout(timeoutMs); } }; name = "JRaft-ElectionTimer-" + suffix; this .electionTimer = new RepeatedTimer(name, this .options.getElectionTimeoutMs(), TIMER_FACTORY.getElectionTimer(this .options.isSharedElectionTimer(), name)) { @Override protected void onTrigger () { handleElectionTimeout(); } @Override protected int adjustTimeout (final int timeoutMs) { return randomTimeout(timeoutMs); } }; name = "JRaft-StepDownTimer-" + suffix; this .stepDownTimer = new RepeatedTimer(name, this .options.getElectionTimeoutMs() >> 1 , TIMER_FACTORY.getStepDownTimer(this .options.isSharedStepDownTimer(), name)) { @Override protected void onTrigger () { handleStepDownTimeout(); } }; name = "JRaft-SnapshotTimer-" + suffix; this .snapshotTimer = new RepeatedTimer(name, this .options.getSnapshotIntervalSecs() * 1000 , TIMER_FACTORY.getSnapshotTimer(this .options.isSharedSnapshotTimer(), name)) { private volatile boolean firstSchedule = true ; @Override protected void onTrigger () { handleSnapshotTimeout(); } @Override protected int adjustTimeout (final int timeoutMs) { if (!this .firstSchedule) { return timeoutMs; } this .firstSchedule = false ; if (timeoutMs > 0 ) { int half = timeoutMs / 2 ; return half + ThreadLocalRandom.current().nextInt(half); } else { return timeoutMs; } } }; this .configManager = new ConfigurationManager(); this .applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() .setRingBufferSize(this .raftOptions.getDisruptorBufferSize()) .setEventFactory(new LogEntryAndClosureFactory()) .setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-" , true )) .setProducerType(ProducerType.MULTI) .setWaitStrategy(new BlockingWaitStrategy()) .build(); this .applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler()); this .applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName())); this .applyQueue = this .applyDisruptor.start(); if (this .metrics.getMetricRegistry() != null ) { this .metrics.getMetricRegistry().register("jraft-node-impl-disruptor" , new DisruptorMetricSet(this .applyQueue)); } this .fsmCaller = new FSMCallerImpl(); if (!initLogStorage()) { LOG.error("Node {} initLogStorage failed." , getNodeId()); return false ; } if (!initMetaStorage()) { LOG.error("Node {} initMetaStorage failed." , getNodeId()); return false ; } if (!initFSMCaller(new LogId(0 , 0 ))) { LOG.error("Node {} initFSMCaller failed." , getNodeId()); return false ; } this .ballotBox = new BallotBox(); final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions(); ballotBoxOpts.setWaiter(this .fsmCaller); ballotBoxOpts.setClosureQueue(this .closureQueue); if (!this .ballotBox.init(ballotBoxOpts)) { LOG.error("Node {} init ballotBox failed." , getNodeId()); return false ; } if (!initSnapshotStorage()) { LOG.error("Node {} initSnapshotStorage failed." , getNodeId()); return false ; } final Status st = this .logManager.checkConsistency(); if (!st.isOk()) { LOG.error("Node {} is initialized with inconsistent log, status={}." , getNodeId(), st); return false ; } this .conf = new ConfigurationEntry(); this .conf.setId(new LogId()); if (this .logManager.getLastLogIndex() > 0 ) { checkAndSetConfiguration(false ); } else { this .conf.setConf(this .options.getInitialConf()); this .targetPriority = getMaxPriorityOfNodes(this .conf.getConf().getPeers()); } if (!this .conf.isEmpty()) { Requires.requireTrue(this .conf.isValid(), "Invalid conf: %s" , this .conf); } else { LOG.info("Init node {} with empty conf." , this .serverId); } this .replicatorGroup = new ReplicatorGroupImpl(); this .rpcService = new DefaultRaftClientService(this .replicatorGroup); final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions(); rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this .options.getElectionTimeoutMs())); rgOpts.setElectionTimeoutMs(this .options.getElectionTimeoutMs()); rgOpts.setLogManager(this .logManager); rgOpts.setBallotBox(this .ballotBox); rgOpts.setNode(this ); rgOpts.setRaftRpcClientService(this .rpcService); rgOpts.setSnapshotStorage(this .snapshotExecutor != null ? this .snapshotExecutor.getSnapshotStorage() : null ); rgOpts.setRaftOptions(this .raftOptions); rgOpts.setTimerManager(this .timerManager); this .options.setMetricRegistry(this .metrics.getMetricRegistry()); if (!this .rpcService.init(this .options)) { LOG.error("Fail to init rpc service." ); return false ; } this .replicatorGroup.init(new NodeId(this .groupId, this .serverId), rgOpts); this .readOnlyService = new ReadOnlyServiceImpl(); final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions(); rosOpts.setFsmCaller(this .fsmCaller); rosOpts.setNode(this ); rosOpts.setRaftOptions(this .raftOptions); if (!this .readOnlyService.init(rosOpts)) { LOG.error("Fail to init readOnlyService." ); return false ; } this .state = State.STATE_FOLLOWER; if (LOG.isInfoEnabled()) { LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}." , getNodeId(), this .currTerm, this .logManager.getLastLogId(false ), this .conf.getConf(), this .conf.getOldConf()); } if (this .snapshotExecutor != null && this .options.getSnapshotIntervalSecs() > 0 ) { LOG.debug("Node {} start snapshot timer, term={}." , getNodeId(), this .currTerm); this .snapshotTimer.start(); } if (!this .conf.isEmpty()) { stepDown(this .currTerm, false , new Status()); } if (!NodeManager.getInstance().add(this )) { LOG.error("NodeManager add {} failed." , getNodeId()); return false ; } this .writeLock.lock(); if (this .conf.isStable() && this .conf.getConf().size() == 1 && this .conf.getConf().contains(this .serverId)) { electSelf(); } else { this .writeLock.unlock(); } return true ; }
整个 JRaft 节点的初始化过程执行了大量的工作,整体可以概括为以下几个方面:
创建并初始化延时任务调度器 TimerManager,主要用于处理内部的延时任务(与周期性任务相区分)。
创建计时器,用于执行周期性任务,包括:预选举计时器(electionTimer)、正式选举计时器(voteTimer)、角色降级计时器(stepDownTimer),以及快照周期性生成计时器(snapshotTimer)。
创建集群节点配置管理器 ConfigurationManager,并初始化集群节点配置信息。
初始化 Task 处理相关的 disruptor 队列,用于异步处理业务调用 Node#apply
方法向集群提交的 Task 列表。
初始化日志数据存储模块,并对日志数据执行一致性校验。
初始化元数据存储模块。
初始化快照数据存储模块。
创建并初始化状态机调度器 FSMCaller。
创建并初始化选票箱 BallotBox。
创建并初始化复制器管理组 ReplicatorGroup。
创建并初始化 RPC 客户端 RaftClientService。
创建并初始化只读服务 ReadOnlyService,用于支持线性一致性读。
如果启用了快照生成机制,则启动周期性快照生成任务。
如果初始集群节点不为空,则尝试执行角色降级(stepdown),以对本地状态进行初始化,并启动预选举计时器。
如果集群只有当前这一个节点,则尝试选举自己为 Leader。
下面挑选几个稍微复杂一点的展开说明。
周期任务调度器
Raft 算法的运行依赖于超时机制,所以在实现层面需要提供对应的计时器,用于调度周期性任务。上面在初始化 JRaft 节点期间构造了一系列的计时器,包括:预选举计时器(electionTimer)、正式选举计时器(voteTimer)、角色降级计时器(stepDownTimer),以及周期性快照生成计时器(snapshotTimer)。本小节将分析这些计时器背后的实现,即 RepeatedTimer 类。
首先,我们先来体验一下 RepeatedTimer 的使用方式,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static class TestRepeatedTimer extends RepeatedTimer { public TestRepeatedTimer (String name, int timeoutMs) { super (name, timeoutMs); } @Override protected void onTrigger () { System.out.println("on trigger" ); } @Override protected int adjustTimeout (int timeoutMs) { return RandomUtils.nextInt(timeoutMs); } } final TestRepeatedTimer timer = new TestRepeatedTimer("test" , (int ) TimeUnit.SECONDS.toMillis(1L ));timer.start();
上述示例中我们通过继承 RepeatedTimer 抽象类定义了一个测试用的 TestRepeatedTimer 实现类,该实现类会周期性的往控制台打印“on trigger”字符串。方法 RepeatedTimer#onTrigger
是 RepeatedTimer 中声明的唯一一个抽象方法,我们需要通过该方法实现自己的周期性业务逻辑。上述示例中,我们还覆盖实现了 RepeatedTimer#adjustTimeout
方法,以实现在运行期间对计时周期进行随机化调整。最后,通过调用 RepeatedTimer#start
方法,我们可以启动该计时器。
下面对 RepeatedTimer 的运行机制进行分析。RepeatedTimer 定义了如下构造方法:
1 2 3 4 5 6 7 8 9 10 11 public RepeatedTimer (final String name, final int timeoutMs) { this (name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true ), 1 , TimeUnit.MILLISECONDS, 2048 )); } public RepeatedTimer (final String name, final int timeoutMs, final Timer timer) { super (); this .name = name; this .timeoutMs = timeoutMs; this .stopped = true ; this .timer = Requires.requireNonNull(timer, "timer" ); }
其中 Timer 是一个接口(定义如下),其功能是延迟指定时间执行提交的任务,即 TimerTask。
1 2 3 4 public interface Timer { Timeout newTimeout (final TimerTask task, final long delay, final TimeUnit unit) ; Set<Timeout> stop () ; }
围绕 Timer 接口,JRaft 提供了 DefaultTimer 和 HashedWheelTimer 两个实现类,其中前者基于 JDK 内置的 ScheduledExecutorService 实现,后者则基于单层时间轮算法实现。相对而言,HashedWheelTimer 较 DefaultTimer 在性能和精度层面表现更优,所以 JRaft 将其作为默认 Timer 应用于 RepeatedTimer 中。
本小节重点关注 RepeatedTimer 的实现机制,关于 HashedWheelTimer 的设计和实现可以参考 Netty 相关的源码分析文章。接下来,我们从 RepeatedTimer#start
方法开始,该方法用于启动对应的计时器,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void start () { this .lock.lock(); try { if (this .destroyed) { return ; } if (!this .stopped) { return ; } this .stopped = false ; if (this .running) { return ; } this .running = true ; schedule(); } finally { this .lock.unlock(); } }
上述方法主要是设置一些本地状态标识,对于首次启动的计时器会调用 RepeatedTimer#schedule
方法开始调度执行周期性任务,该方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 private void schedule () { if (this .timeout != null ) { this .timeout.cancel(); } final TimerTask timerTask = timeout -> { try { RepeatedTimer.this .run(); } catch (final Throwable t) { LOG.error("Run timer task failed, taskName={}." , RepeatedTimer.this .name, t); } }; this .timeout = this .timer.newTimeout(timerTask, adjustTimeout(this .timeoutMs), TimeUnit.MILLISECONDS); } public void run () { this .invoking = true ; try { onTrigger(); } catch (final Throwable t) { LOG.error("Run timer failed." , t); } boolean invokeDestroyed = false ; this .lock.lock(); try { this .invoking = false ; if (this .stopped) { this .running = false ; invokeDestroyed = this .destroyed; } else { this .timeout = null ; schedule(); } } finally { this .lock.unlock(); } if (invokeDestroyed) { onDestroy(); } }
具体运行逻辑如上述代码注释,当一轮定时任务执行完成时,如果计时器未被停止,则会调用 RepeatedTimer#schedule
方法提交下一轮任务,以此实现周期性任务调度。不同于常规计时器始终按照相同的时间间隔调度任务,RepeatedTimer 定义了一个 RepeatedTimer#adjustTimeout
方法,以支持在运行期间对调度间隔进行动态调整。
这一机制对于 Raft 算法而言尤为重要,在 Raft 集群节点运行期间可能存在两个 Follower 节点同时发起 Leader 选举进程的情况,如果这两个 Follower 节点正好都得到半数投票,则本轮选举失败,需要在下一轮调度周期再次发起 Leader 选举请求。如果计时器始终按照相同的时间间隔进行调度,则这两个节点将会在未来相同的时刻再次发起 Leader 选举请求,如果不幸再次均分投票,则又拉长了集群的无 Leader 节点窗口,而通过动态调整调度间隔这么一个简单的策略则能够很好的避免此类问题。
数据存储
JRaft 的数据存储层主要包含对三类数据的存储:日志数据、元数据,以及快照数据。其中日志数据存储的也就是前面提及到的 LogEntry 数据,包含系统内部运行产生的日志,以及业务向集群提交 Task 所生成的日志,日志数据默认采用 RocksDB 进行存储;元数据用于记录当前节点的 currentTerm 值,以及投票 votedFor 信息;快照数据是对日志数据存储的一种优化手段,用于将那些已经被应用的日志进行压缩存储,以节省磁盘空间占用,同时缩短新接入节点同步集群数据的时间。
日志数据存储
Raft 节点在初始化期间会调用 NodeImpl#initLogStorage
方法初始化日志数据存储模块,该方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private boolean initLogStorage () { Requires.requireNonNull(this .fsmCaller, "Null fsm caller" ); this .logStorage = this .serviceFactory.createLogStorage(this .options.getLogUri(), this .raftOptions); this .logManager = new LogManagerImpl(); final LogManagerOptions opts = new LogManagerOptions(); opts.setLogEntryCodecFactory(this .serviceFactory.createLogEntryCodecFactory()); opts.setLogStorage(this .logStorage); opts.setConfigurationManager(this .configManager); opts.setFsmCaller(this .fsmCaller); opts.setNodeMetrics(this .metrics); opts.setDisruptorBufferSize(this .raftOptions.getDisruptorBufferSize()); opts.setRaftOptions(this .raftOptions); return this .logManager.init(opts); }
整个方法的主要逻辑在于创建和初始化 LogManager 实例。LogManager 是一个接口,由名称可以推断出它是一个日志管理器,基于 LogStorage 提供了对于日志数据的读写功能,定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public interface LogManager extends Lifecycle <LogManagerOptions >, Describer { void addLastLogIndexListener (final LastLogIndexListener listener) ; void removeLastLogIndexListener (final LastLogIndexListener listener) ; void join () throws InterruptedException ; void appendEntries (final List<LogEntry> entries, StableClosure done) ; void setSnapshot (final SnapshotMeta meta) ; void clearBufferedLogs () ; LogEntry getEntry (final long index) ; long getTerm (final long index) ; long getFirstLogIndex () ; long getLastLogIndex () ; long getLastLogIndex (final boolean isFlush) ; LogId getLastLogId (final boolean isFlush) ; ConfigurationEntry getConfiguration (final long index) ; ConfigurationEntry checkAndSetConfiguration (final ConfigurationEntry current) ; long wait (final long expectedLastLogIndex, final NewLogCallback cb, final Object arg) ; boolean removeWaiter (final long id) ; void setAppliedId (final LogId appliedId) ; Status checkConsistency () ; }
本文重点分析 JRaft 节点的初始化过程,所以不打算对 LogManager 接口中声明各个方法实现逐一展开分析,后续遇到对相应方法的调用时再结合上下文进行介绍。JRaft 针对 LogManager 接口提供了 LogManagerImpl 实现类,对应的 LogManager#init
方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public boolean init (final LogManagerOptions opts) { this .writeLock.lock(); try { if (opts.getLogStorage() == null ) { LOG.error("Fail to init log manager, log storage is null" ); return false ; } this .raftOptions = opts.getRaftOptions(); this .nodeMetrics = opts.getNodeMetrics(); this .logStorage = opts.getLogStorage(); this .configManager = opts.getConfigurationManager(); LogStorageOptions lsOpts = new LogStorageOptions(); lsOpts.setConfigurationManager(this .configManager); lsOpts.setLogEntryCodecFactory(opts.getLogEntryCodecFactory()); if (!this .logStorage.init(lsOpts)) { LOG.error("Fail to init logStorage" ); return false ; } this .firstLogIndex = this .logStorage.getFirstLogIndex(); this .lastLogIndex = this .logStorage.getLastLogIndex(); this .diskId = new LogId(this .lastLogIndex, getTermFromLogStorage(this .lastLogIndex)); this .fsmCaller = opts.getFsmCaller(); this .disruptor = DisruptorBuilder.<StableClosureEvent>newInstance() .setEventFactory(new StableClosureEventFactory()) .setRingBufferSize(opts.getDisruptorBufferSize()) .setThreadFactory(new NamedThreadFactory("JRaft-LogManager-Disruptor-" , true )) .setProducerType(ProducerType.MULTI) .setWaitStrategy(new TimeoutBlockingWaitStrategy( this .raftOptions.getDisruptorPublishEventWaitTimeoutSecs(), TimeUnit.SECONDS)) .build(); this .disruptor.handleEventsWith(new StableClosureEventHandler()); this .disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this .getClass().getSimpleName(), (event, ex) -> reportError(-1 , "LogManager handle event error" ))); this .diskQueue = this .disruptor.start(); } finally { this .writeLock.unlock(); } return true ; }
整个 LogManager 的初始化过程除了对本地变量进行赋值外,主要做了两件事情:
初始化日志存储服务 LogStorage 实例。
创建并启动一个 Disruptor 队列,用于异步处理日志操作相关的事件。
LogStorage 接口定义了与 LogEntry 存储相关的 API,包括读写、截断,以及获取 logIndex 和 term 等。JRaft 默认基于 RocksDB 存储引擎对 LogEntry 提供本地存储和读写,相应的实现类包括 RocksDBLogStorage 和 RocksDBSegmentLogStorage。
本文同样重点关注针对 LogStorage 的初始化过程,由 RocksDBLogStorage#init
方法实现,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public boolean init (final LogStorageOptions opts) { Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager" ); Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory" ); this .writeLock.lock(); try { if (this .db != null ) { LOG.warn("RocksDBLogStorage init() already." ); return true ; } this .logEntryDecoder = opts.getLogEntryCodecFactory().decoder(); this .logEntryEncoder = opts.getLogEntryCodecFactory().encoder(); Requires.requireNonNull(this .logEntryDecoder, "Null log entry decoder" ); Requires.requireNonNull(this .logEntryEncoder, "Null log entry encoder" ); this .dbOptions = createDBOptions(); if (this .openStatistics) { this .statistics = new DebugStatistics(); this .dbOptions.setStatistics(this .statistics); } this .writeOptions = new WriteOptions(); this .writeOptions.setSync(this .sync); this .totalOrderReadOptions = new ReadOptions(); this .totalOrderReadOptions.setTotalOrderSeek(true ); return initAndLoad(opts.getConfigurationManager()); } catch (final RocksDBException e) { LOG.error("Fail to init RocksDBLogStorage, path={}." , this .path, e); return false ; } finally { this .writeLock.unlock(); } } private boolean initAndLoad (final ConfigurationManager confManager) throws RocksDBException { this .hasLoadFirstLogIndex = false ; this .firstLogIndex = 1 ; final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); final ColumnFamilyOptions cfOption = createColumnFamilyOptions(); this .cfOptions.add(cfOption); columnFamilyDescriptors.add(new ColumnFamilyDescriptor("Configuration" .getBytes(), cfOption)); columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOption)); openDB(columnFamilyDescriptors); load(confManager); return onInitLoaded(); }
JRaft 在 RocksDB 中定义了两个 ColumnFamily,除了默认的 ColumnFamily 外,还定义了一个名为 Configuration
的 ColumnFamily 用于存储集群节点配置相关的 LogEntry 实例,而默认的 ColumnFamily 除了包含 Configuration
中的数据之外,还用于存储用户数据相关的 LogEntry 实例。本文如不做特殊说明,均使用 conf family 指代前者,使用 data family 指代后者。
上述方法中我们重点看一下对于 RocksDBLogStorage#load
方法的调用,该方法会从头遍历 conf family 中的数据,以从中加载之前集群节点的配置信息和 firstLogIndex 值。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private void load (final ConfigurationManager confManager) { checkState(); try (final RocksIterator it = this .db.newIterator(this .confHandle, this .totalOrderReadOptions)) { it.seekToFirst(); while (it.isValid()) { final byte [] ks = it.key(); final byte [] bs = it.value(); if (ks.length == 8 ) { final LogEntry entry = this .logEntryDecoder.decode(bs); if (entry != null ) { if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) { final ConfigurationEntry confEntry = new ConfigurationEntry(); confEntry.setId(new LogId(entry.getId().getIndex(), entry.getId().getTerm())); confEntry.setConf(new Configuration(entry.getPeers(), entry.getLearners())); if (entry.getOldPeers() != null ) { confEntry.setOldConf(new Configuration(entry.getOldPeers(), entry.getOldLearners())); } if (confManager != null ) { confManager.add(confEntry); } } } else { LOG.warn("Fail to decode conf entry at index {}, the log data is: {}." , Bits.getLong(ks, 0 ), BytesUtil.toHex(bs)); } } else { if (Arrays.equals(FIRST_LOG_IDX_KEY, ks)) { setFirstLogIndex(Bits.getLong(bs, 0 )); truncatePrefixInBackground(0L , this .firstLogIndex); } else { LOG.warn("Unknown entry in configuration storage key={}, value={}." , BytesUtil.toHex(ks), BytesUtil.toHex(bs)); } } it.next(); } } }
具体执行过程如上述代码注释。JRaft 在从本地读取到 firstLogIndex 值之后,会启动一个后台线程,用于对本地记录的位于 firstLogIndex 之前的 LogEntry 进行剔除,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void truncatePrefixInBackground (final long startIndex, final long firstIndexKept) { Utils.runInThread(() -> { this .readLock.lock(); try { if (this .db == null ) { return ; } onTruncatePrefix(startIndex, firstIndexKept); this .db.deleteRange(this .defaultHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept)); this .db.deleteRange(this .confHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept)); } catch (final RocksDBException | IOException e) { LOG.error("Fail to truncatePrefix {}." , firstIndexKept, e); } finally { this .readLock.unlock(); } }); }
上述方法在启动初始化期间会将 [0, firstLogIndex)
之间的 LogEntry 从本地 RocksDB 中剔除。除此之外,方法 LogStorage#truncatePrefix
在执行时也是委托上述方法完成对从 firstLogIndex 到指定 logIndex 之间的日志数据进行剔除操作。
LogManager 在初始化期间还会创建并启动一个 Disruptor 队列,用于异步处理日志操作相关的事件,包括获取最新的 LogId、日志截断、重置日志数据存储服务,以及关闭日志管理器等。
方法 LogManagerImpl#offerEvent
定义了往该 Disruptor 消息队列发送消息的逻辑,而具体处理消息的逻辑则有 StableClosureEventHandler 类实现。StableClosureEventHandler 类实现自 EventHandler 接口,对应的 StableClosureEventHandler#onEvent
方法依据事件类型对消息实施分别处理,具体实现后续结合应用场景进行深入分析,这里不再展开。
JRaft 节点在初始化期间还会调用 LogManager#checkConsistency
方法对日志数据进行一致性校验,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public Status checkConsistency () { this .readLock.lock(); try { Requires.requireTrue(this .firstLogIndex > 0 ); Requires.requireTrue(this .lastLogIndex >= 0 ); if (this .lastSnapshotId.equals(new LogId(0 , 0 ))) { if (this .firstLogIndex == 1 ) { return Status.OK(); } return new Status(RaftError.EIO, "Missing logs in (0, %d)" , this .firstLogIndex); } else { if (this .lastSnapshotId.getIndex() >= this .firstLogIndex - 1 && this .lastSnapshotId.getIndex() <= this .lastLogIndex) { return Status.OK(); } return new Status(RaftError.EIO, "There's a gap between snapshot={%d, %d} and log=[%d, %d] " , this .lastSnapshotId.toString(), this .lastSnapshotId.getTerm(), this .firstLogIndex, this .lastLogIndex); } } finally { this .readLock.unlock(); } }
校验的逻辑主要是确保快照数据与当前数据的连续性,不允许存在数据断层。
元数据存储
JRaft 节点在初始化期间会调用 NodeImpl#initMetaStorage
方法初始化元数据存储模块,这里的元数据包括 currentTerm 值和当前节点的 votedFor 信息。该方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private boolean initMetaStorage () { this .metaStorage = this .serviceFactory .createRaftMetaStorage(this .options.getRaftMetaUri(), this .raftOptions); RaftMetaStorageOptions opts = new RaftMetaStorageOptions(); opts.setNode(this ); if (!this .metaStorage.init(opts)) { LOG.error("Node {} init meta storage failed, uri={}." , this .serverId, this .options.getRaftMetaUri()); return false ; } this .currTerm = this .metaStorage.getTerm(); this .votedId = this .metaStorage.getVotedFor().copy(); return true ; }
JRaft 定义了 RaftMetaStorage 接口用于抽象元数据存储服务,该接口的定义如下:
1 2 3 4 5 6 7 8 9 public interface RaftMetaStorage extends Lifecycle <RaftMetaStorageOptions >, Storage { boolean setTerm (final long term) ; long getTerm () ; boolean setVotedFor (final PeerId peerId) ; PeerId getVotedFor () ; boolean setTermAndVotedFor (final long term, final PeerId peerId) ; }
针对该接口,JRaft 提供了 LocalRaftMetaStorage 实现类,基于本地文件系统采用 protobuf 协议对元数据执行序列化之后进行存储。
LocalRaftMetaStorage 在初始化时(即执行 LocalRaftMetaStorage#init
方法期间)会从本地文件系统加载并反序列化元数据,以初始化 currentTerm 和 votedFor 属性值。运行期间对于这两个属性值的更改全部记录在内存中,并在关闭时(即执行 LocalRaftMetaStorage#shutdown
方法期间)将内存中的数据序列化后落盘。
快照数据存储
JRaft 节点在初始化期间会调用 NodeImpl#initSnapshotStorage
方法初始化快照数据存储。与日志数据存储模块的设计相类似,JRaft 针对快照数据存储模块同样采用了操作与存储相分离的策略,其中 SnapshotExecutor 主要负责生成和安装快照,而 SnapshotStorage 则主要负责针对快照文件的读写,以及从远端 Leader 节点拷贝快照数据。
方法 NodeImpl#initSnapshotStorage
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private boolean initSnapshotStorage () { if (StringUtils.isEmpty(this .options.getSnapshotUri())) { LOG.warn("Do not set snapshot uri, ignore initSnapshotStorage." ); return true ; } this .snapshotExecutor = new SnapshotExecutorImpl(); final SnapshotExecutorOptions opts = new SnapshotExecutorOptions(); opts.setUri(this .options.getSnapshotUri()); opts.setFsmCaller(this .fsmCaller); opts.setNode(this ); opts.setLogManager(this .logManager); opts.setAddr(this .serverId != null ? this .serverId.getEndpoint() : null ); opts.setInitTerm(this .currTerm); opts.setFilterBeforeCopyRemote(this .options.isFilterBeforeCopyRemote()); opts.setSnapshotThrottle(this .options.getSnapshotThrottle()); return this .snapshotExecutor.init(opts); }
上述实现主要是用来创建和初始化 SnapshotExecutor,同时我们也可以看到快照机制对于 Raft 算法而言并不是必须的。如果一个应用并不会让 Raft 算法的运行产生大量的日志文件,或者对应的日志无法被压缩,则无需启动快照机制。JRaft 在初始化快照存储模块时会检查应用是否设置了 snapshotUri 参数,如果未设置则表明业务不希望启动快照机制。
SnapshotExecutor 的初始化过程主要是从本地加载最新的快照文件数据,对应的 SnapshotExecutorImpl#init
方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public boolean init (final SnapshotExecutorOptions opts) { if (StringUtils.isBlank(opts.getUri())) { LOG.error("Snapshot uri is empty." ); return false ; } this .logManager = opts.getLogManager(); this .fsmCaller = opts.getFsmCaller(); this .node = opts.getNode(); this .term = opts.getInitTerm(); this .snapshotStorage = this .node.getServiceFactory() .createSnapshotStorage(opts.getUri(), this .node.getRaftOptions()); if (opts.isFilterBeforeCopyRemote()) { this .snapshotStorage.setFilterBeforeCopyRemote(); } if (opts.getSnapshotThrottle() != null ) { this .snapshotStorage.setSnapshotThrottle(opts.getSnapshotThrottle()); } if (!this .snapshotStorage.init(null )) { LOG.error("Fail to init snapshot storage." ); return false ; } final LocalSnapshotStorage tmp = (LocalSnapshotStorage) this .snapshotStorage; if (tmp != null && !tmp.hasServerAddr()) { tmp.setServerAddr(opts.getAddr()); } final SnapshotReader reader = this .snapshotStorage.open(); if (reader == null ) { return true ; } this .loadingSnapshotMeta = reader.load(); if (this .loadingSnapshotMeta == null ) { LOG.error("Fail to load meta from {}." , opts.getUri()); Utils.closeQuietly(reader); return false ; } LOG.info("Loading snapshot, meta={}." , this .loadingSnapshotMeta); this .loadingSnapshot = true ; this .runningJobs.incrementAndGet(); final FirstSnapshotLoadDone done = new FirstSnapshotLoadDone(reader); Requires.requireTrue(this .fsmCaller.onSnapshotLoad(done)); try { done.waitForRun(); } catch (final InterruptedException e) { LOG.warn("Wait for FirstSnapshotLoadDone run is interrupted." ); Thread.currentThread().interrupt(); return false ; } finally { Utils.closeQuietly(reader); } if (!done.status.isOk()) { LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}." , opts.getUri(), done.status); return false ; } return true ; }
关于加载快照数据的执行过程(即 FSMCaller#onSnapshotLoad
方法的实现)我们将在后面介绍 JRaft 快照机制的文章中针对性的介绍,这里暂且跳过。
状态机调度器
前面我们曾简单介绍过 StateMachine 接口,JRaft 通过该接口抽象描述了 Raft 算法中引入的状态机。这也是 JRaft 向业务透传自己运行状态的核心接口,业务可以通过该接口捕获 JRaft 的运行事件。除了最核心的应用 LogEntry 中的指令外,还包括当前节点作为 LEADER 或 FOLLOWER 角色的启停事件、集群节点配置变更、快照加载与存储,以及集群运行错误与停机等。
StateMachine 接口定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface StateMachine { void onApply (final Iterator iter) ; void onShutdown () ; void onSnapshotSave (final SnapshotWriter writer, final Closure done) ; boolean onSnapshotLoad (final SnapshotReader reader) ; void onLeaderStart (final long term) ; void onLeaderStop (final Status status) ; void onError (final RaftException e) ; void onConfigurationCommitted (final Configuration conf) ; void onStopFollowing (final LeaderChangeContext ctx) ; void onStartFollowing (final LeaderChangeContext ctx) ; }
那么 JRaft 是如何将这些事件通知到业务的呢?具体点来说,通知到业务实现的状态机的呢?这就是状态机调度器 FSMCaller 所做的工作。该接口定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface FSMCaller extends Lifecycle <FSMCallerOptions >, Describer { void addLastAppliedLogIndexListener (final LastAppliedLogIndexListener listener) ; boolean onCommitted (final long committedIndex) ; boolean onSnapshotLoad (final LoadSnapshotClosure done) ; boolean onSnapshotSave (final SaveSnapshotClosure done) ; boolean onLeaderStop (final Status status) ; boolean onLeaderStart (final long term) ; boolean onStartFollowing (final LeaderChangeContext ctx) ; boolean onStopFollowing (final LeaderChangeContext ctx) ; boolean onError (final RaftException error) ; long getLastAppliedIndex () ; void join () throws InterruptedException ; }
从 StateMachine 和 FSMCaller 接口的定义上是不是可以看出有一种相互呼应的感觉呢。简而言之,JRaft 通过调用 FSMCaller 中声明的方法实现将内部运行状态透传给业务,而 FSMCaller 在本地则基于 Disruptor 消息队列以事件的形式缓存这些内部状态,并通过异步的方式回调 StateMachine 接口声明的相应方法,这就是 FSMCaller 整体的运行逻辑。
本小节重点介绍 FSMCaller 的初始化过程和整体执行流程,具体实现细节层面先不展开,留到后面结合具体场景再深入分析。
JRaft 节点在初始化期间会调用 NodeImpl#initFSMCaller
方法对 FSMCaller 进行初始化,该方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private boolean initFSMCaller (final LogId bootstrapId) { if (this .fsmCaller == null ) { LOG.error("Fail to init fsm caller, null instance, bootstrapId={}." , bootstrapId); return false ; } this .closureQueue = new ClosureQueueImpl(); final FSMCallerOptions opts = new FSMCallerOptions(); opts.setAfterShutdown(status -> afterShutdown()); opts.setLogManager(this .logManager); opts.setFsm(this .options.getFsm()); opts.setClosureQueue(this .closureQueue); opts.setNode(this ); opts.setBootstrapId(bootstrapId); opts.setDisruptorBufferSize(this .raftOptions.getDisruptorBufferSize()); return this .fsmCaller.init(opts); }
FSMCaller 在初始化期间(即执行 FSMCallerImpl#init
方法)除了完成一些属性的赋值工作外,主要是创建和启动了一个 Disruptor 队列,用于异步处理各种状态机事件:
1 2 3 4 5 6 7 8 9 10 this .disruptor = DisruptorBuilder.<ApplyTask>newInstance() .setEventFactory(new ApplyTaskFactory()) .setRingBufferSize(opts.getDisruptorBufferSize()) .setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-" , true )) .setProducerType(ProducerType.MULTI) .setWaitStrategy(new BlockingWaitStrategy()) .build(); this .disruptor.handleEventsWith(new ApplyTaskHandler());this .disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));this .taskQueue = this .disruptor.start();
方法 FSMCallerImpl#enqueueTask
用于往该 Disruptor 队列写入具体的状态机事件,而 FSMCallerImpl 之于 FSMCaller 接口中声明的方法在实现层面基本上都是简单的调用了该方法。这里以 FSMCallerImpl#onCommitted
方法为例,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public boolean onCommitted (final long committedIndex) { return enqueueTask((task, sequence) -> { task.type = TaskType.COMMITTED; task.committedIndex = committedIndex; }); } private boolean enqueueTask (final EventTranslator<ApplyTask> tpl) { if (this .shutdownLatch != null ) { LOG.warn("FSMCaller is stopped, can not apply new task." ); return false ; } if (!this .taskQueue.tryPublishEvent(tpl)) { setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY, "FSMCaller is overload." ))); return false ; } return true ; }
那么 FSMCaller 又是怎么处理这些消息队列中的事件的呢?熟悉 Disruptor 的同学应该都知道这个时候应该去看 FSMCaller 是如何实现 EventHandler 接口的。FSMCaller 针对 EventHandler 接口定义了 ApplyTaskHandler 实现类:
1 2 3 4 5 6 7 8 9 10 private class ApplyTaskHandler implements EventHandler <ApplyTask > { private long maxCommittedIndex = -1 ; @Override public void onEvent ( final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception { this .maxCommittedIndex = runApplyTask(event, this .maxCommittedIndex, endOfBatch); } }
ApplyTaskHandler 通过调用 FSMCallerImpl#runApplyTask
方法对 Disruptor 消息队列中缓存的状态机事件进行处理。该方法本质上是一个事件分发器,基于具体的状态机事件类型调用对应的 do*
方法实现对事件的处理操作。
方法 FSMCallerImpl#runApplyTask
的实现比较直观,不再展开,关于各个 do*
方法的实现将留到后续结合具体场景展开分析。
选票箱
投票机制是 Raft 算法运行的基础,JRaft 在实现上为每个节点都设置了一个选票箱 BallotBox 实例,用于对 LogEntry 是否提交进行仲裁。
JRaft 节点在初始化期间会创建并初始化自己的选票箱,具体过程比较简单,实现如下:
1 2 3 4 5 6 7 8 9 this .ballotBox = new BallotBox();final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();ballotBoxOpts.setWaiter(this .fsmCaller); ballotBoxOpts.setClosureQueue(this .closureQueue); if (!this .ballotBox.init(ballotBoxOpts)) { LOG.error("Node {} init ballotBox failed." , getNodeId()); return false ; }
这里需要注意的一点是,BallotBox 中持有的 ClosureQueue 实例是在前面介绍的 NodeImpl#initFSMCaller
中创建的,所以 FSMCaller 和 BallotBox 对象持有的 ClosureQueue 实例是同一个。BallotBox 负责往 ClosureQueue 中写数据,而 FSMCaller 则负责从 ClosureQueue 中读数据。
总结
本文我们通过一个 Leader 选举的示例介绍了 JRaft 算法库的基本使用,并对 JRaft 的整体架构设计和节点的初始化过程进行了分析。总的来说,JRaft 在模块划分上还是比较清晰的,不过也有值得吐槽的一点,例如 Node 类的实现太重,是否可以通过类似状态模式一类的思想重构一下?
参考
Raft Consensus Algorithm
SOFA-JRaft 官网
SOFAJRaft:生产级高性能 Java 实现
SOFAJRaft:生产级 Raft 算法库存储模块剖析