在具体开始分析 Storm 集群的启动和运行机制之前,我们先来看一下基础的线程模型,在整个 Storm 的实现中有很多地方用到它,所以将其单独拎出来先分析说明一下,后面看到相应的类就大致知道其内在的运行过程啦。
在 Storm 的实现中,有很多实现了 RunnableCallback 类的子类,这些类实例化之后都被传递给了 AsyncLoopThread 对象,示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class MyRunnableCallback extends RunnableCallback {
private static AtomicInteger count = new AtomicInteger();
@Override public void run() { System.out.println("[" + count.incrementAndGet() + "] thread-" + Thread.currentThread().getId() + " is running."); }
@Override public Object getResult() { return 1; }
public static void main(String[] args) { MyRunnableCallback callback = new MyRunnableCallback(); new AsyncLoopThread(callback); } }
|
上面的例子的执行效果是每间隔 1 秒会执行一遍 run 方法,输出如下:
1 2 3
| [1] thread-11 is running. [2] thread-11 is running. [3] thread-11 is running.
|
所以我们可以简单的理解其作用是简单方便的创建一个线程用于循环执行自定义的业务逻辑,接下来看一下相应的源码实现。
RunnableCallback 类实现了 Runnable、Callback,以及 Shutdownable 三个接口,其中 Runnable 是 jdk 自带的接口,后两个接口定义如下:
1 2 3 4 5 6 7
| public interface Callback { <T> Object execute(T... args); }
public interface Shutdownable { void shutdown(); }
|
RunnableCallback 类的完整定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class RunnableCallback implements Runnable, Callback, Shutdownable {
@Override public void run() { }
@Override public <T> Object execute(T... args) { return null; }
@Override public void shutdown() { }
public void preRun() { }
public void postRun() { }
public Exception error() { return null; }
public Object getResult() { return null; }
public String getThreadName() { return null; } }
|
- run :用于实现自定义的需要异步循环执行的逻辑,该方法会依据条件被循环调度
- execute :当线程异常退出时该方法会被调用,用于执行自定义的异常处理逻辑
- shutdown :当任务被销毁或者正常退出时该方法被调用,用于执行一定的销毁策略
- preRun :执行 run 方法之前的前置模板方法
- postRun :执行 run 方法之后的后置模板方法
- error :用于获取当前任务的错误运行信息,如果存在错误则会中断当前 run 方法的继续调度
- getResult :该方法用于控制 run 方法的调度,如果返回值为 null 或者 0 则任务会一直循环调度,如果返回值小于 0 则会在执行一次 run 之后退出,如果返回值大于 0 则表示每次调度间隔睡眠的时间(单位:秒)
- getThreadName :设置当前线程的名称
当我们完成实例化自定义的 RunnableCallback 对象之后,我们需要将其传递给 AsyncLoopThread 类对象用于启动执行。AsyncLoopThread 类提供了多个重载版本的构造函数,但最终调用的都是 AsyncLoopThread#init
方法,该方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| private void init(RunnableCallback afn, boolean daemon, RunnableCallback kill_fn, int priority, boolean start) { if (kill_fn == null) { kill_fn = new AsyncLoopDefaultKill(); }
Runnable runnable = new AsyncLoopRunnable(afn, kill_fn); thread = new Thread(runnable); String threadName = afn.getThreadName(); if (threadName == null) { threadName = afn.getClass().getSimpleName(); } thread.setName(threadName); thread.setDaemon(daemon); thread.setPriority(priority); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { LOG.error("UncaughtException", e); ThreadUtils.haltProcess(1); } });
this.afn = afn;
if (start) { thread.start(); } }
|
该方法的第一个参数 afn 就是我们传递的自定义的 RunnableCallback 对象;第二个参数 daemon 用于指定当前线程是否以守护线程的模式运行;第三个参数 kill_fn 也是一个 RunnableCallback 类型,当任务异常退出时会调用其 execute 方法;第四个参数 priority 用于指定线程的优先级;第五个参数 start 用于指定是否立即启动任务。
上面的方法中有一个名为 AsyncLoopRunnable 的类,实现了 Runnable 接口,并封装了 afn 和 kill_fn 两个 RunnableCallback 对象,该类的 run 方法实现了整个线程模型的调度机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public void run() { if (fn == null) { LOG.error("fn==null"); throw new RuntimeException("AsyncLoopRunnable no core function "); }
fn.preRun();
try { while (!shutdown.get()) { fn.run();
if (shutdown.get()) { this.shutdown(); return; }
Exception e = fn.error(); if (e != null) { throw e; }
Object rtn = fn.getResult(); if (this.needQuit(rtn)) { this.shutdown(); return; } } } catch (Throwable e) { if (shutdown.get()) { this.shutdown(); } else { LOG.error("Async loop died!!!" + e.getMessage(), e); killFn.execute(e); } } }
|
AsyncLoopRunnable 的 run 方法会循环调度 RunnableCallback 的 run 方法,并在每次执行完成之后检测当前任务是否被 shutdown、是否存在错误运行信息,如果都没有的话则会继续调用 AsyncLoopRunnable#needQuit
方法检查是否需要退出当前任务,该方法会依据 RunnableCallback#getResult
的返回结果决策接下去的运行方式,具体决策过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private boolean needQuit(Object rtn) { if (rtn != null) { long sleepTime = Long.parseLong(String.valueOf(rtn)); if (sleepTime < 0) { return true; } else if (sleepTime > 0) { long now = System.currentTimeMillis(); long cost = now - lastTime; long sleepMs = sleepTime * 1000 - cost; if (sleepMs > 0) { JStormUtils.sleepMs(sleepMs); lastTime = System.currentTimeMillis(); } else { lastTime = now; } } } return false; }
|
整个线程模型的设计和实现比较简单,但是却很实用,推荐大家将其纳入自己的工具箱。对于 Storm 基础线程模型的分析就到此结束,从下一篇开始我们将分三篇分别介绍 nimbus、supervisor,以及 worker 的启动和运行机制。