JStorm 源码解析:拓扑任务的资源分配过程

上一篇我们分析了 topology 构建和提交过程在客户端的逻辑,并最终通过 submitTopology 方法向 Storm 集群的 nimbus 节点提交任务。Nimbus 以 Thrift RPC 服务的方式运行,相应 thrift 接口方法实现位于 ServiceHandler 类中,下面我们从 ServiceHandler#submitTopology 方法切入,分析 nimbus 节点之于客户端提交任务的资源分配过程,该方法包装了 ServiceHandler#submitTopologyWithOpts 方法。

Storm 集群的任务提交主要分为三种类型:新任务提交、热部署,以及灰度发布。ServiceHandler#submitTopologyWithOpts 方法统一处理这三种情况,但是不管哪种提交方式都会首先验证 topology 名称和配置的合法性,然后基于具体提交类型分而治之。

灰度发布 & 热部署

首先来看灰度发布的情况,当客户端请求灰度发布时,nimbus 节点会检查对应 topology 在服务端的运行情况,只有状态为 ACTIVE 时才允许执行灰度发布。灰度发布的相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 获取指定 topology 的运行数据
TopologyInfo topologyInfo = this.getTopologyInfo(topologyId);
if (topologyInfo == null) {
throw new TException("Failed to get topology info");
}

// 获取指定的 worker 数目:${topology.upgrade.worker.num}
int workerNum = ConfigExtension.getUpgradeWorkerNum(serializedConf);
// 获取指定的组件名称:${topology.upgrade.component}
String component = ConfigExtension.getUpgradeComponent(serializedConf);
// 获取指定的 worker 列表:${topology.upgrade.workers}
Set<String> workers = ConfigExtension.getUpgradeWorkers(serializedConf);

// 判定 topology master 是不是使用独立的 worker
if (!ConfigExtension.isTmSingleWorker(serializedConf, topologyInfo.get_topology().get_numWorkers())) {
throw new TException("Gray upgrade requires that topology master to be a single worker, cannot perform the upgrade!");
}

// 灰度发布
return this.grayUpgrade(topologyId, uploadedJarLocation, topology, serializedConf, component, workers, workerNum);

对于允许灰度发布的场景,Storm 会基于当前提交 topology 的配置首先会尝试获取以下 3 个参数用于挑选 worker 进行发布:

  • topology.upgrade.worker.num
  • topology.upgrade.component
  • topology.upgrade.workers

如果同时指定了多个参数,方法会基于一定的优先级进行决策,具体如下:

  1. 如果参数 topology.upgrade.workers 不为空则忽略其他参数,挑选指定的 worker 进行发布,需要注意的是这些 worker 发布完之后,这个参数就自动置空;
  2. 否则查看参数 topology.upgrade.component 是否为空,如果不为空还需要查看参数 topology.upgrade.worker.num 是否为 0, 如果不为 0 则挑选指定工作组件下 topology.upgrade.worker.num 指定数目的 worker 进行发布,否则对这些工作组件下所有 worker 进行发布;
  3. 如果上面两个都为空,则随机挑选 topology.upgrade.worker.num 个 worker 进行发布。

灰度发布的具体执行流程位于 ServiceHandler#grayUpgrade 方法中,该方法实现比较冗长,故不在此贴出,下面参考源码和官网文档对发布的过程进行说明:

  1. 方法首先尝试从 ZK 获取当前 topology 对应的基本信息(路径:/topology/${topology_id})和灰度发布信息(路径:/gray_upgrade/${topology_id}),以及任务分配信息(路径:assignments/${topology_id});
  2. 如果存在灰度发布信息,则判断对应的灰度状态(已过期 / 已完成 / 进行中),如果正在灰度中则拒绝本次灰度请求,否则(包含不存在灰度发布信息的情况)继续执行灰度发布;
  3. 方法利用 GrayUpgradeConfig 对象封装灰度发布信息,并写入到 ZK 的 /gray_upgrade/${topology_id} 路径下,同时设置 config.continueUpgrading=true
  4. Topology Master 有一个线程 GrayUpgradeHandler 会定时读取该节点的配置,检测到有灰度发布配置且 continueUpgrading=true 时,将分配指定数目的 worker,添加到 ZK 的 /gray_upgrade/${topology_id}/upgrading_workers 路径下,并设置 continueUpgrading=false(防止自动进行后续的灰度发布);
  5. SyncSupervisorEvent 会定时检查每个拓扑的 upgrading_workers 节点,一旦有数据就和自身的 IP 和端口列表进行对比,如果有属于该 supervisor 节点的灰度发布就下载最新的 storm-code 和 storm-jar,然后重启 worker,同时将 worker 添加到 ZK 的 upgraded_workers 节点下;
  6. GrayUpgradeHandler 检测 ZK,如果 upgraded_workers 的 worker 数大于等于当前总 worker 数减 1(topology master 组件占用),则认为此次灰度发布已经完成,删除 ZK 上的灰度发布配置、upgrading_workers,以及 upgraded_workers。

如果只想升级部分 worker 或特定组件,可以用 complete_upgrade 强制完成升级。灰度发布过程中使用单独的 upgrading_workers 和 upgraded_workers 的设计主要是为了避免同步问题。如果将这些信息写在 GrayUpgradeConfig 类中可能会涉及到多个 supervisor 节点同时更新 workers 的情况,而使用单独的节点则只需要在这个节点下添加和删除子节点,不会有同步问题。

热部署和灰度发布从形式上来看都是对运行中 topology 的更新替换操作,但是对于 nimbus 来说,在处理上却是两条不同的分支,实际上热部署与新任务提交在处理过程上更加形似,毕竟热部署的过程就是杀死处于运行中的 topology 然后执行新任务提交的过程,所以接下来我们主要分析新任务的调度细节。

新任务提交

对于新提交的任务来说,Storm 会为该 topology 执行一些准备和验证工作,并在 ZK 上创建相应的结点记录该 topology 的元数据和任务分配信息,然后为该 topology 生成一个事件提交给任务分配队列等待 nimbus 节点为当前 topology 制定运行方案,并在执行成功后发送相应的通知事件。相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 对当前 topology 配置进行规范化,并附加一些必要的配置
Map<Object, Object> stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
LOG.info("Normalized configuration:" + stormConf);

// 合并集群配置和拓扑配置
Map<Object, Object> totalStormConf = new HashMap<>(conf);
totalStormConf.putAll(stormConf);

// 确定 topology 中各个组件的并行度,保证不超过当前 topology 允许的最大值
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true);

/*
* 验证 topology 的基本结构信息:
* 1. 验证 topologyName,组件 ID 是否合法
* 2. 验证是否存在缺失 input 声明的 spout
* 3. 验证 woker 和 acker 数目参数配置
*/
Common.validate_basic(normalizedTopology, totalStormConf, topologyId);

StormClusterState stormClusterState = data.getStormClusterState();

// 创建 /local-dir/nimbus/${topology_id}/xxxx 文件,并将元数据同步到 ZK
this.setupStormCode(topologyId, uploadedJarLocation, stormConf, normalizedTopology, false);

// wait for blob replication before activate topology
this.waitForDesiredCodeReplication(conf, topologyId);

// generate TaskInfo for every bolt or spout in ZK : /ZK/tasks/topoologyId/xxx
// 为当前 topology 在 ZK 上生成 task 信息:/tasks/${topology_id}
this.setupZkTaskInfo(conf, topologyId, stormClusterState);

// mkdir topology error directory : taskerrors/${topology_id}
String path = Cluster.taskerror_storm_root(topologyId);
stormClusterState.mkdir(path);

String grayUpgradeBasePath = Cluster.gray_upgrade_base_path(topologyId); // gray_upgrade/${topology_id}
stormClusterState.mkdir(grayUpgradeBasePath);
// gray_upgrade/${topology_id}/upgraded_workers
stormClusterState.mkdir(Cluster.gray_upgrade_upgraded_workers_path(topologyId));
// gray_upgrade/${topology_id}/upgrading_workers
stormClusterState.mkdir(Cluster.gray_upgrade_upgrading_workers_path(topologyId));

// 为当前 topology 执行任务分配
LOG.info("Submit topology {} with conf {}", topologyName, serializedConf);
this.makeAssignment(topologyName, topologyId, options.get_initial_status());

// push start event after startup
double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf); // ${topology.metric.sample.rate},默认是 0.05
StartTopologyEvent.pushEvent(topologyId, metricsSampleRate);

this.notifyTopologyActionListener(topologyName, "submitTopology");

下面对源码中涉及到的相关流程进行进一步分析。对于新提交的任务首先会执行一些准备工作,包括:

  1. 规范化 topology 的配置信息;
  2. 确定 topology 各个组件的并行度,保证不超过允许的最大值;
  3. 验证 topology 的基本结构信息(topology 名称和组件 ID 的合法性、spout 是否缺失 input 声明,以及验证 worker 和 acker 的参数配置等)。

然后 Storm 会创建或更新当前 topology 对象的序列化文件(stormcode.ser)和配置信息文件(stormconf.ser)到 blobstore 中,如果是采用 nimbus 本地模式存储,还需要将对应的元数据写入 ZK 来保证数据一致性。

接下来会为当前 topology 生成 task 信息,并记录到 ZK 上(路径:/tasks/${topology_id}),对于一个 topology 的同一个组件来说,如果并行度大于 1,那么 Storm 会为其创建对应数量的 task,并保证 taskId 是连续的,相应实现位于 ServiceHandler#setupZkTaskInfo 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
// 为当前 topology 追加系统组件,同时基于并行度创建组件对应的 task 信息,同一个组件的多个 task 信息具备连续的 ID
Map<Integer, TaskInfo> taskToTaskInfo = this.mkTaskComponentAssignments(conf, topologyId);

// 获取 topology master 的 ID (这里使用的是其对应的 task ID)
int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
TopologyTaskHbInfo topoTaskHbInfo = new TopologyTaskHbInfo(topologyId, masterId);
data.getTasksHeartbeat().put(topologyId, topoTaskHbInfo);
// 创建 /ZK/taskbeats/${topology_id},并写入 topologyId 和 topologyMasterId
stormClusterState.topology_heartbeat(topologyId, topoTaskHbInfo);

if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
throw new InvalidTopologyException("Failed to generate TaskIDs map");
}
// key is task id, value is task info
// 记录 task 信息到 ZK : /ZK/tasks/${topology_id}
stormClusterState.set_task(topologyId, taskToTaskInfo);
}

该方法主要做了 3 件事情:

  1. 为当前 topology 追加系统组件(acker-bolt、master-bolt,以及 system-bolt)。
  2. 为当前 topology 生成 task 分配信息,并记录到 ZK 相应节点。
  3. 为当前 topology 在 ZK 上创建对应的 task 心跳记录文件。

其中 1 和 2 位于 ServiceHandler#mkTaskComponentAssignments 方法中:

1
2
3
4
5
6
7
8
9
10
11
public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyId)
throws IOException, InvalidTopologyException, KeyNotFoundException {
// 从 blobstore 中获取当前 topology 的配置信息
Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(topologyId, data.getBlobStore());
// 从 blobstore 中获取当前 topology 的 StormTopology 对象
StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, data.getBlobStore());
// 追加一些系统组件到当前 topology 中
StormTopology topology = Common.system_topology(stormConf, rawTopology);
// 为当前 topology 生成 task 信息,key 是 taskId
return Common.mkTaskInfo(stormConf, topology, topologyId);
}

方法首先会从 blobstore 中获取 topology 的配置信息和 StormTopology 对象,然后调用 Common#system_topology 方法添加一些系统组件,包括 acker-bolt、master-bolt,以及 system-bolt 等。

然后调用 Common#mkTaskInfo 方法为当前 topology 中的各个组件生成 task 分配信息。方法实现比较简单,返回的结果是一个 map 类型,其中 key 是 taskId,对于同一个组件来说为其分配的 taskId 是连续的,value 是对应的 TaskInfo 对象,包含两个字段:componentId 和 componentType。前者对应系统组件 ID 和用户自定义组件 ID,后者对应组件类型,也就是 bolt 和 spout。

完成了 topology 组件 task 分配信息的创建,接下来方法为当前任务创建对应的 TopologyAssignEvent 事件对象,并将事件添加到队列中,等待集群为其分配资源。这一过程位于 ServiceHandler#makeAssignment 方法中,等待的过程采用了 CountDownLatch 机制,count 值设置为 1,并设置 5 分钟上限等待集群分配资源,超时则返回 false 表示本次任务提交失败。

队列的维护和消费过程位于 TopologyAssign 类中,该类实现了 Runnable 接口,并以单例的形式对外提供服务。Nimbus 节点在启动的时候会创建并初始化 TopologyAssign 对象,并以守护线程的方式启动队列的消费过程。线程的 run 方法会循环的从队列头部以阻塞的方式获取对应的 TopologyAssignEvent 事件对象,并调用 TopologyAssign#doTopologyAssignment 方法为相应的 topology 创建任务分配信息(Assignment 对象)和基本运行信息(StormBase 对象),并将任务分配信息和基本运行信息写入 ZK,其中关键的资源分配过程位于 TopologyAssign#mkAssignment 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
String topologyId = event.getTopologyId();
LOG.info("Determining assignment for " + topologyId);

// 1. 基于配置和当前集群运行状态创建 topology 任务分配的上下文信息
TopologyAssignContext context = this.prepareTopologyAssign(event);

// 2. 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker
Set<ResourceWorkerSlot> assignments;
if (!StormConfig.local_mode(nimbusData.getConf())) {
// 集群模式,获取模式的调度器
ITopologyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); // DefaultTopologyScheduler
// 为当前 topology 中的 task 分配 worker
assignments = scheduler.assignTasks(context);
} else {
// 本地模式
assignments = mkLocalAssignment(context);
}

// 3. 记录任务分配信息到 ZK: assignments/${topology_id}
Assignment assignment = null;
if (assignments != null && assignments.size() > 0) {
// 获取服务中的 supervisorId 及其 hostname 映射信息
Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
// 获取 task 的启动时间:<taskId, start_second>
Map<Integer, Integer> startTimes = getTaskStartTimes(
context, nimbusData, topologyId, context.getOldAssignment(), assignments);

String codeDir = (String) nimbusData.getConf().get(Config.STORM_LOCAL_DIR);

assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);

// the topology binary changed.
if (event.isScaleTopology()) {
assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
}
StormClusterState stormClusterState = nimbusData.getStormClusterState();
// 写入 assignment 信息到 ZK: assignments/${topology_id}
stormClusterState.set_assignment(topologyId, assignment);

// update task heartbeat's start time
NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId);

LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment);
}
return assignment;
}

TopologyAssign#mkAssignment 方法主要做了下面三件事情:

  1. 基于配置和当前集群运行状态为当前 topology 创建任务分配的上下文信息。
  2. 依据当前的运行模式基于对应节点负载为当前 topology 中的 task 分配 worker。
  3. 将 topology 的任务分配信息 Assignment 对象记录到 ZK 相应节点上。

方法一开始会为当前 topology 创建任务分配的上下文信息 TopologyAssignContext 对象,该对象主要包含一下信息:

  • 当前 topology 的 topologyId 和 topologyMasterId。
  • 当前 topology 对应的 StormTopology 对象。
  • 配置信息,包括 nimbus 节点配置和 topology 配置。
  • 当前集群可用的 supervisor 节点信息(不包含位于黑名单中的和已经死亡的)。
  • 当前 topology 范围内所有 taskId 与其对应的组件 ID 之间的映射关系。
  • 当前 topology 范围内所有 task 的状态信息。
  • 其他信息,包括任务分配类型、老的任务分配信息、是否是 reassign,以及未停止的 worker 列表等。

完成任务分配的上下文信息创建之后,Storm 会基于该信息为当前 topology 分配 worker,集群模式下该过程的实现位于 DefaultTopologyScheduler#assignTasks 方法中,该方法会先计算需要分配的 worker 数目,然后分别为每个 worker 分配对应的 supervisor 节点,最后为 topology 范围内所有组件(包括系统组件)的 task 分配对应的 worker 进程。下面先来看一下 为 worker 分配 supervisor 节点的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public List<ResourceWorkerSlot> getAvailableWorkers(
DefaultTopologyAssignContext context, Set<Integer> needAssign, int allocWorkerNum) {

// 1. 计算需要分配的 worker 数目
int reserveWorkers = context.getReserveWorkerNum(); // 需要保留的 worker 数目
int workersNum = this.getAvailableWorkersNum(context); // 当前集群总的可用的 worker 数目
if ((workersNum - reserveWorkers) < allocWorkerNum) {
// 没有足够的 worker 可以分配:可用 worker 数目 - 保留的 worker 数目 < 需要分配的数目
throw new FailedAssignTopologyException("there's no enough worker. allocWorkerNum="
+ allocWorkerNum + ", availableWorkerNum=" + workersNum + ",reserveWorkerNum=" + reserveWorkers);
}
workersNum = allocWorkerNum;

// 记录分配到的 worker
List<ResourceWorkerSlot> assignedWorkers = new ArrayList<>();

// 2. 分配 worker

// 2.1 处理用户自定义分配的情况
// 从 needAssign 中移除已经分配的 task,并记录分配的 worker 到 assignedWorkers 中
this.getRightWorkers(context, needAssign, assignedWorkers, workersNum,
// 获取用户自定义分配 worker slot 信息,排除状态为 unstopped 的 worker
this.getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));

if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
// 2.2 如果配置指定要复用旧的分配,则优先从旧的分配中选出合适的 worker
this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers());
} else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && !context.isReassign()) {
// 2.3 如果是 rebalance 任务分配类型,且可以复用原来的 worker 则将原来分配的 worker 记录下来
int cnt = 0;
for (ResourceWorkerSlot worker : context.getOldWorkers()) {
if (cnt < workersNum) {
ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
resFreeWorker.setPort(worker.getPort());
resFreeWorker.setHostname(worker.getHostname());
resFreeWorker.setNodeId(worker.getNodeId());
assignedWorkers.add(resFreeWorker);
cnt++;
} else {
break;
}
}
}

LOG.info("Get workers from user define and old assignments: " + assignedWorkers);

int restWorkerNum = workersNum - assignedWorkers.size(); // 还需要分配的 worker 数目
if (restWorkerNum < 0) {
throw new FailedAssignTopologyException(
"Too many workers are required for user define or old assignments. " +
"workersNum=" + workersNum + ", assignedWorkersNum=" + assignedWorkers.size());
}

// 2.4 对于剩下需要的 worker,直接添加 ResourceWorkerSlot 实例对象
for (int i = 0; i < restWorkerNum; i++) {
assignedWorkers.add(new ResourceWorkerSlot());
}

/*
* 3. 遍历将 worker 分配给相应的 supervisor
* - 如果 worker 指定了 supervisor,则优先分配给指定 supervisor
* - 依据 supervisor 的负载情况优先选择负载较低的进行分配
*/
List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
if (isolationSupervisors.size() != 0) {
this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(isolationSupervisors));
} else {
// 为 worker 分配对应的 supervisor
this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(context.getCluster()));
}
this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
LOG.info("Assigned workers=" + assignedWorkers);
return assignedWorkers;
}

为 worker 分配 supervisor 节点的过程可以概括为:

  1. 计算需要分配的 worker 数目,如果可用的 worker 数目不满足要求则会抛出异常;
  2. 为需要分配的 worker 创建 ResourceWorkerSlot 分配单元信息,主要分为四种情况:
  • 用户自定义 worker slot 分配;
  • 配置指定复用旧的分配信息则优先从旧的分配中选出合适的 worker slot;
  • 对于 rebalance 任务分配类型,如果允许则复用原来的 worker slot;
  • 剩余情况,创建新的 work slot;
  1. 为 worker 分配相应的 supervisor 节点。

下面主要来看一下步骤 3,相应实现位于 WorkerScheduler#putAllWorkerToSupervisor 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {

// 遍历处理 worker,如果指定了 supervisor,且 supervisor 存在空闲端口,则将其分配给该 supervisor
for (ResourceWorkerSlot worker : assignedWorkers) {
if (worker.getHostname() != null) {
for (SupervisorInfo supervisor : supervisors) {
// 如果当前 worker 对应的 hostname 是该 supervisor,且 supervisor 存在空闲的 worker
if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname())
&& supervisor.getAvailableWorkerPorts().size() > 0) {
/*
* 基于当前 supervisor 信息更新对应的 worker 信息:
* 1. 保证 worker 对应的端口号是当前 supervisor 空闲的,否则选一个 supervisor 空闲的给 worker
* 2. 设置 worker 对应的 nodeId 为当前 supervisor 的 ID
*/
this.putWorkerToSupervisor(supervisor, worker);
break;
}
}
}
}

// 更新 supervisor 列表,移除没有空闲端口的 supervisor
supervisors = this.getResAvailSupervisors(supervisors);

// 对 supervisor 按照空闲端口数由大到小排序
Collections.sort(supervisors, new Comparator<SupervisorInfo>() {

@Override
public int compare(SupervisorInfo o1, SupervisorInfo o2) {
return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size());
}

});

/*
* 按照 supervisor 的负载对 worker 进行分配:
* 1. 优先选择负载较低的 supervisor 进分配
* 2. 如果 supervisor 都已经过载但还有未分配的 worker,则从过载 supervisor 优先选择空闲端口较多的进行分配
*/
this.putWorkerToSupervisor(assignedWorkers, supervisors);
}

如果 worker 指定了 supervisor 节点,则会将其分配给对应的 supervisor,对于剩余的 worker 来说会考虑 supervisor 节点的负载进行分配,以保证集群中 supervisor 负载的均衡性。Storm 依据集群中 supervisor 节点的平均空闲端口数作为标准来衡量 supervisor 节点的负载,如果一个 supervisor 节点的空闲端口数小于该值则认为该 supervisor 过载。集群负载均衡性的保证主要参考以下两个规则:

  1. 优先选择负载较低的 supervisor 节点进分配。
  2. 如果 supervisor 节点都处于过载状态,但还有未分配的 worker,则从过载 supervisor 节点中优先选择空闲端口较多的节点进行分配。

再来看一下为 task 分配 worker 进程的过程,实现位于 TaskScheduler#assign 方法中,该方法按照组件的类别分先后对 task 进行 worker 分配,顺序如下:

  1. 优先为设置了 task.on.differ.node=true 的组件的 task 进行分配;
  2. 接着为剩余的用户自定义组件的 task 分配 worker;
  3. 最后为系统组件分配 worker。

方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public List<ResourceWorkerSlot> assign() {
if (tasks.size() == 0) { // 没有需要再分配的任务
assignments.addAll(this.getRestAssignedWorkers());
return assignments;
}

// 1. 处理设置了 task.on.differ.node=true 的组件,为其在不同 supervisor 节点上分配 worker
Set<Integer> assignedTasks = this.assignForDifferNodeTask();

// 2. 为剩余 task 分配 worker,不包含系统组件
tasks.removeAll(assignedTasks);
Map<Integer, String> systemTasks = new HashMap<>();
for (Integer task : tasks) {
String name = context.getTaskToComponent().get(task);
if (Common.isSystemComponent(name)) {
systemTasks.put(task, name);
continue;
}
this.assignForTask(name, task);
}

// 3. 为系统组件 task 分配 worker, e.g. acker, topology master...
for (Entry<Integer, String> entry : systemTasks.entrySet()) {
this.assignForTask(entry.getValue(), entry.getKey());
}

// 记录所有分配了 task 的 worker 集合
assignments.addAll(this.getRestAssignedWorkers());
return assignments;
}

对于设置了 task.on.differ.node=true 的组件,要求名下的 task 需要运行在不同的 supervisor 节点上,所以需要优先进行分配,否则如果一些 supervisor 因为配额已满从资源池移除之后,很可能导致没有足够多的 supervisor 节点来满足此类组件的 task 分配需求。

对于这三类组件的 task 分配过程基本过程类似,基本流程可以概括如下:

  1. 基于多重选择器为当前 task 选择最优 worker 进行分配;
  2. 将 task 加入到被分配 worker 的 task 列表,并更新 worker 持有的 task 数目;
  3. 检查当前 worker 分配的 task 数目,如果配额已满则将其从资源池移除,不再分配新的 task;
  4. 更新 task 所属组件分配在指定 worker 上的 task 数目。

完成了 task 到 worker,以及 worker 到 supervisor 的配置关系,也就相当于完成了对当前 topology 的任务分配过程,紧接着 Storm 会将任务分配信息记录到 ZK 对应的任务分配路径下面。需要清楚的一点是当前的分配还只是一个方案,Storm 集群并没有开始真正执行当前 topology,如果需要真正启动方案的执行,Storm 还需要调度各个 supervisor 节点按照方案启动相应的 worker 进程,并在每个 worker 进程上启动相应数量的线程来执行 task,相应过程我们后面会逐一进行分析。