我们按照 Storm 规范开发的 spout 和 bolt 需要使用 TopologyBuilder 构建成有向无环图(拓扑),并指定消息的分组方式,然后提交给 Storm 集群执行,本篇我们将分析 topology 的构建和提交过程。前面分析 Storm 的编程接口时曾介绍过 StormTopology 这个 thrift 类,topology 在构建完成之后会封装成一个 StormTopology 对象,并通过 RPC 方法提交给 Storm 集群的 nimbus 节点。
拓扑的构建过程
拓扑结构在 Storm 集群中以 StormTopology 对象的形式表示,这是一个 thrift 类,其定义如下:
1 2 3 4 5 struct StormTopology { 1 : required map<string , SpoutSpec> spouts; 2 : required map<string , Bolt> bolts; 3 : required map<string , StateSpoutSpec> state_spouts; }
属性 spouts 的 key 是 spout 对应的 ID,value 是 SpoutSpec 类型对象,这也是一个 thrift 类,封装了 spout 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。属性 bolts 的 key 是 bolt 对应的 ID,value 是 Bolt 类型对象,Bolt 同样是一个 thrfit 类,封装了 bolt 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。
ComponentObject 是一个 thrift 联合类型(union),在这里主要使用了 serialized_java 字段记录组件的序列化值:
1 2 3 4 5 union ComponentObject { 1 : binary serialized_java; 2 : ShellComponent shell; 3 : JavaObject java_object; }
ComponentCommon 是对组件的抽象表示,spout 和 bolt 在 topology 中统称为组件,topology 构建过程中会将 spout 和 bolt 都封装成为 ComponentCommon 对象:
1 2 3 4 5 6 7 8 9 10 struct ComponentCommon { 1 : required map<GlobalStreamId, Grouping> inputs; 2 : required map<string , StreamInfo> streams; 3 : optional i32 parallelism_hint; 4 : optional string json_conf; }
StormTopology 作为 thrift 类在编译成 java 实现时比较冗长,所以 Storm 提供了 TopologyBuilder 构造器类来简化 topology 的构造,其使用形式一般如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class WordCountTopology implements ComponentId , FieldName { private static final String TOPOLOGY_NAME = "wordcount-topology" ; public static void main (String[] args) throws Exception { SentenceSpout sentenceSpout = new SentenceSpout(); SentenceSplitBolt sentenceSplitBolt = new SentenceSplitBolt(); WordCountBolt wordCountBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout); builder.setBolt(SENTENCE_SPLIT_BOLT_ID, sentenceSplitBolt).shuffleGrouping(SENTENCE_SPOUT_ID); builder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SENTENCE_SPLIT_BOLT_ID, new Fields(WORD)); builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID); Config config = new Config(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); TimeUnit.MINUTES.sleep(10 ); localCluster.killTopology(TOPOLOGY_NAME); localCluster.shutdown(); } }
创建完 TopologyBuilder 实例之后,我们可以调用 setSpout
方法往 topology 中添加并设置 spout 组件,调用 setBolt
方法往 topology 中添加并设置 bolt 组件,并最后通过调用 createTopology
方法来完成 topology 的构建,该方法会返回一个 StormTopology 对象。TopologyBuilder 类中主要关注 3 个属性:
1 2 3 4 5 6 protected Map<String, IRichBolt> _bolts = new HashMap<>();protected Map<String, IRichSpout> _spouts = new HashMap<>();protected Map<String, ComponentCommon> _commons = new HashMap<>();
下面来看一下 spout 和 bolt 的构造过程,即 setSpout
和 setBolt
方法,针对这两类方法,TopologyBuilder 都提供了多种重载版本,其中 setSpout
对应的底层实现如下:
1 2 3 4 5 6 7 8 9 public SpoutDeclarer setSpout (String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException {this .validateUnusedId(id);this .initCommon(id, spout, parallelism_hint);_spouts.put(id, spout); return new SpoutGetter(id);}
方法首先会验证 spoutId 在 topology 范围内的全局唯一性,即没有被已有的 spout 和 bolt 占用,否则会抛出 IllegalArgumentException 异常。initCommon
方法会构造 spout 对应的 ComponentCommon 对象并记录到 _commons
属性中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 protected void initCommon (String id, IComponent component, Number parallelism) throws IllegalArgumentException { ComponentCommon common = new ComponentCommon(); common.set_inputs(new HashMap<GlobalStreamId, Grouping>()); if (parallelism != null ) { int dop = parallelism.intValue(); if (dop < 1 ) { throw new IllegalArgumentException("Parallelism must be positive." ); } common.set_parallelism_hint(dop); } else { common.set_parallelism_hint(1 ); } Map conf = component.getComponentConfiguration(); if (conf != null ) common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common); }
最后将 spout 对象记录到 _spouts
属性中,并构造当前 spout 对应的 SpoutGetter 对象。
SpoutGetter 可以理解为 spout 对应的属性配置器,用于为当前 spout 加载通用的配置和设置私有的属性值,并最终将所有的配置项序列化为 json 格式记录到封装当前 spout 的 ComponentCommon 对象中(json_conf 属性)。
1 2 3 4 5 protected class SpoutGetter extends ConfigGetter <SpoutDeclarer > implements SpoutDeclarer { public SpoutGetter (String id) { super (id); } }
SpoutGetter 类继承了 ConfigGetter 类,并实现了 SpoutDeclarer 接口,该接口主要是声明了一些 spout 组件相关的配置方法,具体的实现都在 ConfigGetter 的父类 BaseConfigurationDeclarer 中,实现比较简单,不展开说明。ConfigGetter 覆盖实现了父类的 addConfigurations
方法,并在该方法中将当前 spout 所有相关的配置项序列化成 json 记录到对应的 ComponentCommon 对象中:
1 2 3 4 5 6 7 8 9 10 11 @Override public T addConfigurations (Map conf) { if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { throw new IllegalArgumentException("Cannot set serializations for a component using fluent API" ); } String currConf = _commons.get(_id).get_json_conf(); _commons.get(_id).set_json_conf(JStormUtils.mergeIntoJson(JStormUtils.parseJson(currConf), conf)); return (T) this ; }
下面继续分析 setBolt
方法,上一篇在介绍 Bolt 组件接口时我们知道 Storm 提供了三种基础的 Bolt 组件类型,即 IBolt(or IRichBolt)、IBasicBolt,以及 IBatchBolt。针对每种 Bolt 类型,TopologyBuilder 都有提供相应版本的 setBolt
方法实现,下面以最常见的 IBolt 类型为例,对应的方法实现如下:
1 2 3 4 5 6 7 8 9 public BoltDeclarer setBolt (String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException { this .validateUnusedId(id); this .initCommon(id, bolt, parallelism_hint); _bolts.put(id, bolt); return new BoltGetter(id); }
流程上与 setSpout
大同小异,不再重复撰述,方法最终会将 bolt 对象记录到 _bolts
属性中,并构造当前 bolt 对应的 BoltGetter 对象。
前面我们分析了 SpoutGetter,知道其作用主要是为 spout 配置相关属性,BoltGetter 的作用同样如此,不过相对于 SpoutGetter 增加了消息分组方式的配置入口,最后同样将属性序列化为 json 格式记录到与组件相对应的 ComponentCommon 对象中。
在完成调用 setSpout
和 setBolt
往 topology 中添加 spout 和 bolt 组件之后,我们需要调用 createTopology
方法创建相应的 StormTopology 对象,该方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public StormTopology createTopology () { Map<String, Bolt> boltSpecs = new HashMap<>(); Map<String, SpoutSpec> spoutSpecs = new HashMap<>(); this .maybeAddCheckpointSpout(); for (String boltId : _bolts.keySet()) { IRichBolt bolt = _bolts.get(boltId); bolt = this .maybeAddCheckpointTupleForwarder(bolt); ComponentCommon common = this .getComponentCommon(boltId, bolt); try { this .maybeAddCheckpointInputs(common); this .maybeAddWatermarkInputs(common, bolt); boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common)); } catch (RuntimeException wrapperCause) { throw wrapperCause; } } for (String spoutId : _spouts.keySet()) { IRichSpout spout = _spouts.get(spoutId); ComponentCommon common = this .getComponentCommon(spoutId, spout); try { spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); } catch (RuntimeException wrapperCause) { throw wrapperCause; } } return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>()); }
整个方法的执行流程可以概括为:
如果 topology 中存在有状态的 bolt,则为当前 topology 自动添加一个 CheckpointSpout 组件;
遍历处理之前添加到 topology 中的 bolt,采用 Bolt 封装其序列化对象和 ComponentCommon 组件对象;
遍历处理之前添加到 topology 中的 spout,采用 SpoutSpec 封装其序列化对象和 ComponentCommon 组件对象;
构造 StormTopology 对象并返回。
集群环境下拓扑的提交过程
当构建完 topology 之后,我们需要以任务的形式将其提交到 Storm 集群运行。此外,为了方便调试,Storm 也支持通过 LocalCluster 在本地提交运行任务,本节我们主要介绍如何向 Storm 集群提交任务。
Storm 提供了 StormSubmitter 类用于向 Storm 集群提交任务,并提供了两类方法:submitTopology
和 submitTopologyWithProgressBar
。后者是对前者的封装,在原版 Storm 中用于支持显示任务的提交进度,但是这一设计在 JStorm 中被移除,所以两类方法实际上是等价的。接下来我们对 submitTopology
方法的实现进行分析,Storm 为该方法提供了多个重载版本,对应的底层实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public static void submitTopology (String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException { if (!Utils.isValidConf(stormConf)) { throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable" ); } Map userTotalConf = new HashMap(); userTotalConf.putAll(TopologyBuilder.getStormConf()); userTotalConf.putAll(stormConf); userTotalConf.putAll(Utils.readCommandLineOpts()); Map conf = Utils.readStormConfig(); conf.putAll(stormConf); putUserInfo(conf, stormConf); try { String serConf = Utils.to_json(userTotalConf); if (localNimbus != null ) { LOG.info("Submitting topology " + name + " in local mode" ); localNimbus.submitTopology(name, null , serConf, topology); } else { NimbusClient client = NimbusClient.getConfiguredClient(conf); try { boolean enableDeploy = ConfigExtension.getTopologyHotDeplogyEnable(userTotalConf); boolean isUpgrade = ConfigExtension.isUpgradeTopology(userTotalConf); boolean dynamicUpdate = enableDeploy || isUpgrade; if (topologyNameExists(client, conf, name) != dynamicUpdate) { if (dynamicUpdate) { throw new RuntimeException("Topology with name `" + name + "` does not exist on cluster" ); } else { throw new RuntimeException("Topology with name `" + name + "` already exists on cluster" ); } } submitJar(client, conf); LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); if (opts != null ) { client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts); } else { client.getClient().submitTopology(name, path, serConf, topology); } } finally { client.close(); } } LOG.info("Finished submitting topology: " + name); } }
Storm 任务提交的过程本质上是一个与 nimbus 节点进行 RPC 通信的过程,整体流程可以概括为:
加载与封装配置;
验证当前 topology 的远程状态;
上传 topology 的 jar 文件到 nimbus 节点;
提交 topology 任务。
配置的加载与封装过程会验证配置是否为 json 格式,并聚合多个来源的配置封装成 map 集合。在任务提交之前会验证当前 topology 在远程集群的状态,如果当前操作是热部署或灰度发布,则必须保证对应的 topology 在远程集群已经存在,而对于新提交的 topology 来说,如果远程集群存在同名的 topology 则会禁止提交。
Storm 任务的提交分为两个步骤,首先上传 topology 对应的 jar 文件到 nimbus 服务器,上传成功之后才会调用远程方法通知 nimbus 有新任务加入,需要开始为该 topology 制定运行方案。下面先来看一下 jar 报上传的过程,该过程位于 submitJar
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private static void submitJar (NimbusClient client, Map conf) { if (submittedJar == null ) { try { LOG.info("Jar not uploaded to master yet. Submitting jar..." ); String localJar = System.getProperty("storm.jar" ); path = client.getClient().beginFileUpload(); String[] pathCache = path.split("/" ); String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1 ] + ".jar" ; List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME); Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH); if (lib != null && lib.size() != 0 ) { for (String libName : lib) { String jarPath = path + "/lib/" + libName; client.getClient().beginLibUpload(jarPath); submitJar(conf, libPath.get(libName), jarPath, client); } } else { if (localJar == null ) { throw new RuntimeException("No client app jar found, please upload it" ); } } if (localJar != null ) { submittedJar = submitJar(conf, localJar, uploadLocation, client); } else { client.getClient().finishFileUpload(uploadLocation); } } catch (Exception e) { throw new RuntimeException(e); } } else { LOG.info("Jar has already been uploaded to master. Will not submit again." ); } }
方法首先会获取 topology 对应的 jar 文件名称(项目打包后对应的 jar 文件),然后调用 thrift 方法 beginFileUpload
为待上传的 jar 文件创建存储路径和传输通道,并返回对应的路径值。在 storm.thrift
文件中定义了一个 service 类型的 Nimbus 类,如果你对 thrift 熟悉就应该知道这是一个 service 接口声明,Nimbus 类声明了一些能够与 nimbus 节点进行远程通信的方法,相应方法实现位于 ServiceHandler 类中,可以在该类中找到 beginFileUpload
方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public String beginFileUpload () throws TException { String fileLoc = null ; try { String key = UUID.randomUUID().toString(); String path = StormConfig.masterInbox(conf) + "/" + key; FileUtils.forceMkdir(new File(path)); FileUtils.cleanDirectory(new File(path)); fileLoc = path + "/stormjar-" + key + ".jar" ; data.getUploaders().put(fileLoc, Channels.newChannel(new FileOutputStream(fileLoc))); LOG.info("Begin upload file from client to " + fileLoc); return path; } }
方法首先会基于 UUID 为本次需要上传的 jar 文件创建一个唯一的名称标识,然后在 nimbus 本地对应的目录下创建 jar 文件存储路径(如下),同时为该路径创建一个传输通道,并返回该路径(不包含文件名称):
${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar
接下来就开始执行 jar 文件的上传逻辑,如果我们在自己的代码中提交 topology 时指定了一些依赖包,那么这里首先会上传这些依赖包,然后再上传主程序包。所有的文件上传都位于一个重载版本的 submitJar
方法中,该重载方法会调用远程 uploadChunk
方法执行具体的文件上传操作,并在上传完成之后调用远程 finishFileUpload
方法关闭对应的上传通道。整个过程就是将我们发布机上本地的 topology jar 文件上传到 nimbus 节点对应的本地路径 nimbus/inbox/${key}/stormjar-${key}.jar
下面,其中 key 是一个 UUID 唯一标识。
接下来方法会调用 submitTopology
方法提交 topology 任务,默认会设置 topology 的初始化状态为 ACTIVE。Nimbus 在接收到 RPC 请求之后开始对提交的任务制定运行方案,主要是依据 topology 配置和集群的运行状态为提交的任务分配 task、worker,以及 supervisor。如果成功则返回对应的 topologyId,否则会抛出相应的异常,我们将在下一篇中对整个 topology 任务分配过程进行深入分析。