Supervisor 节点可以理解为单机任务调度器,它负责监听 nimbus 节点的任务资源分配,启动相应的 worker 进程执行 nimbus 分配给当前节点的任务,同时监测 worker 的运行状态,一旦发现有 worker 运行异常,就会杀死该 worker 进程,并将原先分配给 worker 的任务交还给 nimbus 节点进行重新分配。
Supervisor 节点的启动过程位于 Supervisor 类中,main 方法的实现比较简单,主要就是创建了一个 Supervisor 类对象,并调用实例方法 Supervisor#run
,该方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public void run() { try {
Map<Object, Object> conf = Utils.readStormConfig();
StormConfig.validate_distributed_mode(conf);
this.createPid(conf);
SupervisorManger supervisorManager = this.mkSupervisor(conf, null);
JStormUtils.redirectOutput("/dev/null");
this.initShutdownHook(supervisorManager);
while (!supervisorManager.isFinishShutdown()) { try { Thread.sleep(1000); } catch (InterruptedException ignored) { } } } }
|
整个方法的逻辑比较清晰(如代码注释),核心实现位于 Supervisor#mkSupervisor
方法中,该方法主要用于创建和启动 supervisor 节点,基本执行流程如下:
- 创建并清空本地存放临时文件的目录(这些文件是从 nimbus 节点下载而来);
- 创建 ZK 操作对象和 worker 运行错误数据上报器;
- 创建 LocalState 对象,并获取(或创建)当前 supervisor 节点对应的 ID;
- 启动心跳机制,同步节点信息到 ZK;
- 启动并定期执行
SyncSupervisorEvent#run()
方法(默认间隔 10 秒),从 nimbus 节点领取分配给当前节点的任务并启动执行;
- 启动轻量级 HTTP 服务,主要用于查看和下载当前节点的运行日志数据;
- 启动 supervisor 运行状况检查机制和 nimbus 节点配置同步策略。
Supervisor 节点在启动时首先会在本地创建并清空临时目录(路径:supervisor/tmp
),Supervisor 从 nimbus 节点下载下来的文件会临时存放在这里,包括 stormcode.cer、stormconf.cer、stormjar.jar,以及 lib 目录下面的文件等,经过简单处理之后会将其复制到 stormdist/${topology_id}
本地目录中,supervisor 本地文件说明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| + ${supervisor_local_dir} | ---- + supervisor | ---- | ---- + stormdist | ---- | ---- | ---- + ${topology_id} | ---- | ---- | ---- | ---- + resources: 指定 topology 程序包 resources 目录下面的所有文件 | ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有代码的 jar 文件 | ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 对象的序列化文件 | ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置信息文件 | ---- | ---- + localstate: 本地状态信息 | ---- | ---- + tmp: 临时目录,从 nimbus 下载的文件的临时存储目录,简单处理之后复制到 stormdist/${topology_id} | ---- | ---- | ---- + ${uuid} | ---- | ---- | ---- | ---- + stormjar.jar: 从 nimbus 节点下载下来的 jar 文件 | ---- | ---- | ---- + ${topology_id} | ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有代码的 jar 文件(从 inbox 目录复制过来) | ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 对象的序列化文件 | ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置信息文件
| ---- + workers | ---- | ---- + ${worker_id} | ---- | ---- | ---- + pids | ---- | ---- | ---- | ---- + ${pid}: 指定 worker 进程 ID | ---- | ---- | ---- + heartbeats | ---- | ---- | ---- | ---- + ${worker_id}: 指定 worker 心跳信息(心跳时间、worker 的进程 ID)
|
接下来 supervisor 会创建 StormClusterState 对象,用于操作 ZK 集群,同时还会创建一个 WorkerReportError 类对象,用于上报 worker 的运行错误数据到 ZK,该类仅包含一个实例方法 report,用于执行上报逻辑。然后 supervisor 节点会创建一个 LocalState 对象用于存储节点的状态信息,这是一个简单、低效的键值存储数据库,每一次操作都会落盘,在这里对应的落盘目录是 supervisor/localstate
。Supervisor 的 ID(UUID 字符串) 就存储在该数据库中,supervisor 启动时会先尝试从本地状态信息对象中获取 ID 值,如果不存在的话就会创建一个新的 UUID 字符串作为 ID。
Supervisor 节点在启动的过程中会初始化心跳机制,间隔指定时间将当前节点的相关信息上报给 ZK(路径:supervisors/${supervisor_id}
),包含当前 supervisor 节点的主机名、ID、最近一次上报时间、截止上次上报节点的运行时间,以及 worker 端口列表信息。相关信息的初始化在 Heartbeat 类对象实例化时进行设置,期间会依据当前机器 CPU 核心数和物理内存大小计算允许的 worker 端口数目,并默认从 6800 端口号开始分配 worker 端口。Supervisor 节点会启动一个线程,默认每间隔 60 秒调用 Heartbeat#update
方法同步心跳信息到 ZK,该方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public void update() { supervisorInfo.setTimeSecs(TimeUtils.current_time_secs()); supervisorInfo.setUptimeSecs(TimeUtils.current_time_secs() - startTime);
this.updateSupervisorInfo();
try { stormClusterState.supervisor_heartbeat(supervisorId, supervisorInfo); } catch (Exception e) { LOG.error("Failed to update SupervisorInfo to ZK", e); } }
|
具体过程如代码注释,下面是一个实际的心跳信息示例:
1 2 3 4 5 6 7 8 9
| { "hostName": "10.38.164.192", "supervisorId": "980bbcfd-5438-4e25-aee9-bf411304a446", "timeSecs": 1533373753, "uptimeSecs": 2879598, "workerPorts": [ 6912, 6900, 6901, 6902, 6903, 6904, 6905, 6906, 6907, 6908, 6909, 6910, 6911 ] }
|
下面来重点看一下 supervisor 节点领取分配给当前节点的任务并启动执行的过程。该过程的实现代码块如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<>(); SyncProcessEvent syncProcessEvent = new SyncProcessEvent( supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError, stormClusterState);
EventManagerImp syncSupEventManager = new EventManagerImp(); AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager); threads.add(syncSupEventThread);
SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent( supervisorId, conf, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb);
int syncFrequency = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)); EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequency);
AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher); threads.add(syncSupervisorThread);
|
要理解该过程的运行机制,我们应该倒着来看相应的源码实现,首先看一下代码块的倒数第二行:
1
| AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);
|
由前面我们对 Storm 基本线程模型的分析可以知道,这行代码会启动一个线程去循环执行入参回调的 run 方法,这里也就是 EventManagerPusher#run
方法,该方法的实现比较简单:
1 2 3 4
| @Override public void run() { eventManager.add(event); }
|
也就是不断的调用 EventManager#add
方法(默认间隔时间为 10 秒),继续往前看我们知道这里的 EventManager 类实际实现是 EventManagerImp,而不断的调用其 add 方法添加的 event 本质上就是一个 SyncSupervisorEvent 实例对象。EventManagerImp 维护了一个阻塞队列来不断记录加入的 event,它本身也是一个回调,再往前看我们就可以看到它在实例化时也被 AsyncLoopThread 启动,EventManagerImp#run
方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void run() { try { RunnableCallback r = queue.take(); if (r == null) { return; } r.run(); e = r.error(); this.processInc(); } catch (InterruptedException e) { LOG.info("Interrupted when processing event."); } }
|
该方法就是不断的从阻塞队列中取出相应的回调并应用其 run 方法,也就是不断应用 SyncSupervisorEvent#run
方法。
以上就是步骤五的整体逻辑,简单描述就是定期的往阻塞队列中添加 SyncSupervisorEvent 事件,而线程会循环的消费队列,取出事件并应用事件的 run 方法。下面来深入分析一下 SyncSupervisorEvent 的 run 方法,该方法所做的工作也就是 supervisor 的核心逻辑,主要可以概括为 3 点:
- 从 ZK 上下载任务分配信息,并更新到本地;
- 从 nimbus 节点上下载 topology 对应的 jar 和配置文件;
- 启动 worker 执行分配给当前 supervisor 的 topology 任务。
SyncSupervisorEvent#run
方法的实现比较长,下面按照执行步骤逐步拆分进行分析,首先来看一下从 ZK 上下载任务分配信息,并更新到本地的过程,相应实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
if (healthStatus.isMoreSeriousThan(HealthStatus.ERROR)) { assignmentVersion.clear(); assignments.clear(); LOG.warn("Supervisor machine check status: " + healthStatus + ", killing all workers."); } else { this.getAllAssignments(assignmentVersion, assignments, syncCallback); } LOG.debug("Get all assignments " + assignments);
List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(conf); LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);
Map<Integer, LocalAssignment> zkAssignment = this.getLocalAssign(stormClusterState, supervisorId, assignments);
Map<Integer, LocalAssignment> localAssignment; try { LOG.debug("Writing local assignment " + zkAssignment); localAssignment = (Map<Integer, LocalAssignment>) localState.get(Common.LS_LOCAL_ASSIGNMENTS); if (localAssignment == null) { localAssignment = new HashMap<>(); } localState.put(Common.LS_LOCAL_ASSIGNMENTS, zkAssignment); } catch (IOException e) { LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " to localState failed"); throw e; }
|
Supervisor 节点在本地会缓存任务分配信息,同时会定期从 ZK 同步最新的任务分配信息到本地,从 ZK 上获取任务分配信息的逻辑位于 SyncSupervisorEvent#getAllAssignments
方法中,方法会从 ZK 的 assignments 路径下获取所有的 topologyId,并与本地比较对应 topology 的任务分配信息版本,如果版本有更新则更新本地缓存的任务分配信息。
接下来 supervisor 会计算所有需要下载的 topology,包括需要更新的、需要重新下载的(之前下载有失败),以及在当前节点进行灰度的,并从 nimbus 节点下载各个 topology 对应的文件,包括 stormjar.jar、stormcode.ser、stormconf.ser,以及 lib 目录下面的依赖文件(如果存在的话),最后从本地删除那些之前下载过但是本次未分配给当前 supervisor 节点的 topology 文件,相应实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
Set<String> updateTopologies = this.getUpdateTopologies(localAssignment, zkAssignment, assignments); Set<String> reDownloadTopologies = this.getNeedReDownloadTopologies(localAssignment); if (reDownloadTopologies != null) { updateTopologies.addAll(reDownloadTopologies); }
Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts = this.getUpgradeTopologies(stormClusterState, localAssignment, zkAssignment); if (upgradeTopologyPorts.size() > 0) { LOG.info("upgrade topology ports:{}", upgradeTopologyPorts); updateTopologies.addAll(upgradeTopologyPorts.keySet()); }
Map<String, String> topologyCodes = getTopologyCodeLocations(assignments, supervisorId);
Set<String> downloadFailedTopologyIds = new HashSet<>();
this.downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologies, assignments, downloadFailedTopologyIds);
this.removeUselessTopology(topologyCodes, downloadedTopologyIds);
|
文件下载的逻辑位于 SyncSupervisorEvent#downloadTopology
方法中,文件下载的过程可以概括为以下 5 个步骤:
- 从 nimbus 上下载 topology 相关文件到 supervisor 的临时目录:
${storm.local.dir}/supervisor/tmp/${uuid}
;
- 抽取 stormjar.jar 的 resources 文件;
- 将临时目录下的文件移动到
${storm.local.dir}/supervisor/stormdist/${topology_id}
目录;
- 清空临时目录;
- 添加对应的时间戳文件:
${storm.local.dir}/supervisor/stormdist/${topology_id}/timestamp
。
最后 supervisor 节点会调用 SyncProcessEvent#run
方法杀死状态异常的 worker,同时启动新的 worker 执行分配的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
syncProcesses.run(zkAssignment, downloadFailedTopologyIds, upgradeTopologyPorts);
public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds, Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts) {
LOG.debug("Syncing processes, interval (sec): " + TimeUtils.time_delta(lastTime)); lastTime = TimeUtils.current_time_secs(); try { if (localAssignments == null) { localAssignments = new HashMap<>(); } LOG.debug("Assigned tasks: " + localAssignments);
Map<String, StateHeartbeat> localWorkerStats; try { localWorkerStats = this.getLocalWorkerStats(conf, localState, localAssignments); } catch (Exception e) { LOG.error("Failed to get local worker stats"); throw e; } LOG.debug("Allocated: " + localWorkerStats);
Map<String, Integer> taskCleanupTimeoutMap; Set<Integer> keepPorts = null; try { taskCleanupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT); keepPorts = this.killUselessWorkers(localWorkerStats, localAssignments, taskCleanupTimeoutMap); localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleanupTimeoutMap); } catch (IOException e) { LOG.error("Failed to kill workers", e); }
this.checkNewWorkers(conf);
this.checkNeedUpdateTopologies(localWorkerStats, localAssignments);
this.startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);
this.restartUpgradingWorkers(localAssignments, localWorkerStats, upgradeTopologyPorts);
} catch (Exception e) { LOG.error("Failed to init SyncProcessEvent", e); } }
|
无论是新任务分配,还是灰度更新,启动 worker 的过程都是调用了 SyncProcessEvent#startWorkers
方法,该方法为每个新的 worker 基于 UUID 创建一个 workerId,以及进程目录 ${storm.local.dir}/workers/${worker_id}/pids
,并调用 SyncProcessEvent#doLaunchWorker
方法启动 worker,同时更新 worker 在本地的相应数据。Worker 进程的启动和运行机制将在下一篇中进行详细说明。
在分析 nimbus 节点启动过程中有一步会启动一个 HTTP 服务,用于接收查询 nimbus 节点本地日志和配置等数据的需求,supervisor 节点的启动过程也同样包含这样一个过程。Supervisor 的 HTTP 服务默认会监听在 7622 端口,用于接收来自 UI 的请求。
最后对于集群模式,如果配置了 supervisor.enable.check=true
则 supervisor 节点在启动时会创建一个线程用于定期检查 supervisor 的运行状况,另外还会启动一个线程用于同步 nimbus 的配置信息到本地节点。最后会创建并返回一个 SupervisorManger 类对象,用于对于当前 supervisor 节点进行管理。
到此,supervisor 节点基本启动完成了,supervisor 会定期基于 ZK 从 nimbus 节点领取任务,然后启动 worker 去执行任务,而启动 worker 的过程我们将在下一篇中进行详细分析。