JStorm 源码解析:基础线程模型

在具体开始分析 Storm 集群的启动和运行机制之前,我们先来看一下基础的线程模型,在整个 Storm 的实现中有很多地方用到它,所以将其单独拎出来先分析说明一下,后面看到相应的类就大致知道其内在的运行过程啦。

在 Storm 的实现中,有很多实现了 RunnableCallback 类的子类,这些类实例化之后都被传递给了 AsyncLoopThread 对象,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyRunnableCallback extends RunnableCallback {

private static AtomicInteger count = new AtomicInteger();

@Override
public void run() {
System.out.println("[" + count.incrementAndGet() + "] thread-" + Thread.currentThread().getId() + " is running.");
}

@Override
public Object getResult() {
return 1;
}

public static void main(String[] args) {
MyRunnableCallback callback = new MyRunnableCallback();
new AsyncLoopThread(callback);
}
}

上面的例子的执行效果是每间隔 1 秒会执行一遍 run 方法,输出如下:

1
2
3
[1] thread-11 is running.
[2] thread-11 is running.
[3] thread-11 is running.

所以我们可以简单的理解其作用是简单方便的创建一个线程用于循环执行自定义的业务逻辑,接下来看一下相应的源码实现。

RunnableCallback 类实现了 Runnable、Callback,以及 Shutdownable 三个接口,其中 Runnable 是 jdk 自带的接口,后两个接口定义如下:

1
2
3
4
5
6
7
public interface Callback {
<T> Object execute(T... args);
}

public interface Shutdownable {
void shutdown();
}

RunnableCallback 类的完整定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RunnableCallback implements Runnable, Callback, Shutdownable {

@Override
public void run() { }

@Override
public <T> Object execute(T... args) {
return null;
}

@Override
public void shutdown() { }

public void preRun() { }

public void postRun() { }

public Exception error() {
return null;
}

public Object getResult() {
return null;
}

public String getThreadName() {
return null;
}
}
  • run :用于实现自定义的需要异步循环执行的逻辑,该方法会依据条件被循环调度
  • execute :当线程异常退出时该方法会被调用,用于执行自定义的异常处理逻辑
  • shutdown :当任务被销毁或者正常退出时该方法被调用,用于执行一定的销毁策略
  • preRun :执行 run 方法之前的前置模板方法
  • postRun :执行 run 方法之后的后置模板方法
  • error :用于获取当前任务的错误运行信息,如果存在错误则会中断当前 run 方法的继续调度
  • getResult :该方法用于控制 run 方法的调度,如果返回值为 null 或者 0 则任务会一直循环调度,如果返回值小于 0 则会在执行一次 run 之后退出,如果返回值大于 0 则表示每次调度间隔睡眠的时间(单位:秒)
  • getThreadName :设置当前线程的名称

当我们完成实例化自定义的 RunnableCallback 对象之后,我们需要将其传递给 AsyncLoopThread 类对象用于启动执行。AsyncLoopThread 类提供了多个重载版本的构造函数,但最终调用的都是 AsyncLoopThread#init 方法,该方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void init(RunnableCallback afn, boolean daemon, RunnableCallback kill_fn, int priority, boolean start) {
if (kill_fn == null) {
// 如果没有设置,则默认创建一个
kill_fn = new AsyncLoopDefaultKill();
}

// 采用 AsyncLoopRunnable 对于 afn 和 kfn 进行包装
Runnable runnable = new AsyncLoopRunnable(afn, kill_fn);
thread = new Thread(runnable);
String threadName = afn.getThreadName();
if (threadName == null) {
// 以 afn 的 simpleName 作为线程名称
threadName = afn.getClass().getSimpleName();
}
// 配置线程
thread.setName(threadName);
thread.setDaemon(daemon);
thread.setPriority(priority);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("UncaughtException", e);
ThreadUtils.haltProcess(1);
}
});

this.afn = afn;

if (start) {
// 启动线程
thread.start();
}
}

该方法的第一个参数 afn 就是我们传递的自定义的 RunnableCallback 对象;第二个参数 daemon 用于指定当前线程是否以守护线程的模式运行;第三个参数 kill_fn 也是一个 RunnableCallback 类型,当任务异常退出时会调用其 execute 方法;第四个参数 priority 用于指定线程的优先级;第五个参数 start 用于指定是否立即启动任务。

上面的方法中有一个名为 AsyncLoopRunnable 的类,实现了 Runnable 接口,并封装了 afn 和 kill_fn 两个 RunnableCallback 对象,该类的 run 方法实现了整个线程模型的调度机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void run() {
if (fn == null) {
LOG.error("fn==null");
throw new RuntimeException("AsyncLoopRunnable no core function ");
}

// 模板方法
fn.preRun();

try {
while (!shutdown.get()) {
// 执行自定义 callback 逻辑
fn.run();

if (shutdown.get()) {
this.shutdown();
return;
}

Exception e = fn.error();
if (e != null) {
throw e;
}

// 获取睡眠时间(单位:秒)
Object rtn = fn.getResult();
if (this.needQuit(rtn)) {
this.shutdown();
return;
}
}
} catch (Throwable e) {
if (shutdown.get()) {
this.shutdown();
} else {
LOG.error("Async loop died!!!" + e.getMessage(), e);
killFn.execute(e);
}
}
}

AsyncLoopRunnable 的 run 方法会循环调度 RunnableCallback 的 run 方法,并在每次执行完成之后检测当前任务是否被 shutdown、是否存在错误运行信息,如果都没有的话则会继续调用 AsyncLoopRunnable#needQuit 方法检查是否需要退出当前任务,该方法会依据 RunnableCallback#getResult 的返回结果决策接下去的运行方式,具体决策过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private boolean needQuit(Object rtn) {
if (rtn != null) {
long sleepTime = Long.parseLong(String.valueOf(rtn));
if (sleepTime < 0) {
// 小于 0 则退出执行
return true;
} else if (sleepTime > 0) {
// 大于 0 表示要求每次执行中间休息相应的时间(单位:秒)
long now = System.currentTimeMillis();
long cost = now - lastTime;
long sleepMs = sleepTime * 1000 - cost; // 期望睡眠时间 - 中间消耗的时间
if (sleepMs > 0) {
// 还没有达到期望睡眠时间,继续睡眠
JStormUtils.sleepMs(sleepMs);
lastTime = System.currentTimeMillis();
} else {
lastTime = now;
}
}
}
// 为 null 或者 0 都继续执行
return false;
}

整个线程模型的设计和实现比较简单,但是却很实用,推荐大家将其纳入自己的工具箱。对于 Storm 基础线程模型的分析就到此结束,从下一篇开始我们将分三篇分别介绍 nimbus、supervisor,以及 worker 的启动和运行机制。