深入理解 JUC:DelayQueue

延迟队列 DelayQueue 用于存放具有过期属性的元素,被添加到 DelayQueue 中的元素只有在到达过期时间之后才会出队列,常用于延迟任务调度。DelayQueue 本质上是一个无界的阻塞队列,底层依赖于优先级队列 PriorityQueue 作为存储结构,并使用 ReentrantLock 锁保证线程安全。

DelayQueue 的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

/** 保证线程安全的可重入独占锁 */
private final transient ReentrantLock lock = new ReentrantLock();

/** 底层存储结构,优先级队列 */
private final PriorityQueue<E> q = new PriorityQueue<E>();

/** Leader-Follower 模式,用于记录角色为 leader 的线程对象 */
private Thread leader = null;

/** 因为队列为空,或元素未到期而阻塞的线程 */
private final Condition available = lock.newCondition();

// ... 省略方法定义

}

重点分析一下 DelayQueue#leader 字段设置的意图,该字段用于记录当前角色为 leader 的线程对象。当执行出队列操作(DelayQueue#takeDelayQueue#poll(long, TimeUnit) 方法)时,如果 DelayQueue#leader 字段为 null,即不存在 leader 线程,且有未到期的延迟元素,则会将当前线程设置成 leader 角色,并等待该元素到期,以减少不必要的等待时间,保证延迟元素能够在到期时及时被响应。

设想如果不这样设计,那么线程在遇到队列中没有到期的延迟元素时应该怎么办呢?可以采取以下 3 种策略:

  1. 进入忙循环轮询。
  2. 进入条件队列等待,并稍后由其它线程唤醒。
  3. 先退出当前方法,等待后续再次执行出队列操作。

可以看出,这些策略要么是消耗 CPU 资源,要么就是无法对队列中的元素在到期时及时出队列,而引入 leader 角色正好能够避免了这些问题。在某个线程以 leader 的身份等待优先级最高的延迟元素到期时,其它线程在发现队列中没有到期的元素时会以 follower 角色无限期等待。而在 leader 线程从等待状态退出时,它会主动放弃自己的 leader 角色,并唤醒一个正在处于等待状态的 follower 线程,该线程将有机会晋升成为新的 leader。

在 leader 线程等待期间,有可能会插入优先级更高的元素,这个时候就需要剥夺该线程的 leader 角色,以提供其它线程成为 leader 的机会,继而保证刚刚新插入的元素能够在到期时及时被响应。否则就需要等到当前 leader 线程退出等待状态之后或者有新的线程请求获取元素时才有机会出队列,这样可能存在较大的延迟。

核心方法实现

DelayQueue 实现自 BlockingQueue 接口,下面针对核心方法的实现逐一进行分析。不过在开始分析之前,我们先来介绍一下 java.util.concurrent.Delayed 接口,DelayQueue 要求添加到其中的元素必须实现该接口。Delayed 接口定义如下:

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

该接口继承自 Comparable 接口,所以添加到 DelayQueue 中的元素都是可比较的。方法 Delayed#getDelay 接收一个 TimeUnit 类型的单位值,用于返回当前延迟元素的剩余到期时间,如果小于等于 0 则说明该元素已经到期。

添加元素:offer & add & put

针对添加元素的操作,DelayQueue 实现了 DelayQueue#offerDelayQueue#addDelayQueue#put 方法,不过后两者都是直接调用了 DelayQueue#offer 方法。

此外,该方法的超时版本 DelayQueue#offer(E, long, TimeUnit) 也是直接委托给 DelayQueue#offer 方法执行,并没有真正实现超时等待机制。这主要是因为 DelayQueue 是无界的,所有的添加操作都能够被立即响应,而不会阻塞。

下面展开分析一下 DelayQueue#offer 方法的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 往队列中添加元素
q.offer(e);
// 当前添加的元素是队列中最先过期的
if (q.peek() == e) {
// 清空 leader,保证该元素能够及时出队列
leader = null;
// 唤醒因元素均为到期或队列为空而等待的线程
available.signal();
}
return true;
} finally {
// 释放锁
lock.unlock();
}
}

DelayQueue 不允许向其中添加值为 null 的元素,这主要由优先级队列 PriorityQueue 保证。如果待添加的元素值合法,则执行:

  1. 加锁,保证同一时间只有一个线程在操作队列;
  2. 将待添加元素插入到 DelayQueue 中;
  3. 检查 DelayQueue 中优先级最高的的元素是否是刚刚新加入的元素;
  4. 如果是则剥夺当前 leader 线程的 leader 角色,并唤醒一个之前因为队列中的元素均未到期或队列为空而等待的线程;
  5. 释放锁并返回。

为什么当队列中插入了一个优先级最高的元素时需要剥夺当前 leader 线程的 leader 角色,并唤醒一个处于等待的 follower 线程呢?

其实在前面也有所提及,假设现在 DelayQueue 中最先过期的元素还有 10 秒到期,则 leader 线程会等待 10 秒后再次尝试出队列,其它 follower 线程因为检测到当前已有线程成为 leader,所以在发现没有已到期的元素时会等待。假设现在有一个还有 5 秒到期的元素插入了进来,如果在该元素到期之后没有新的线程来请求出队列,则该元素将不能及时被响应,直到 leader 线程退出等待,即使此时有相当数量的 follower 线程在等待元素到期。

获取元素:poll & peek & take

针对获取元素的操作,DelayQueue 实现了 DelayQueue#pollDelayQueue#peekDelayQueue#take 方法。其中 DelayQueue#peek 方法在获取到锁的基础上直接调用了 PriorityQueue#peek 方法,仅获取 DelayQueue 中最先到期的元素(获取时可能还未到期),而不移除该元素,实现上比较简单。

方法 DelayQueue#take 相对于 DelayQueue#poll 的区别在于,当队列为空或没有到期的元素时该方法会无限期阻塞,直到有元素到期或该线程被中断,而 DelayQueue#poll 方法在相同场景下则会立即返回 null。

下面分别展开分析这两个方法的实现,首先来看一下 DelayQueue#poll 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public E poll() {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 获取队列中最先过期的元素
E first = q.peek();
// 队列为空,或者元素还未到期,立即返回 null
if (first == null || first.getDelay(NANOSECONDS) > 0) {
return null;
}
// 当前元素已过期,移除并返回
else {
return q.poll();
}
} finally {
// 释放锁
lock.unlock();
}
}

上述方法会检查 DelayQueue 中是否有已经到期的元素,如果有则将该元素出队列并返回,否则,如果队列为空或没有已经到期的元素,则立即返回 null。针对 DelayQueue#poll 方法,DelayQueue 还提供了超时版本 DelayQueue#poll(long, TimeUnit),当队列为空或没有已经到期的元素时等待指定时间。

再来看一下 DelayQueue#take 方法的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁,支持响应中断
lock.lockInterruptibly();
try {
for (; ; ) {
// 获取最先过期的元素
E first = q.peek();
// 队列为空,则等待
if (first == null) {
available.await();
}
// 队列非空
else {
// 如果当前元素已经过期,则出队列
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) {
return q.poll();
}

/* 当前元素还未过期 */

first = null; // don't retain ref while waiting

// 已经有其它线程成为 leader,则等待
if (leader != null) {
available.await();
}
// 没有 leader 线程,将自己设置为 leader 角色
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待 delay 纳秒
available.awaitNanos(delay);
} finally {
// 如果等待期间自己的 leader 角色未被剥夺,则在等待完成之后主动放弃
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
// 如果队列不为空,则唤醒一个之前因为队列为空而等待的线程
if (leader == null && q.peek() != null) {
available.signal();
}
// 释放锁
lock.unlock();
}
}

方法 DelayQueue#take 在获取到锁之后会先检查队列是否为空,如果为空则等待,否则执行:

  1. 如果 DelayQueue 中优先级最高的元素已经到期,则出队列并返回该元素;
  2. 否则,如果当前已经有 leader 线程,则等待;
  3. 如果当前没有 leader 线程,则将自己设置为 leader 角色,并等待队列中优先级最高的元素到期。

如果 DelayQueue 中优先级最高的元素到期,或者等待期间被中断,则当前 leader 线程会主动放弃自己的 leader 角色,以给其它 follower 线程机会。当然在等待期间,当前线程的 leader 角色也可能会被剥夺,前面我们在分析 DelayQueue#offer 方法时已经介绍过,当等待期间有其它优先级更高的元素插入进来时,执行插入的线程会剥夺当前 leader 线程的 leader 角色,以便让刚刚插入的优先级更高的元素能够在到期时及时出队列。

在整个 DelayQueue#take 方法执行的最后,如果 DelayQueue 非空,且当前没有线程成为 leader,则会唤醒一个之前因为队列为空而阻塞的 follower 线程。这里限制 leader == null 主要是防止在有 leader 存在的前提下,被唤醒的线程会因为队列中没有到期的元素而再次等待。

移除元素:remove

针对移除元素的操作,DelayQueue 实现了 DelayQueue#remove 方法,并提供了有参和无参的版本,其中无参版本实际上是委托给 DelayQueue#poll 方法执行的。下面来分析一下有参版本的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 从优先级队列中移除元素
return q.remove(o);
} finally {
// 释放锁
lock.unlock();
}
}

DelayQueue 移除指定元素的操作在获取到锁的前提下,交由 PriorityQueue 执行,实现上比较简单。

DelayQueue 的 DelayQueue#size 方法在实现上与 DelayQueue#remove 方法思路相同,不再展开。但需要注意的一点是,方法 DelayQueue#size 所返回的值并不仅仅包含那些未到期的元素,也可能包含一些已经到期而未被从队列中移除的元素,一种可能是这些元素未被及时响应,另外一种可能就是线程仅获取了元素值,而没有移除对应的结点,例如调用了 DelayQueue#peek 方法。

总结

本文分析了 DelayQueue 的设计与实现。DelayQueue 相对于前面介绍的队列的特别之处在于引入了时间属性,只有在元素到期时才会被出队列。在实现上,DelayQueue 底层依赖于优先级队列 PriorityQueue 作为存储结构,并基于 ReentrantLock 锁保证线程安全,同时巧妙设计了 Leader-Follower 模式来保证延迟元素在到期时能够及时被响应。

参考

  1. JDK 1.8 源码