JStorm 源码解析:拓扑的构建和提交过程

我们按照 Storm 规范开发的 spout 和 bolt 需要使用 TopologyBuilder 构建成有向无环图(拓扑),并指定消息的分组方式,然后提交给 Storm 集群执行,本篇我们将分析 topology 的构建和提交过程。前面分析 Storm 的编程接口时曾介绍过 StormTopology 这个 thrift 类,topology 在构建完成之后会封装成一个 StormTopology 对象,并通过 RPC 方法提交给 Storm 集群的 nimbus 节点。

拓扑的构建过程

拓扑结构在 Storm 集群中以 StormTopology 对象的形式表示,这是一个 thrift 类,其定义如下:

1
2
3
4
5
struct StormTopology {
1: required map<string, SpoutSpec> spouts; // topology 中的 spout 集合
2: required map<string, Bolt> bolts; // topology 中的 bolt 集合
3: required map<string, StateSpoutSpec> state_spouts; // topology 中的 state spout 集合
}

属性 spouts 的 key 是 spout 对应的 ID,value 是 SpoutSpec 类型对象,这也是一个 thrift 类,封装了 spout 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。属性 bolts 的 key 是 bolt 对应的 ID,value 是 Bolt 类型对象,Bolt 同样是一个 thrfit 类,封装了 bolt 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。

ComponentObject 是一个 thrift 联合类型(union),在这里主要使用了 serialized_java 字段记录组件的序列化值:

1
2
3
4
5
union ComponentObject {
1: binary serialized_java; // 序列化后的 java 对象
2: ShellComponent shell; // ShellComponent 对象
3: JavaObject java_object; // java 对象
}

ComponentCommon 是对组件的抽象表示,spout 和 bolt 在 topology 中统称为组件,topology 构建过程中会将 spout 和 bolt 都封装成为 ComponentCommon 对象:

1
2
3
4
5
6
7
8
9
10
struct ComponentCommon {
// 组件将从哪些 GlobalStreamId 以何种分组方式接收数据
1: required map<GlobalStreamId, Grouping> inputs;
// 组件要输出的所有流,key 是 streamId
2: required map<string, StreamInfo> streams;
// 组件并行度(即多少个线程),这些线程可能分布在不同的机器或进程空间中
3: optional i32 parallelism_hint;
// 组件相关配置项
4: optional string json_conf;
}

StormTopology 作为 thrift 类在编译成 java 实现时比较冗长,所以 Storm 提供了 TopologyBuilder 构造器类来简化 topology 的构造,其使用形式一般如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class WordCountTopology implements ComponentId, FieldName {

private static final String TOPOLOGY_NAME = "wordcount-topology";

public static void main(String[] args) throws Exception {
SentenceSpout sentenceSpout = new SentenceSpout();
SentenceSplitBolt sentenceSplitBolt = new SentenceSplitBolt();
WordCountBolt wordCountBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout);
builder.setBolt(SENTENCE_SPLIT_BOLT_ID, sentenceSplitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
builder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SENTENCE_SPLIT_BOLT_ID, new Fields(WORD));
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID);

Config config = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

TimeUnit.MINUTES.sleep(10);

localCluster.killTopology(TOPOLOGY_NAME);
localCluster.shutdown();
}

}

创建完 TopologyBuilder 实例之后,我们可以调用 setSpout 方法往 topology 中添加并设置 spout 组件,调用 setBolt 方法往 topology 中添加并设置 bolt 组件,并最后通过调用 createTopology 方法来完成 topology 的构建,该方法会返回一个 StormTopology 对象。TopologyBuilder 类中主要关注 3 个属性:

1
2
3
4
5
6
/** 记录拓扑范围内所有的 Bolt 对象 */
protected Map<String, IRichBolt> _bolts = new HashMap<>();
/** 记录拓扑范围内所有的 Spout 对象 */
protected Map<String, IRichSpout> _spouts = new HashMap<>();
/** 记录拓扑范围内封装所有的 Spout 和 Bolt 的 ComponentCommon 组件对象 */
protected Map<String, ComponentCommon> _commons = new HashMap<>();

下面来看一下 spout 和 bolt 的构造过程,即 setSpoutsetBolt 方法,针对这两类方法,TopologyBuilder 都提供了多种重载版本,其中 setSpout 对应的底层实现如下:

1
2
3
4
5
6
7
8
9
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException {
// 保证 id 在 topology 范围内的全局唯一
this.validateUnusedId(id);
// 以 ComponentCommon 的形式封装组件,并记录到 _commons 属性中
this.initCommon(id, spout, parallelism_hint);
// 记录组件到 _spout 集合中
_spouts.put(id, spout);
return new SpoutGetter(id);
}

方法首先会验证 spoutId 在 topology 范围内的全局唯一性,即没有被已有的 spout 和 bolt 占用,否则会抛出 IllegalArgumentException 异常。initCommon 方法会构造 spout 对应的 ComponentCommon 对象并记录到 _commons 属性中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());

// 设置并行度
if (parallelism != null) {
int dop = parallelism.intValue();
if (dop < 1) {
throw new IllegalArgumentException("Parallelism must be positive.");
}
common.set_parallelism_hint(dop); // 设置组件并行度
} else {
// 如果没有设置的话,默认设置并行度为 1
common.set_parallelism_hint(1);
}

// 获取组件相关的配置并以 json 的形式记录到 ComponentCommon 对象中
Map conf = component.getComponentConfiguration();
if (conf != null) common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}

最后将 spout 对象记录到 _spouts 属性中,并构造当前 spout 对应的 SpoutGetter 对象。

SpoutGetter 可以理解为 spout 对应的属性配置器,用于为当前 spout 加载通用的配置和设置私有的属性值,并最终将所有的配置项序列化为 json 格式记录到封装当前 spout 的 ComponentCommon 对象中(json_conf 属性)。

1
2
3
4
5
protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
public SpoutGetter(String id) {
super(id);
}
}

SpoutGetter 类继承了 ConfigGetter 类,并实现了 SpoutDeclarer 接口,该接口主要是声明了一些 spout 组件相关的配置方法,具体的实现都在 ConfigGetter 的父类 BaseConfigurationDeclarer 中,实现比较简单,不展开说明。ConfigGetter 覆盖实现了父类的 addConfigurations 方法,并在该方法中将当前 spout 所有相关的配置项序列化成 json 记录到对应的 ComponentCommon 对象中:

1
2
3
4
5
6
7
8
9
10
11
@Override
public T addConfigurations(Map conf) {
if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { // ${topology.kryo.register}
// 在通常的非事务流处理中,不允许设置组件的序列化方式
throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
}
String currConf = _commons.get(_id).get_json_conf();
// 将 currConf 与 conf 的配置项合并,并以 json string 的形式记录到对应组件的 json_conf 字段中
_commons.get(_id).set_json_conf(JStormUtils.mergeIntoJson(JStormUtils.parseJson(currConf), conf));
return (T) this;
}

下面继续分析 setBolt 方法,上一篇在介绍 Bolt 组件接口时我们知道 Storm 提供了三种基础的 Bolt 组件类型,即 IBolt(or IRichBolt)、IBasicBolt,以及 IBatchBolt。针对每种 Bolt 类型,TopologyBuilder 都有提供相应版本的 setBolt 方法实现,下面以最常见的 IBolt 类型为例,对应的方法实现如下:

1
2
3
4
5
6
7
8
9
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
// 保证 id 在 topology 范围内的全局唯一
this.validateUnusedId(id);
// 以 ComponentCommon 形式封装组件,并记录到 _commons 属性中
this.initCommon(id, bolt, parallelism_hint);
// 记录 bolt 到 _bolt 集合中
_bolts.put(id, bolt);
return new BoltGetter(id);
}

流程上与 setSpout 大同小异,不再重复撰述,方法最终会将 bolt 对象记录到 _bolts 属性中,并构造当前 bolt 对应的 BoltGetter 对象。

前面我们分析了 SpoutGetter,知道其作用主要是为 spout 配置相关属性,BoltGetter 的作用同样如此,不过相对于 SpoutGetter 增加了消息分组方式的配置入口,最后同样将属性序列化为 json 格式记录到与组件相对应的 ComponentCommon 对象中。

在完成调用 setSpoutsetBolt 往 topology 中添加 spout 和 bolt 组件之后,我们需要调用 createTopology 方法创建相应的 StormTopology 对象,该方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<>();

// 如果当前 topology 中含有 stateful-bolt,就为 topology 自动添加一个 CheckpointSpout
this.maybeAddCheckpointSpout();

// 遍历处理 bolt,封装 bolt 的序列化形式和 ComponentCommon 形式为 Bolt 对象,并记录到 boltSpecs 中
for (String boltId : _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
// 如果当前 topology 中含有 stateful-bolt,那么针对 non-stateful bolt 都采用 CheckpointTupleForwarder 进行包装
bolt = this.maybeAddCheckpointTupleForwarder(bolt);
ComponentCommon common = this.getComponentCommon(boltId, bolt);
try {
this.maybeAddCheckpointInputs(common);
this.maybeAddWatermarkInputs(common, bolt);
// 封装 bolt 的序列化形式和 ComponentCommon 形式为 Bolt 对象,并记录到 boltSpecs 中
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
} catch (RuntimeException wrapperCause) {
// 省略异常处理逻辑
throw wrapperCause;
}
}

// 遍历处理 spout,封装 spout 的序列化形式和 ComponentCommon 形式为 SpoutSpec 对象,并记录到 spoutSpecs 中
for (String spoutId : _spouts.keySet()) {
IRichSpout spout = _spouts.get(spoutId);
ComponentCommon common = this.getComponentCommon(spoutId, spout);
try {
// 封装 spout 的序列化形式和 ComponentCommon 形式为 SpoutSpec 对象,并记录到 spoutSpecs 中
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
} catch (RuntimeException wrapperCause) {
// 省略异常处理逻辑
throw wrapperCause;
}
}

// 封装成为 stormTopology 对象返回
return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
}

整个方法的执行流程可以概括为:

  1. 如果 topology 中存在有状态的 bolt,则为当前 topology 自动添加一个 CheckpointSpout 组件;
  2. 遍历处理之前添加到 topology 中的 bolt,采用 Bolt 封装其序列化对象和 ComponentCommon 组件对象;
  3. 遍历处理之前添加到 topology 中的 spout,采用 SpoutSpec 封装其序列化对象和 ComponentCommon 组件对象;
  4. 构造 StormTopology 对象并返回。

集群环境下拓扑的提交过程

当构建完 topology 之后,我们需要以任务的形式将其提交到 Storm 集群运行。此外,为了方便调试,Storm 也支持通过 LocalCluster 在本地提交运行任务,本节我们主要介绍如何向 Storm 集群提交任务。

Storm 提供了 StormSubmitter 类用于向 Storm 集群提交任务,并提供了两类方法:submitTopologysubmitTopologyWithProgressBar。后者是对前者的封装,在原版 Storm 中用于支持显示任务的提交进度,但是这一设计在 JStorm 中被移除,所以两类方法实际上是等价的。接下来我们对 submitTopology 方法的实现进行分析,Storm 为该方法提供了多个重载版本,对应的底层实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
throws AlreadyAliveException, InvalidTopologyException {

// 验证配置是否为 json 格式
if (!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}

// 封装配置(构建 topology 期间添加的、提交 topology 时传入的,以及命令行参数)
Map userTotalConf = new HashMap();
userTotalConf.putAll(TopologyBuilder.getStormConf()); // add the configuration generated during topology building
userTotalConf.putAll(stormConf);
userTotalConf.putAll(Utils.readCommandLineOpts());

// 加载配置文件配置
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
putUserInfo(conf, stormConf);

try {
String serConf = Utils.to_json(userTotalConf); // 转换成 json 形式
if (localNimbus != null) {
// 本地模式
LOG.info("Submitting topology " + name + " in local mode");
localNimbus.submitTopology(name, null, serConf, topology);
} else {
// 集群模式
// 创建 Thrift 客户端
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
// 是否允许热部署 ${topology.hot.deploy.enable}
boolean enableDeploy = ConfigExtension.getTopologyHotDeplogyEnable(userTotalConf);
// 是否是灰度发布 ${topology.upgrade}
boolean isUpgrade = ConfigExtension.isUpgradeTopology(userTotalConf);
// 是否允许动态更新
boolean dynamicUpdate = enableDeploy || isUpgrade;

if (topologyNameExists(client, conf, name) != dynamicUpdate) {
if (dynamicUpdate) {
// 动态更新,但是对应的 topology 不存在
throw new RuntimeException("Topology with name `" + name + "` does not exist on cluster");
} else {
// 提交新任务,但是对应的 topology 已经存在
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
}

// 上传 jar 包
submitJar(client, conf);
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);

// 提交任务
if (opts != null) {
client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts);
} else {
// for backward compatibility
client.getClient().submitTopology(name, path, serConf, topology);
}
} finally {
client.close();
}
}
LOG.info("Finished submitting topology: " + name);
}
// 省略 catch 代码块
}

Storm 任务提交的过程本质上是一个与 nimbus 节点进行 RPC 通信的过程,整体流程可以概括为:

  1. 加载与封装配置;
  2. 验证当前 topology 的远程状态;
  3. 上传 topology 的 jar 文件到 nimbus 节点;
  4. 提交 topology 任务。

配置的加载与封装过程会验证配置是否为 json 格式,并聚合多个来源的配置封装成 map 集合。在任务提交之前会验证当前 topology 在远程集群的状态,如果当前操作是热部署或灰度发布,则必须保证对应的 topology 在远程集群已经存在,而对于新提交的 topology 来说,如果远程集群存在同名的 topology 则会禁止提交。

Storm 任务的提交分为两个步骤,首先上传 topology 对应的 jar 文件到 nimbus 服务器,上传成功之后才会调用远程方法通知 nimbus 有新任务加入,需要开始为该 topology 制定运行方案。下面先来看一下 jar 报上传的过程,该过程位于 submitJar 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static void submitJar(NimbusClient client, Map conf) {
if (submittedJar == null) {
try {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
// 获取对应的 client jar 名称,例如 jstorm-1.0.0-SNAPSHOT.jar
String localJar = System.getProperty("storm.jar");
// 为待上传的 jar 包创建存储路径和 Channel,并返回路径值
// ${storm.local.dir}/nimbus/inbox/${key}
path = client.getClient().beginFileUpload();
String[] pathCache = path.split("/");
// ${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar
String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";

// 如果设置了 lib jar 则先上传 lib jar
List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME); // topology.lib.name
Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH); // topology.lib.path
if (lib != null && lib.size() != 0) {
for (String libName : lib) {
String jarPath = path + "/lib/" + libName;
client.getClient().beginLibUpload(jarPath);
submitJar(conf, libPath.get(libName), jarPath, client);
}
} else {
if (localJar == null) {
// no lib, no client jar
throw new RuntimeException("No client app jar found, please upload it");
}
}

// 上传 client jar
if (localJar != null) {
submittedJar = submitJar(conf, localJar, uploadLocation, client);
} else {
// no client jar, but with lib jar
client.getClient().finishFileUpload(uploadLocation);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
LOG.info("Jar has already been uploaded to master. Will not submit again.");
}
}

方法首先会获取 topology 对应的 jar 文件名称(项目打包后对应的 jar 文件),然后调用 thrift 方法 beginFileUpload 为待上传的 jar 文件创建存储路径和传输通道,并返回对应的路径值。在 storm.thrift 文件中定义了一个 service 类型的 Nimbus 类,如果你对 thrift 熟悉就应该知道这是一个 service 接口声明,Nimbus 类声明了一些能够与 nimbus 节点进行远程通信的方法,相应方法实现位于 ServiceHandler 类中,可以在该类中找到 beginFileUpload 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public String beginFileUpload() throws TException {
String fileLoc = null;
try {
String key = UUID.randomUUID().toString();
String path = StormConfig.masterInbox(conf) + "/" + key; // ${storm.local.dir}/nimbus/inbox/${key}
FileUtils.forceMkdir(new File(path));
FileUtils.cleanDirectory(new File(path));
fileLoc = path + "/stormjar-" + key + ".jar"; // ${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar
data.getUploaders().put(fileLoc, Channels.newChannel(new FileOutputStream(fileLoc)));
LOG.info("Begin upload file from client to " + fileLoc);
return path;
}
// 省略 catch 代码块
}

方法首先会基于 UUID 为本次需要上传的 jar 文件创建一个唯一的名称标识,然后在 nimbus 本地对应的目录下创建 jar 文件存储路径(如下),同时为该路径创建一个传输通道,并返回该路径(不包含文件名称):

${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar

接下来就开始执行 jar 文件的上传逻辑,如果我们在自己的代码中提交 topology 时指定了一些依赖包,那么这里首先会上传这些依赖包,然后再上传主程序包。所有的文件上传都位于一个重载版本的 submitJar 方法中,该重载方法会调用远程 uploadChunk 方法执行具体的文件上传操作,并在上传完成之后调用远程 finishFileUpload 方法关闭对应的上传通道。整个过程就是将我们发布机上本地的 topology jar 文件上传到 nimbus 节点对应的本地路径 nimbus/inbox/${key}/stormjar-${key}.jar 下面,其中 key 是一个 UUID 唯一标识。

接下来方法会调用 submitTopology 方法提交 topology 任务,默认会设置 topology 的初始化状态为 ACTIVE。Nimbus 在接收到 RPC 请求之后开始对提交的任务制定运行方案,主要是依据 topology 配置和集群的运行状态为提交的任务分配 task、worker,以及 supervisor。如果成功则返回对应的 topologyId,否则会抛出相应的异常,我们将在下一篇中对整个 topology 任务分配过程进行深入分析。