深入理解 JUC:PriorityBlockingQueue

优先级队列 PriorityQueue 应该是大家都比较熟悉的一个集合组件,本文将要介绍的 PriorityBlockingQueue 是 PriorityQueue 的线程安全版本。PriorityBlockingQueue 底层依赖于数组作为存储结构,最大容量上限是 Integer.MAX_VALUE - 8,所以几乎可以将其视为无界的。同 PriorityQueue 一样,PriorityBlockingQueue 同样引入了堆数据结构来编排队列元素的优先级,默认使用最小堆结构。

此外,由 Blocking 字样我们可以推断出 PriorityBlockingQueue 是一个阻塞队列。PriorityBlockingQueue 实现自 BlockingQueue 接口,并基于 ReentrantLock 锁保证线程安全。不过需要注意的一点是,PriorityBlockingQueue 的阻塞仅针对出队列操作而言,当队列为空时出队列的线程会阻塞等待其它线程往队列中添加新的元素。对于入队列操作来说,因为 PriorityBlockingQueue 定义为无界,所以执行入队列的线程会立即得到响应,如果队列底层数组已满则该线程会尝试对底层数组进行扩容,当底层数据达到容量上限而无法继续扩容时会抛出 OOM 异常。

下面先来了解一下 PriorityBlockingQueue 的字段定义,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {

/** 队列默认初始容量 */
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
* 队列容量上限
*
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in OutOfMemoryError: Requested array size exceeds VM limit
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* 存储队列元素的数组,按照最小堆组织
*
* Priority queue represented as a balanced binary heap:
* the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].
* The priority queue is ordered by comparator, or by the elements' natural ordering,
* if comparator is null: For each node n in the heap and each descendant d of n, n <= d.
* The element with the lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;

/** 队列中元素个数 */
private transient int size;

/** 队列元素比较器,如果为 null 则使用元素自带的比较器 */
private transient Comparator<? super E> comparator;

/** 保证队列操作线程安全的可重入独占锁 */
private final ReentrantLock lock;

/** 记录因为队列为空而阻塞的线程 */
private final Condition notEmpty;

/**
* 扩容标记位,保证同一时间只有一个线程在扩容队列,状态为 0 或 1:
* - 0: 表示当前没有在执行扩容操作
* - 1: 表示当前正在执行扩容操作
*/
private transient volatile int allocationSpinLock;

/** 辅助支持序列化和反序列化 */
private PriorityQueue<E> q;

// ... 省略方法实现

}

PriorityBlockingQueue 默认初始时的底层数组大小设置为 11,并在元素已满时触发扩容操作,字段 PriorityBlockingQueue#allocationSpinLock 用于控制同一时间只有一个线程在执行扩容。当某个线程检测到当前底层数组已满时会基于 CAS 操作尝试将该字段值由 0 改为 1,然后开始执行扩容,并在完成之后重置该标记字段。

字段 PriorityBlockingQueue#comparator 用于指定元素比较器以判定队列元素的优先级,如果该字段为 null,则 PriorityBlockingQueue 会基于元素自带的比较器排列优先级。对于基本类型而言则参考元素的自然顺序,对于自定义对象来说,需要保证这些对象实现了 java.lang.Comparable 接口,否则会抛出 ClassCastException 异常。

核心方法实现

PriorityBlockingQueue 实现自 BlockingQueue 接口,下面针对核心方法的实现逐一进行分析。

添加元素:offer & add & put

针对添加元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#offerPriorityBlockingQueue#addPriorityBlockingQueue#put 方法,不过后两者都是直接调用了 PriorityBlockingQueue#offer 方法。

此外,该方法的超时版本 PriorityBlockingQueue#offer(E, long, TimeUnit) 也是直接委托给 PriorityBlockingQueue#offer 方法执行,并没有真正实现超时等待机制,这主要是因为 PriorityBlockingQueue 是无界的,所有的添加操作都能够被立即响应,而不会阻塞。

下面展开分析一下 PriorityBlockingQueue#offer 方法的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public boolean offer(E e) {
// 待添加元素不能为 null
if (e == null) {
throw new NullPointerException();
}
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
int n, cap;
Object[] array;
// 如果队列中的元素个数大于等于队列的容量,则执行扩容操作
while ((n = size) >= (cap = (array = queue).length)) {
this.tryGrow(array, cap); // 扩容
}
try {
// 将待添加元素插入到堆的合适位置(最小堆)
Comparator<? super E> cmp = comparator;
if (cmp == null) {
siftUpComparable(n, e, array);
} else {
// 自定义比较器
siftUpUsingComparator(n, e, array, cmp);
}
// 结点计数加 1
size = n + 1;
// 唤醒一个之前因为队列为空而阻塞的线程
notEmpty.signal();
} finally {
// 释放锁
lock.unlock();
}
return true;
}

PriorityBlockingQueue 同样不允许往其中添加 null 元素,如果待添加的元素值合法则执行:

  1. 加锁,保证同一时间只有一个线程在操作队列;
  2. 判断队列是否已满,如果是则执行扩容操作;
  3. 将元素基于最小堆数据结构的约束插入到底层数据的合适位置;
  4. 队列结点计数加 1;
  5. 因为当前队列至少包含一个元素,所以尝试唤醒一个之前因为队列为空而阻塞的线程;
  6. 释放锁并返回。

继续来看一下上述步骤中的扩容过程,实现位于 PriorityBlockingQueue#tryGrow 方法中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void tryGrow(Object[] array, int oldCap) {
// 扩容之前,先释放锁,避免扩容期间阻塞其它线程的出队列、入队列操作
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
// 基于 CAS 操作将扩容标记位由 0 改为 1
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
// 如果当前队列长度小于 64,则扩容为 2(n + 1),否则扩容为 (1 + 1/2)n
int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // grow faster if small
// 避免队列容量超过允许上限
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {
throw new OutOfMemoryError();
}
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array) {
newArray = new Object[newCap];
}
} finally {
// 重置扩容标记
allocationSpinLock = 0;
}
}

// 当前线程扩容失败,则让渡其它线程获取锁
if (newArray == null) {
Thread.yield();
}

// 加锁
lock.lock();

// 替换底层存储为扩容后的数组,并复制元素
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}

在开始执行扩容之前,当前线程会释放持有的锁,以避免在扩容期间阻塞其它线程的出队列操作,然后基于 CAS 操作修改扩容标记位 PriorityBlockingQueue#allocationSpinLock,保证同一时间只有一个线程在执行扩容。一开始数组较小(长度小于 64)时,线程将对底层数组成倍扩容(即 2(n + 1)),然后再按照 50% 的比例进行扩容(即 (1 + 1/2) * n),如果底层数组已经到达容量上限,则会抛出 OOM 异常。

线程在完成扩容操作之后会重置扩容标记,如果有线程在竞争 CAS 时失败则会尝试让渡其它线程获取锁。这里主要是让渡给成功完成扩容操作的线程,因为此时扩容操作还未真正完成,该线程需要尝试获取锁以继续用扩容后的数组替换当前底层数组。

继续回到 PriorityBlockingQueue#offer 方法,如果扩容操作完成或者本次入队列操作无需触发扩容,则接下去线程会将待添加的元素按照最小堆的约束插入到底层数据的合适位置。此时需要区分两种情况,如果在构造 PriorityBlockingQueue 对象时指定了比较器 Comparator,则会调用 PriorityBlockingQueue#siftUpUsingComparator 方法基于该比较器执行最小堆插入操作,否则调用 PriorityBlockingQueue#siftUpComparable 方法按照元素的自然顺序将当前元素插入到最小堆中。

基于数组实现的堆结构,在操作上是比较简单的,读者可以自行参考源码,本文不对最小堆 siftUp*siftDown* 操作展开分析。

获取元素:poll & peek & take

前面几篇介绍的队列都满足 FIFO 的特性,在执行出队列时返回的都是在队列中存活时间最长的元素。对于 PriorityBlockingQueue 而言,结点的顺序则按照优先级进行编排,所以这里获取元素的操作返回的是队列中优先级最高的结点。

针对获取元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#pollPriorityBlockingQueue#peekPriorityBlockingQueue#take 方法。其中 PriorityBlockingQueue#peek 方法仅获取最小堆堆顶结点元素值,而不移除该结点,实现上比较简单。方法 PriorityBlockingQueue#take 相对于 PriorityBlockingQueue#poll 的区别在于,当队列为空时该方法会无限期阻塞,直到有其它线程往队列中插入新的元素,或者该线程被中断。实现层面,二者大同小异,所以下面以 PriorityBlockingQueue#poll 方法为例展开分析从 PriorityBlockingQueue 中获取元素操作的具体实现。

PriorityBlockingQueue 针对 PriorityBlockingQueue#poll 方法定义了两个版本,区别在于当队列为空时是立即返回还是阻塞等待一段时间,而在实现思路上是一致的。这里以不带超时参数的版本为例展开分析,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public E poll() {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 出队列,获取最小堆堆顶元素值,并移除堆顶结点,调整最小堆
return this.dequeue();
} finally {
// 释放锁
lock.unlock();
}
}

private E dequeue() {
int n = size - 1;
if (n < 0) {
// 当前队列为空,直接返回 null
return null;
} else {
Object[] array = queue;
// 获取堆顶元素值
E result = (E) array[0];
// 调整堆的结构,以便再次满足最小堆定义
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
siftDownComparable(0, x, array, n);
} else {
// 自定义比较器
siftDownUsingComparator(0, x, array, n, cmp);
}
// 队列结点计数减 1
size = n;
return result;
}
}

对于优先级队列而言,出队列操作获取到的是队列中优先级最高的元素,因为底层依赖于最小堆实现,所以只需要移除最小堆堆顶结点,并返回结点元素即可。但是因为这样破坏了堆的结构,所以需要调用 shiftDown* 方法从上往下进行调整,以再次满足最小堆结构的约束。

移除元素:remove

针对移除元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#remove 方法,并提供了有参和无参的版本,其中无参版本实际上是委托给 PriorityBlockingQueue#poll 方法执行的。下面来分析一下有参版本的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取待删除元素的数组下标
int i = this.indexOf(o);
if (i == -1) {
// 不存在
return false;
}
// 移除元素
this.removeAt(i);
return true;
} finally {
// 释放锁
lock.unlock();
}
}

private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
// 当前移除的是最后一个元素
if (n == i) { // removed last element
array[i] = null;
}
// 当前移除的是中间元素
else {
// 将数组最后一个位置置为 null,并调整堆的结构以满足最小堆定义
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 自上而下调整堆结构以满足最小堆定义
if (cmp == null) {
siftDownComparable(i, moved, array, n);
} else {
siftDownUsingComparator(i, moved, array, n, cmp);
}
// 自下而上调整堆结构以满足最小堆定义
if (array[i] == moved) {
if (cmp == null) {
siftUpComparable(i, moved, array);
} else {
siftUpUsingComparator(i, moved, array, cmp);
}
}
}
// 队列结点计数减 1
size = n;
}

如果待删除的元素是优先级最低的元素,则只需要将底层数组末尾结点置为 null 即可,否则,对于其它优先级的元素来说,在执行删除之后需要调整堆结构以满足最小堆定义。

其它操作:size & contains

方法 PriorityBlockingQueue#contains 接收一个参数,用于判断队列中是否包含值等于参数的结点。

方法 PriorityBlockingQueue#size 用于返回当前队列中包含的结点个数,因为 PriorityBlockingQueue 已经定义了 PriorityBlockingQueue#size 字段,用于对队列中的结点进行计数,所以该方法只需要返回字段值即可。

总结

本文分析了 PriorityBlockingQueue 的设计与实现。PriorityBlockingQueue 是优先级队列 PriorityQueue 的线程安全版本,基于最小堆实现元素的优先级排列。不同于前面几篇我们介绍的队列的 FIFO 特性,PriorityBlockingQueue 出队列的元素并非是在队列中存活时间最长的元素,而是优先级最高的元素。

PriorityBlockingQueue 的底层虽然依赖于数组作为存储结构,但因为容量上限足够大,所以几乎可以视为无界的。当底层数组存满时,PriorityBlockingQueue 并不会让入队列的线程阻塞等待,而是转去扩容底层数组,以继续容纳新的元素。当然,如果容量到达上限,无法继续扩容时,入队列操作会触发 OOM 异常。

参考

  1. JDK 1.8 源码