深入理解 JUC:CyclicBarrier

上一篇我们分析了 CountDownLatch 同步器的设计与实现,在日常开发中还有另外一个同步器组件 CyclicBarrier 常常令我们容易混淆。在文章 理清 CountDownLatch 与 CyclicBarrier 的区别 中我们曾经总结过二者的区别,并通过示例演示了各自的应用场景。上一篇,我们分析了 CountDownLatch 的实现原理,本文我们继续从源码层面来分析 CyclicBarrier 的设计与实现。

CyclicBarrier 实现内幕

前面介绍的 CountDownLatch 同步器是基于 AQS 实现的,而本文要介绍的 CyclicBarrier 则没有直接继承 AQS 的 AbstractQueuedSynchronizer 抽象类,而是基于 ReentrantLock 锁进行实现。首先来看一下 CyclicBarrier 的字段定义,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CyclicBarrier {

/** 支撑 CyclicBarrier 的重入锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 条件队列,已经到达屏障的线程会在条件队列中等待其它线程 */
private final Condition trip = lock.newCondition();
/** 参与的线程数 */
private final int parties;
/** 当所有线程都到达屏障时的回调函数 */
private final Runnable barrierCommand;
/** 当前年代对象 */
private Generation generation = new Generation();
/** 当前剩余未完成的线程数 */
private int count;

// ... 省略方法定义

}

上述各个字段的含义如代码注释,这里我们进一步解释一下 generation 字段,该字段为 Generation 类型,用于表示当前 CyclicBarrier 同步器的年代信息。Generation 内部类定义如下:

1
2
3
private static class Generation {
boolean broken = false;
}

当新建一个 CyclicBarrier 对象时会初始化 CyclicBarrier#generation 字段。此外,当所有参与的线程都到达屏障后(也称 tripped),或者 CyclicBarrier 被重置(即调用 CyclicBarrier#reset 方法)时,会新建一个 Generation 对象赋值给 CyclicBarrier#generation 字段,表示年代的更替。

Generation 定义的 Generation#broken 属性用于标识当前屏障是否被打破。当 CyclicBarrier 被重置,或者参与到该屏障的某个线程被中断、等待超时,亦或是执行回调函数发生异常,都会导致屏障被打破。破损的屏障(即 broken=true)会导致当前参与等待的线程,以及已经处于等待状态的线程抛出 BrokenBarrierException 异常,并退出当前屏障进程。

因为 CyclicBarrier 的复用性,导致在程序运行期间可能并存多个年代信息,但是任何时刻只有一个年代对象是活跃的,剩余的年代对象对应的 CyclicBarrier 要么是已经用完的(tripped),要么就是已经破损的。

介绍完了字段定义,下面来分析一下 CyclicBarrier 的方法实现,首先来看一下构造方法。CyclicBarrier 定义了两个构造方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) {
throw new IllegalArgumentException();
}
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

其中 parties 参数用于指定当前参与的线程数,参数 barrierAction 用于指定当所有参与的线程都到达屏障时的回调逻辑。你可能有些疑问,既然设置了 parties 字段,为什么还要设置一个 count 字段呢?

考虑 CyclicBarrier 是可重用的,所以需要有一个字段记录参与线程的数目,即 parties 字段,而 count 字段初始值等于 parties 字段值,但是在运行期间其值是会随着参与线程逐一到达屏障而递减的,所以 count 值始终记录的是当前未到达屏障的线程数。当 CyclicBarrier 被重置时,我们需要依据 parties 字段值来重置 count 字段值。

继续来看一下 CyclicBarrier 除构造方法以外的剩余方法实现,主要分析一下 CyclicBarrier#await 方法和 CyclicBarrier#reset 方法。首先来看一下 CyclicBarrier#reset 方法,当我们希望复用 CyclicBarrier 对象时可以调用该方法,用于重置 count 值、年代信息,并唤醒所有位于条件队列中等待的线程。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
this.breakBarrier(); // break the current generation
this.nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

private void breakBarrier() {
// 标识当前屏障被打破
generation.broken = true;
// 重置 count 字段值
count = parties;
// 唤醒所有等待的线程
trip.signalAll();
}

private void nextGeneration() {
// 唤醒所有等待的线程
trip.signalAll();
// 重置 count 值
count = parties;
generation = new Generation();
}

再来看一下 CyclicBarrier#await 方法,该方法用于阻塞当前线程,以在屏障处等待其它线程到达,CyclicBarrier 还为该方法定义了超时等待版本。当一个线程因调用 CyclicBarrier#await 方法进入等待状态时,该线程将会在满足以下条件之一时退出等待状态:

  1. 所有参与的线程都已经到达了屏障。
  2. 当前线程被中断,或者其它处于等待状态的线程被中断。
  3. 如果启用了超时机制,并且某个参与的线程等待超时。
  4. CyclicBarrier 被重置。

方法 CyclicBarrier#await 的普通版本和超时版本在实现上都是直接委托给 CyclicBarrier#dowait 方法执行,所以下面主要来分析一下该方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取当前年代信息
final Generation g = generation;

// 当前屏障被打破,抛出异常
if (g.broken) {
throw new BrokenBarrierException();
}

// 当前线程被中断,打破屏障,并唤醒所有等待的线程
if (Thread.interrupted()) {
this.breakBarrier();
throw new InterruptedException();
}

int index = --count;
// 如果 count 值为 0,说明所有的线程都已经到达屏障
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果设置了回调,则执行
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
// 唤醒所有等待的线程,并重置屏障
this.nextGeneration();
return 0;
} finally {
// 如果执行回调异常
if (!ranAction) {
this.breakBarrier();
}
}
}

// count 值不为 0,说明存在还未到达屏障的线程,则进入条件队列等待

// loop until tripped, broken, interrupted, or timed out
for (; ; ) {
try {
if (!timed) {
// 进入条件队列等待
trip.await();
} else if (nanos > 0L) {
// 进入条件队列超时等待
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
// 当前线程被中断,响应中断
if (g == generation && !g.broken) {
this.breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not been interrupted,
// so this interrupt is deemed to "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

// 屏障被打破
if (g.broken) {
throw new BrokenBarrierException();
}

// 当前 CyclicBarrier 已经被重置
if (g != generation) {
return index;
}

// 等待超时
if (timed && nanos <= 0L) {
this.breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}

由上述实现我们可以总结线程在调用 CyclicBarrier#await 方法时的整体执行流程。如果当前线程不是最后一个到达屏障的线程(递减 count 值之后仍然大于 0),则调用 Condition#await 方法(或超时版本)将当前线程添加到条件队列中等待。如果当前线程是最后一个到达屏障的线程(递减 count 值之后为 0),则在线程到达屏障后执行:

  1. 如果指定了回调逻辑,则执行该回调,如果期间发生任何异常,则打破屏障、重置 count 值,并唤醒条件队列中所有等待的线程;
  2. 否则,继续调用 CyclicBarrier#nextGeneration 方法唤醒条件队列中所有等待的线程,并重置 count 值和年代信息。

在上述过程中如果当前线程或处于等待状态的线程被中断、屏障被打破、年代信息发生变化,或者等待超时(如果允许的话),则线程将会从 Condition#await 方法中退出,即当前屏障失效。

总结

最后总结一下,CyclicBarrier 在实现上虽然没有直接基于 AQS,但是组合了 ReentrantLock 锁,所以我们仍然可以将其视为基于 AQS 的同步器实现。CyclicBarrier 在被构造时需要指定参与的线程数目,当参与的线程调用 CyclicBarrier#await 方法时,如果该线程不是最后一个到达屏障的线程,则会将其打入条件队列进行等待。当参与的最后一个线程到达屏障时,该线程会唤醒所有在屏障前面等待的线程继续往下执行。

CyclicBarrier 和 CountDownLatch 虽然都采用了计数器机制,但是区别在于,CountDownLatch 的计数器是由其它线程操作减值的,这些操作的线程在操作完成之后仍然会继续往下执行,并不会参与等待;而 CyclicBarrier 的计数器是由参与等待的线程操作减值的,这些线程在操作完成之后后在屏障前面阻塞等待。所以最终的表象就是 CountDownLatch 在等待其它线程操作完成,而 CyclicBarrier 在相互等待彼此操作完成。

参考

  1. JDK 1.8 源码
  2. The java.util.concurrent Synchronizer Framework