上一篇我们分析了 supervisor 节点的启动和运行过程,提及到 supervisor 的核心工作就是基于 ZK 从 nimbus 节点领取分配给它的任务,并启动 worker 执行。一个 worker 就是一个 JVM 进程,运行在 supervisor 节点上,多个 task 可以同时运行在一个 worker 进程之中,每个 task 都对应一个线程。
Worker 进程的启动位于 Worker 类中,前面我们在分析 supervisor 节点的启动过程时提及到了对于 Worker 类 main 函数的触发,supervisor 在启动相应 worker 进程时会指定 topologyId、supervisorId、workerPort、workerId,以及 classpath 等参数,worker 在拿到这些参数之后会先获取当前机器上端口对应的老进程,并逐一 kill 掉,然后调用 Worker#mk_worker
方法创建并启动对应的 worker 实例,该方法的核心实现如下:
1 2
| Worker w = new Worker(conf, context, topologyId, supervisorId, port, workerId, jarPath); return w.execute();
|
Worker 类仅包含一个实例属性 WorkerData,它封装了所有与 worker 运行相关的属性,实例化 Worker 对象的过程也是初始化 WorkerData 属性的过程,该过程主要包含以下工作:
- 初始化基本属性(包括运行基本配置项、消息上下文对象等),同时设置初始状态;
- 检查当前运行模式,对于集群模式会在本地创建相应的工作目录
workers/${worker_id}/pids
;
- 从 ZK 获取当前集群的运行状态;
- 加载 topology 配置信息,并注册相应的配置动态更新监听器;
- 注册一系列 mertics 监控项,用于打点 worker 的运行状态;
- 加载当前 topology 对应的序列化对象;
- 创建并初始化相关的消息传输队列。
初始化完成之后会调用 Worker#execute
方法创建并启动 worker 进程,该方法主要的执行流程可以概括如下:
- 为当前 worker 创建并启动一个 socket 连接,用于接收消息并分发给名下的 task 线程;
- 启动一个线程用于维护当前 worker 状态变更时,更新与其它 worker 之间的连接关系;
- 启动一个线程用于定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地相应操作;
- 启动一个线程循环消费当前 worker 的 tuple 队列发送给对应的下游 task 线程;
- 启动一个线程用于定期更新本地的 worker 心跳信息;
- 创建并启动当前 worker 下所有的 task 任务。
方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public WorkerShutdown execute() throws Exception { List<AsyncLoopThread> threads = new ArrayList<>();
AsyncLoopThread controlRvThread = this.startDispatchThread(); threads.add(controlRvThread);
RefreshConnections refreshConn = this.makeRefreshConnections(); AsyncLoopThread refreshConnLoopThread = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true); threads.add(refreshConnLoopThread);
RefreshActive refreshZkActive = new RefreshActive(workerData); AsyncLoopThread refreshZk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true); threads.add(refreshZk);
DrainerCtrlRunnable drainerCtrlRunnable = new DrainerCtrlRunnable(workerData, MetricDef.SEND_THREAD); AsyncLoopThread controlSendThread = new AsyncLoopThread(drainerCtrlRunnable, false, Thread.MAX_PRIORITY, true); threads.add(controlSendThread);
AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf()); if (syncContainerHbThread != null) { threads.add(syncContainerHbThread); }
JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData); metricReporter.init(); workerData.setMetricsReporter(metricReporter);
RunnableCallback heartbeatFn = new WorkerHeartbeatRunnable(workerData); AsyncLoopThread hb = new AsyncLoopThread(heartbeatFn, false, null, Thread.NORM_PRIORITY, true); threads.add(hb);
List<TaskShutdownDaemon> shutdownTasks = this.createTasks(); workerData.setShutdownTasks(shutdownTasks);
List<AsyncLoopThread> serializeThreads = workerData.setSerializeThreads(); threads.addAll(serializeThreads); List<AsyncLoopThread> deserializeThreads = workerData.setDeserializeThreads(); threads.addAll(deserializeThreads);
return new WorkerShutdown(workerData, threads); }
|
消息接收与分发
Storm 会为 worker 基于 Netty 创建并返回一个 socket 连接用于接收消息,同时 worker 与名下所有 task 之间会维持一个传输队列,并启动一个线程循环消费接收到的消息投递给对应 task 的传输队列中。该过程位于 Worker#startDispatchThread
方法中,该方法实现如下(去掉了一些非关键代码):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private AsyncLoopThread startDispatchThread() { IContext context = workerData.getContext(); String topologyId = workerData.getTopologyId();
Map stormConf = workerData.getStormConf(); long timeout = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256); DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, queueSize, waitStrategy, false, 0, 0);
IConnection recvConnection = context.bind( topologyId, workerData.getPort(), workerData.getDeserializeQueues(), recvControlQueue, false, workerData.getTaskIds()); workerData.setRecvConnection(recvConnection);
RunnableCallback recvControlDispatcher = new VirtualPortCtrlDispatch( workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD); return new AsyncLoopThread(recvControlDispatcher, false, Thread.MAX_PRIORITY, true); }
|
这里的消息队列底层都依赖于 disruptor 实现,最终对于接收到的消息都会调用 VirtualPortCtrlDispatch#handleEvent
方法进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public void handleEvent(Object event, boolean endOfBatch) throws Exception { TaskMessage message = (TaskMessage) event; int task = message.task();
Object tuple = null; try { tuple = this.deserialize(message.message(), task); } catch (Throwable e) { if (Utils.exceptionCauseIsInstanceOf(KryoException.class, e)) { throw new RuntimeException(e); } LOG.warn("serialize msg error", e); }
DisruptorQueue queue = controlQueues.get(task); if (queue == null) { LOG.warn("Received invalid control message for task-{}, Dropping...{} ", task, tuple); return; } if (tuple != null) { queue.publish(tuple); } }
|
创建并启动用于维护 worker 之间连接关系的线程
在这一步会创建一个 RefreshConnections 对象,它继承了 RunnableCallback 类,所以同样是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshConnections#run
方法),Storm 会定期检测 ZK 上的 topology 任务分配信息是否有更新,如果有比本地更新的任务分配(依赖于任务分配时间戳进行判定)则会判断新任务分配的类型来相应的更新本地的信息。
如果当前的任务分配类型仅仅是更新集群上已有的 topology,则 Storm 会遍历通知各个 task 执行相应的更新操作,同时会回调已注册的所有更新监听器以更新配置信息,实现如下:
1 2 3 4 5 6 7 8 9 10 11
| List<TaskShutdownDaemon> taskShutdowns = workerData.getShutdownTasks(); Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId); workerData.getStormConf().putAll(newConf); for (TaskShutdownDaemon taskSD : taskShutdowns) { taskSD.update(newConf); }
workerData.getUpdateListener().update(newConf); workerData.setAssignmentType(AssignmentType.UpdateTopology);
|
如果当前是更新以外的任务分配类型(Assign、ScaleTopology),则 Storm 会从新的任务分配信息中分别获取新增的、待删除的,以及需要更新的 taskId 列表,并执行相应的创建、删除,以及更新 task 操作,同时会更新 worker 上所有 task 的下游 task 列表信息。部分代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| Set<Integer> addedTasks = this.getAddedTasks(assignment);
Set<Integer> removedTasks = this.getRemovedTasks(assignment);
Set<Integer> updatedTasks = this.getUpdatedTasks(assignment);
workerData.updateWorkerData(assignment); workerData.updateKryoSerializer();
this.shutdownTasks(removedTasks);
this.createTasks(addedTasks);
this.updateTasks(updatedTasks);
Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData); if (!outboundTasks.equals(tmpOutboundTasks)) { for (int taskId : tmpOutboundTasks) { if (!outboundTasks.contains(taskId)) { workerData.addOutboundTaskStatusIfAbsent(taskId); } } for (int taskId : workerData.getOutboundTaskStatus().keySet()) { if (!tmpOutboundTasks.contains(taskId)) { workerData.removeOutboundTaskStatus(taskId); } } workerData.setOutboundTasks(tmpOutboundTasks); outboundTasks = tmpOutboundTasks; } workerData.setAssignmentType(AssignmentType.Assign);
|
创建并启动定期获取 topology 基本信息的线程
在这一步会创建一个 RefreshActive 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshActive#run
方法),Storm 会定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地执行相应的操作。
如果 topology 状态信息变为 active、upgrading,或者 rollback 时,Storm 会依次将本地 task 的状态设置为 TaskStatus.RUN
,如果当前 task 对应的组件是 spout,则会触发 ISpout#activate
方法。如果当前 topology 状态不为 inactive 时,Storm 会依次将本地的 task 状态设置为 TaskStatus.PAUSE
,如果当前 task 对应的组件是 spout,则会触发 ISpout#deactivate
方法。最后更新本地记录的 topology 状态。相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| if (newTopologyStatus.equals(StatusType.active) || newTopologyStatus.equals(StatusType.upgrading) || newTopologyStatus.equals(StatusType.rollback)) { for (TaskShutdownDaemon task : tasks) { if (task.getTask().getTaskStatus().isInit()) { task.getTask().getTaskStatus().setStatus(TaskStatus.RUN); } else { task.active(); } } } else if (oldTopologyStatus == null || !oldTopologyStatus.equals(StatusType.inactive)) { for (TaskShutdownDaemon task : tasks) { if (task.getTask().getTaskStatus().isInit()) { task.getTask().getTaskStatus().setStatus(TaskStatus.PAUSE); } else { task.deactive(); } } } workerData.setTopologyStatus(newTopologyStatus);
|
创建并启动循环消费 worker tuple 队列的线程
在这一步会创建一个 DrainerCtrlRunnable 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 DrainerCtrlRunnable#run
方法),Storm 会循环消费当前 worker 的 tuple 队列 transferCtrlQueue,并最终调用 DrainerCtrlRunnable#handleEvent
方法对拿到的消息进行处理,该方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void handleEvent(Object event, boolean endOfBatch) throws Exception { if (event == null) { return; } ITupleExt tuple = (ITupleExt) event; int targetTask = tuple.getTargetTaskId();
IConnection conn = this.getConnection(targetTask); if (conn != null) { byte[] tupleMessage = null; try { tupleMessage = this.serialize(tuple); } catch (Throwable e) { } TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage); conn.sendDirect(message); } }
|
方法的逻辑比较简单,拿到当前 tuple 对应的下游 taskId,然后与之建立连接(netty)并将 tuple 发送给它。
创建并启动当前 worker 下所有的 task 线程
方法 Worker#createTasks
用于为当前 worker 下的所有 task 任务创建一个 Task 对象,并为每个 task 启动一个线程执行,同时为每个 task 任务创建一个 TaskShutdownDaemon 对象用于管理对应的 task 线程,方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| private List<TaskShutdownDaemon> createTasks() throws Exception { List<TaskShutdownDaemon> shutdownTasks = new ArrayList<>();
Set<Integer> taskIds = workerData.getTaskIds();
Set<Thread> threads = new HashSet<>(); List<Task> taskArrayList = new ArrayList<>(); for (int taskId : taskIds) { Task task = new Task(workerData, taskId); Thread thread = new Thread(task); threads.add(thread); taskArrayList.add(task); thread.start(); } for (Thread thread : threads) { thread.join(); } for (Task t : taskArrayList) { shutdownTasks.add(t.getTaskShutdownDameon()); } return shutdownTasks; }
|
Task 类实现了 Runnable 接口,其 run 方法中简单调用了 Task#execute
方法,该方法首先会向系统 bolt 发送一条“startup”消息,然后依据当前的组件类型创建对应的任务执行器,创建的过程位于 Task#mkExecutor
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public BaseExecutors mkExecutor() { BaseExecutors baseExecutor = null;
if (taskObj instanceof IBolt) { if (taskId == topologyContext.getTopologyMasterId()) { baseExecutor = new TopologyMasterBoltExecutors(this); } else { baseExecutor = new BoltExecutors(this); } } else if (taskObj instanceof ISpout) { if (this.isSingleThread(stormConf)) { baseExecutor = new SingleThreadSpoutExecutors(this); } else { baseExecutor = new MultipleThreadSpoutExecutors(this); } }
return baseExecutor; }
|
BaseExecutors 类是一个 RunnableCallback 类,所以其 run 方法会被异步循环调用。继承自 BaseExecutors 类有 5 个(如下),而 Task#mkExecutor
方法基于组件类型分别选择了相应的实现类进行实例化。
- BoltExecutors
- TopologyMasterBoltExecutors
- SpoutExecutors
- SingleThreadSpoutExecutors
- MultipleThreadSpoutExecutors
先来看一下 BoltExecutors 和 TopologyMasterBoltExecutors,这是 bolt 组件的任务执行器,其中 TopologyMasterBoltExecutors 继承自 BoltExecutors,所以接下来我们主要来看一下 BoltExecutors 的实现。BoltExecutors 类的 run 方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void run() { if (!isFinishInit) { this.initWrapper(); } while (!taskStatus.isShutdown()) { try { this.consumeExecuteQueue(); } catch (Throwable e) { } } }
|
方法首先会判定是否完成了初始化操作,如果未完成则会调用 BaseExecutors#initWrapper
执行初始化,这期间主要是调用了 IBolt#prepare
方法,这也是我们在实现一个 bolt 时执行初始化的方法。如果当前 task 线程没有被销毁,则会一直循环调用 BoltExecutors#consumeExecuteQueue
消费当前 task 的消息队列。前面的分析我们知道 worker 会对接收到的消息按照 taskId 投递给对应 task 的消息队列,而消息队列的消费过程就在这里发生。针对接收到消息会逐条进行处理,这里最终调用的是 BoltExecutors#onEvent
方法,处理的消息就是我们熟悉的 Tuple 对象,而该方法的核心就是调用 IBolt#execute
方法,也就是调用用户自定义的策略对收到的 tuple 进行处理。
再来看一下 SingleThreadSpoutExecutors 和 MultipleThreadSpoutExecutors,这两类都继承自 SpoutExecutors 类,区别仅在于对于消息的附加处理和正常的业务逻辑是否位于同一个线程中,而核心逻辑都是调用 ISpout#nextTuple
方法,也就是执行用户自定义的业务逻辑。
针对 worker 的运行机制就分析到这里,但是 Storm 对于消息的处理并没有结束,下一篇我们将一起探寻 ack 机制,看看 Storm 如何保证消息至少被执行一次(at least once)。