上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过 submitTopology
方法向 Storm 集群的 nimbus 节点提交任务。Nimbus 以 Thrift RPC 服务的方式运行,相应 thrift 接口方法实现位于 ServiceHandler 类中,下面我们从 ServiceHandler#submitTopology
方法切入,分析 nimbus 节点之于客户端提交任务的资源分配过程,该方法包装了 ServiceHandler#submitTopologyWithOpts
方法。
Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。ServiceHandler#submitTopologyWithOpts
方法统一处理这三种情况,但是不管哪种提交方式都会首先验证 topology 名称和配置的合法性,然后基于具体提交类型分而治之。
灰度发布 & 热部署
首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 TopologyInfo topologyInfo = this .getTopologyInfo(topologyId); if (topologyInfo == null ) { throw new TException("Failed to get topology info" ); } int workerNum = ConfigExtension.getUpgradeWorkerNum(serializedConf);String component = ConfigExtension.getUpgradeComponent(serializedConf); Set<String> workers = ConfigExtension.getUpgradeWorkers(serializedConf); if (!ConfigExtension.isTmSingleWorker(serializedConf, topologyInfo.get_topology().get_numWorkers())) { throw new TException("Gray upgrade requires that topology master to be a single worker, cannot perform the upgrade!" ); } return this .grayUpgrade(topologyId, uploadedJarLocation, topology, serializedConf, component, workers, workerNum);
对于允许灰度发布的场景,Storm 会基于当前提交 topology 的配置首先会尝试获取以下 3 个参数用于挑选 worker 进行发布:
topology.upgrade.worker.num
topology.upgrade.component
topology.upgrade.workers
如果同时指定了多个参数,方法会基于一定的优先级进行决策,具体如下:
如果参数 topology.upgrade.workers
不为空则忽略其他参数,挑选指定的 worker 进行发布,需要注意的是这些 worker 发布完之后,这个参数就自动置空;
否则查看参数 topology.upgrade.component
是否为空,如果不为空还需要查看参数 topology.upgrade.worker.num
是否为 0, 如果不为 0 则挑选指定工作组件下 topology.upgrade.worker.num
指定数目的 worker 进行发布,否则对这些工作组件下所有 worker 进行发布;
如果上面两个都为空,则随机挑选 topology.upgrade.worker.num
个 worker 进行发布。
灰度发布的具体执行流程位于 ServiceHandler#grayUpgrade
方法中,该方法实现比较冗长,故不在此贴出,下面参考源码和官网文档对发布的过程进行说明:
方法首先尝试从 ZK 获取当前 topology 对应的基本信息(路径:/topology/${topology_id}
)和灰度发布信息(路径:/gray_upgrade/${topology_id}
),以及任务分配信息(路径:assignments/${topology_id}
);
如果存在灰度发布信息,则判断对应的灰度状态(已过期 / 已完成 / 进行中),如果正在灰度中则拒绝本次灰度请求,否则(包含不存在灰度发布信息的情况)继续执行灰度发布;
方法利用 GrayUpgradeConfig 对象封装灰度发布信息,并写入到 ZK 的 /gray_upgrade/${topology_id}
路径下,同时设置 config.continueUpgrading=true
;
Topology Master 有一个线程 GrayUpgradeHandler 会定时读取该节点的配置,检测到有灰度发布配置且 continueUpgrading=true
时,将分配指定数目的 worker,添加到 ZK 的 /gray_upgrade/${topology_id}/upgrading_workers
路径下,并设置 continueUpgrading=false
(防止自动进行后续的灰度发布);
SyncSupervisorEvent 会定时检查每个拓扑的 upgrading_workers 节点,一旦有数据就和自身的 IP 和端口列表进行对比,如果有属于该 supervisor 节点的灰度发布就下载最新的 storm-code 和 storm-jar,然后重启 worker,同时将 worker 添加到 ZK 的 upgraded_workers 节点下;
GrayUpgradeHandler 检测 ZK,如果 upgraded_workers 的 worker 数大于等于当前总 worker 数减 1(topology master 组件占用),则认为此次灰度发布已经完成,删除 ZK 上的灰度发布配置、upgrading_workers,以及 upgraded_workers。
如果只想升级部分 worker 或特定组件,可以用 complete_upgrade 强制完成升级。灰度发布过程中使用单独的 upgrading_workers 和 upgraded_workers 的设计主要是为了避免同步问题。如果将这些信息写在 GrayUpgradeConfig 类中可能会涉及到多个 supervisor 节点同时更新 workers 的情况,而使用单独的节点则只需要在这个节点下添加和删除子节点,不会有同步问题。
热部署和灰度发布从形式上来看都是对运行中 topology 的更新替换操作,但是对于 nimbus 来说,在处理上却是两条不同的分支,实际上热部署与新任务提交在处理过程上更加形似,毕竟热部署的过程就是杀死处于运行中的 topology 然后执行新任务提交的过程,所以接下来我们主要分析新任务的调度细节。
新任务提交
对于新提交的任务来说,Storm 会为该 topology 执行一些准备和验证工作,并在 ZK 上创建相应的结点记录该 topology 的元数据和任务分配信息,然后为该 topology 生成一个事件提交给任务分配队列等待 nimbus 节点为当前 topology 制定运行方案,并在执行成功后发送相应的通知事件。相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 Map<Object, Object> stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology); LOG.info("Normalized configuration:" + stormConf); Map<Object, Object> totalStormConf = new HashMap<>(conf); totalStormConf.putAll(stormConf); StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true ); Common.validate_basic(normalizedTopology, totalStormConf, topologyId); StormClusterState stormClusterState = data.getStormClusterState(); this .setupStormCode(topologyId, uploadedJarLocation, stormConf, normalizedTopology, false );this .waitForDesiredCodeReplication(conf, topologyId);this .setupZkTaskInfo(conf, topologyId, stormClusterState);String path = Cluster.taskerror_storm_root(topologyId); stormClusterState.mkdir(path); String grayUpgradeBasePath = Cluster.gray_upgrade_base_path(topologyId); stormClusterState.mkdir(grayUpgradeBasePath); stormClusterState.mkdir(Cluster.gray_upgrade_upgraded_workers_path(topologyId)); stormClusterState.mkdir(Cluster.gray_upgrade_upgrading_workers_path(topologyId)); LOG.info("Submit topology {} with conf {}" , topologyName, serializedConf); this .makeAssignment(topologyName, topologyId, options.get_initial_status());double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf); StartTopologyEvent.pushEvent(topologyId, metricsSampleRate); this .notifyTopologyActionListener(topologyName, "submitTopology" );
下面对源码中涉及到的相关流程进行进一步分析。对于新提交的任务首先会执行一些准备工作,包括:
规范化 topology 的配置信息;
确定 topology 各个组件的并行度,保证不超过允许的最大值;
验证 topology 的基本结构信息(topology 名称和组件 ID 的合法性、spout 是否缺失 input 声明,以及验证 worker 和 acker 的参数配置等)。
然后 Storm 会创建或更新当前 topology 对象的序列化文件(stormcode.ser)和配置信息文件(stormconf.ser)到 blobstore 中,如果是采用 nimbus 本地模式存储,还需要将对应的元数据写入 ZK 来保证数据一致性。
接下来会为当前 topology 生成 task 信息,并记录到 ZK 上(路径:/tasks/${topology_id}
),对于一个 topology 的同一个组件来说,如果并行度大于 1,那么 Storm 会为其创建对应数量的 task,并保证 taskId 是连续的,相应实现位于 ServiceHandler#setupZkTaskInfo
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void setupZkTaskInfo (Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception { Map<Integer, TaskInfo> taskToTaskInfo = this .mkTaskComponentAssignments(conf, topologyId); int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo); TopologyTaskHbInfo topoTaskHbInfo = new TopologyTaskHbInfo(topologyId, masterId); data.getTasksHeartbeat().put(topologyId, topoTaskHbInfo); stormClusterState.topology_heartbeat(topologyId, topoTaskHbInfo); if (taskToTaskInfo == null || taskToTaskInfo.size() == 0 ) { throw new InvalidTopologyException("Failed to generate TaskIDs map" ); } stormClusterState.set_task(topologyId, taskToTaskInfo); }
该方法主要做了 3 件事情:
为当前 topology 追加系统组件(acker-bolt、master-bolt,以及 system-bolt)。
为当前 topology 生成 task 分配信息,并记录到 ZK 相应节点。
为当前 topology 在 ZK 上创建对应的 task 心跳记录文件。
其中 1 和 2 位于 ServiceHandler#mkTaskComponentAssignments
方法中:
1 2 3 4 5 6 7 8 9 10 11 public Map<Integer, TaskInfo> mkTaskComponentAssignments (Map<Object, Object> conf, String topologyId) throws IOException, InvalidTopologyException, KeyNotFoundException { Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(topologyId, data.getBlobStore()); StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, data.getBlobStore()); StormTopology topology = Common.system_topology(stormConf, rawTopology); return Common.mkTaskInfo(stormConf, topology, topologyId); }
方法首先会从 blobstore 中获取 topology 的配置信息和 StormTopology 对象,然后调用 Common#system_topology
方法添加一些系统组件,包括 acker-bolt、master-bolt,以及 system-bolt 等。
然后调用 Common#mkTaskInfo
方法为当前 topology 中的各个组件生成 task 分配信息。方法实现比较简单,返回的结果是一个 map 类型,其中 key 是 taskId,对于同一个组件来说为其分配的 taskId 是连续的,value 是对应的 TaskInfo 对象,包含两个字段:componentId 和 componentType。前者对应系统组件 ID 和用户自定义组件 ID,后者对应组件类型,也就是 bolt 和 spout。
完成了 topology 组件 task 分配信息的创建,接下来方法为当前任务创建对应的 TopologyAssignEvent 事件对象,并将事件添加到队列中,等待集群为其分配资源。这一过程位于 ServiceHandler#makeAssignment
方法中,等待的过程采用了 CountDownLatch 机制,count 值设置为 1,并设置 5 分钟上限等待集群分配资源,超时则返回 false 表示本次任务提交失败。
队列的维护和消费过程位于 TopologyAssign 类中,该类实现了 Runnable 接口,并以单例的形式对外提供服务。Nimbus 节点在启动的时候会创建并初始化 TopologyAssign 对象,并以守护线程的方式启动队列的消费过程。线程的 run 方法会循环的从队列头部以阻塞的方式获取对应的 TopologyAssignEvent 事件对象,并调用 TopologyAssign#doTopologyAssignment
方法为相应的 topology 创建任务分配信息(Assignment 对象)和基本运行信息(StormBase 对象),并将任务分配信息和基本运行信息写入 ZK,其中关键的资源分配过程位于 TopologyAssign#mkAssignment
方法中,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public Assignment mkAssignment (TopologyAssignEvent event) throws Exception { String topologyId = event.getTopologyId(); LOG.info("Determining assignment for " + topologyId); TopologyAssignContext context = this .prepareTopologyAssign(event); Set<ResourceWorkerSlot> assignments; if (!StormConfig.local_mode(nimbusData.getConf())) { ITopologyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); assignments = scheduler.assignTasks(context); } else { assignments = mkLocalAssignment(context); } Assignment assignment = null ; if (assignments != null && assignments.size() > 0 ) { Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments); Map<Integer, Integer> startTimes = getTaskStartTimes( context, nimbusData, topologyId, context.getOldAssignment(), assignments); String codeDir = (String) nimbusData.getConf().get(Config.STORM_LOCAL_DIR); assignment = new Assignment(codeDir, assignments, nodeHost, startTimes); if (event.isScaleTopology()) { assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology); } StormClusterState stormClusterState = nimbusData.getStormClusterState(); stormClusterState.set_assignment(topologyId, assignment); NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId); NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId); LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment); } return assignment; }
TopologyAssign#mkAssignment
方法主要做了下面三件事情:
基于配置和当前集群运行状态为当前 topology 创建任务分配的上下文信息。
依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker。
将 topology 的任务分配信息 Assignment 对象记录到 ZK 相应节点上。
方法一开始会为当前 topology 创建任务分配的上下文信息 TopologyAssignContext 对象,该对象主要包含一下信息:
当前 topology 的 topologyId 和 topologyMasterId。
当前 topology 对应的 StormTopology 对象。
配置信息,包括 nimbus 节点配置和 topology 配置。
当前集群可用的 supervisor 节点信息(不包含位于黑名单中的和已经死亡的)。
当前 topology 范围内所有 taskId 与其对应的组件 ID 之间的映射关系。
当前 topology 范围内所有 task 的状态信息。
其他信息,包括任务分配类型、老的任务分配信息、是否是 reassign,以及未停止的 worker 列表等。
完成任务分配的上下文信息创建之后,Storm 会基于该信息为当前 topology 分配 worker,集群模式下该过程的实现位于 DefaultTopologyScheduler#assignTasks
方法中,该方法会先计算需要分配的 worker 数目,然后分别为每个 worker 分配对应的 supervisor 节点,最后为 topology 范围内所有组件(包括系统组件)的 task 分配对应的 worker 进程。下面先来看一下 为 worker 分配 supervisor 节点的过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public List<ResourceWorkerSlot> getAvailableWorkers ( DefaultTopologyAssignContext context, Set<Integer> needAssign, int allocWorkerNum) { int reserveWorkers = context.getReserveWorkerNum(); int workersNum = this .getAvailableWorkersNum(context); if ((workersNum - reserveWorkers) < allocWorkerNum) { throw new FailedAssignTopologyException("there's no enough worker. allocWorkerNum=" + allocWorkerNum + ", availableWorkerNum=" + workersNum + ",reserveWorkerNum=" + reserveWorkers); } workersNum = allocWorkerNum; List<ResourceWorkerSlot> assignedWorkers = new ArrayList<>(); this .getRightWorkers(context, needAssign, assignedWorkers, workersNum, this .getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf()))); if (ConfigExtension.isUseOldAssignment(context.getStormConf())) { this .getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers()); } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && !context.isReassign()) { int cnt = 0 ; for (ResourceWorkerSlot worker : context.getOldWorkers()) { if (cnt < workersNum) { ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot(); resFreeWorker.setPort(worker.getPort()); resFreeWorker.setHostname(worker.getHostname()); resFreeWorker.setNodeId(worker.getNodeId()); assignedWorkers.add(resFreeWorker); cnt++; } else { break ; } } } LOG.info("Get workers from user define and old assignments: " + assignedWorkers); int restWorkerNum = workersNum - assignedWorkers.size(); if (restWorkerNum < 0 ) { throw new FailedAssignTopologyException( "Too many workers are required for user define or old assignments. " + "workersNum=" + workersNum + ", assignedWorkersNum=" + assignedWorkers.size()); } for (int i = 0 ; i < restWorkerNum; i++) { assignedWorkers.add(new ResourceWorkerSlot()); } List<SupervisorInfo> isolationSupervisors = this .getIsolationSupervisors(context); if (isolationSupervisors.size() != 0 ) { this .putAllWorkerToSupervisor(assignedWorkers, this .getResAvailSupervisors(isolationSupervisors)); } else { this .putAllWorkerToSupervisor(assignedWorkers, this .getResAvailSupervisors(context.getCluster())); } this .setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers); LOG.info("Assigned workers=" + assignedWorkers); return assignedWorkers; }
为 worker 分配 supervisor 节点的过程可以概括为:
计算需要分配的 worker 数目,如果可用的 worker 数目不满足要求则会抛出异常;
为需要分配的 worker 创建 ResourceWorkerSlot 分配单元信息,主要分为四种情况:
用户自定义 worker slot 分配;
配置指定复用旧的分配信息则优先从旧的分配中选出合适的 worker slot;
对于 rebalance 任务分配类型,如果允许则复用原来的 worker slot;
剩余情况,创建新的 work slot;
为 worker 分配相应的 supervisor 节点。
下面主要来看一下步骤 3,相应实现位于 WorkerScheduler#putAllWorkerToSupervisor
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private void putAllWorkerToSupervisor (List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) { for (ResourceWorkerSlot worker : assignedWorkers) { if (worker.getHostname() != null ) { for (SupervisorInfo supervisor : supervisors) { if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0 ) { this .putWorkerToSupervisor(supervisor, worker); break ; } } } } supervisors = this .getResAvailSupervisors(supervisors); Collections.sort(supervisors, new Comparator<SupervisorInfo>() { @Override public int compare (SupervisorInfo o1, SupervisorInfo o2) { return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size()); } }); this .putWorkerToSupervisor(assignedWorkers, supervisors); }
如果 worker 指定了 supervisor 节点,则会将其分配给对应的 supervisor,对于剩余的 worker 来说会考虑 supervisor 节点的负载进行分配,以保证集群中 supervisor 负载的均衡性。Storm 依据集群中 supervisor 节点的平均空闲端口数作为标准来衡量 supervisor 节点的负载,如果一个 supervisor 节点的空闲端口数小于该值则认为该 supervisor 过载。集群负载均衡性的保证主要参考以下两个规则:
优先选择负载较低的 supervisor 节点进分配。
如果 supervisor 节点都处于过载状态,但还有未分配的 worker,则从过载 supervisor 节点中优先选择空闲端口较多的节点进行分配。
再来看一下为 task 分配 worker 进程的过程,实现位于 TaskScheduler#assign
方法中,该方法按照组件的类别分先后对 task 进行 worker 分配,顺序如下:
优先为设置了 task.on.differ.node=true
的组件的 task 进行分配;
接着为剩余的用户自定义组件的 task 分配 worker;
最后为系统组件分配 worker。
方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public List<ResourceWorkerSlot> assign () { if (tasks.size() == 0 ) { assignments.addAll(this .getRestAssignedWorkers()); return assignments; } Set<Integer> assignedTasks = this .assignForDifferNodeTask(); tasks.removeAll(assignedTasks); Map<Integer, String> systemTasks = new HashMap<>(); for (Integer task : tasks) { String name = context.getTaskToComponent().get(task); if (Common.isSystemComponent(name)) { systemTasks.put(task, name); continue ; } this .assignForTask(name, task); } for (Entry<Integer, String> entry : systemTasks.entrySet()) { this .assignForTask(entry.getValue(), entry.getKey()); } assignments.addAll(this .getRestAssignedWorkers()); return assignments; }
对于设置了 task.on.differ.node=true
的组件,要求名下的 task 需要运行在不同的 supervisor 节点上,所以需要优先进行分配,否则如果一些 supervisor 因为配额已满从资源池移除之后,很可能导致没有足够多的 supervisor 节点来满足此类组件的 task 分配需求。
对于这三类组件的 task 分配过程基本过程类似,基本流程可以概括如下:
基于多重选择器为当前 task 选择最优 worker 进行分配;
将 task 加入到被分配 worker 的 task 列表,并更新 worker 持有的 task 数目;
检查当前 worker 分配的 task 数目,如果配额已满则将其从资源池移除,不再分配新的 task;
更新 task 所属组件分配在指定 worker 上的 task 数目。
完成了 task 到 worker,以及 worker 到 supervisor 的配置关系,也就相当于完成了对当前 topology 的任务分配过程,紧接着 Storm 会将任务分配信息记录到 ZK 对应的任务分配路径下面。需要清楚的一点是当前的分配还只是一个方案,Storm 集群并没有开始真正执行当前 topology,如果需要真正启动方案的执行,Storm 还需要调度各个 supervisor 节点按照方案启动相应的 worker 进程,并在每个 worker 进程上启动相应数量的线程来执行 task,相应过程我们后面会逐一进行分析。