深入理解 JUC:PriorityBlockingQueue
优先级队列 PriorityQueue 应该是大家都比较熟悉的一个集合组件,本文将要介绍的 PriorityBlockingQueue 是 PriorityQueue 的线程安全版本。PriorityBlockingQueue 底层依赖于数组作为存储结构,最大容量上限是 Integer.MAX_VALUE - 8
,所以几乎可以将其视为无界的。同 PriorityQueue 一样,PriorityBlockingQueue 同样引入了堆数据结构来编排队列元素的优先级,默认使用最小堆结构。
此外,由 Blocking 字样我们可以推断出 PriorityBlockingQueue 是一个阻塞队列。PriorityBlockingQueue 实现自 BlockingQueue 接口,并基于 ReentrantLock 锁保证线程安全。不过需要注意的一点是,PriorityBlockingQueue 的阻塞仅针对出队列操作而言,当队列为空时出队列的线程会阻塞等待其它线程往队列中添加新的元素。对于入队列操作来说,因为 PriorityBlockingQueue 定义为无界,所以执行入队列的线程会立即得到响应,如果队列底层数组已满则该线程会尝试对底层数组进行扩容,当底层数据达到容量上限而无法继续扩容时会抛出 OOM 异常。
下面先来了解一下 PriorityBlockingQueue 的字段定义,如下:
1 | public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable { |
PriorityBlockingQueue 默认初始时的底层数组大小设置为 11,并在元素已满时触发扩容操作,字段 PriorityBlockingQueue#allocationSpinLock
用于控制同一时间只有一个线程在执行扩容。当某个线程检测到当前底层数组已满时会基于 CAS 操作尝试将该字段值由 0 改为 1,然后开始执行扩容,并在完成之后重置该标记字段。
字段 PriorityBlockingQueue#comparator
用于指定元素比较器以判定队列元素的优先级,如果该字段为 null,则 PriorityBlockingQueue 会基于元素自带的比较器排列优先级。对于基本类型而言则参考元素的自然顺序,对于自定义对象来说,需要保证这些对象实现了 java.lang.Comparable
接口,否则会抛出 ClassCastException 异常。
核心方法实现
PriorityBlockingQueue 实现自 BlockingQueue 接口,下面针对核心方法的实现逐一进行分析。
添加元素:offer & add & put
针对添加元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#offer
、PriorityBlockingQueue#add
和 PriorityBlockingQueue#put
方法,不过后两者都是直接调用了 PriorityBlockingQueue#offer
方法。
此外,该方法的超时版本 PriorityBlockingQueue#offer(E, long, TimeUnit)
也是直接委托给 PriorityBlockingQueue#offer
方法执行,并没有真正实现超时等待机制,这主要是因为 PriorityBlockingQueue 是无界的,所有的添加操作都能够被立即响应,而不会阻塞。
下面展开分析一下 PriorityBlockingQueue#offer
方法的实现,如下:
1 | public boolean offer(E e) { |
PriorityBlockingQueue 同样不允许往其中添加 null 元素,如果待添加的元素值合法则执行:
- 加锁,保证同一时间只有一个线程在操作队列;
- 判断队列是否已满,如果是则执行扩容操作;
- 将元素基于最小堆数据结构的约束插入到底层数据的合适位置;
- 队列结点计数加 1;
- 因为当前队列至少包含一个元素,所以尝试唤醒一个之前因为队列为空而阻塞的线程;
- 释放锁并返回。
继续来看一下上述步骤中的扩容过程,实现位于 PriorityBlockingQueue#tryGrow
方法中,如下:
1 | private void tryGrow(Object[] array, int oldCap) { |
在开始执行扩容之前,当前线程会释放持有的锁,以避免在扩容期间阻塞其它线程的出队列操作,然后基于 CAS 操作修改扩容标记位 PriorityBlockingQueue#allocationSpinLock
,保证同一时间只有一个线程在执行扩容。一开始数组较小(长度小于 64)时,线程将对底层数组成倍扩容(即 2(n + 1)
),然后再按照 50% 的比例进行扩容(即 (1 + 1/2) * n
),如果底层数组已经到达容量上限,则会抛出 OOM 异常。
线程在完成扩容操作之后会重置扩容标记,如果有线程在竞争 CAS 时失败则会尝试让渡其它线程获取锁。这里主要是让渡给成功完成扩容操作的线程,因为此时扩容操作还未真正完成,该线程需要尝试获取锁以继续用扩容后的数组替换当前底层数组。
继续回到 PriorityBlockingQueue#offer
方法,如果扩容操作完成或者本次入队列操作无需触发扩容,则接下去线程会将待添加的元素按照最小堆的约束插入到底层数据的合适位置。此时需要区分两种情况,如果在构造 PriorityBlockingQueue 对象时指定了比较器 Comparator,则会调用 PriorityBlockingQueue#siftUpUsingComparator
方法基于该比较器执行最小堆插入操作,否则调用 PriorityBlockingQueue#siftUpComparable
方法按照元素的自然顺序将当前元素插入到最小堆中。
基于数组实现的堆结构,在操作上是比较简单的,读者可以自行参考源码,本文不对最小堆 siftUp*
和 siftDown*
操作展开分析。
获取元素:poll & peek & take
前面几篇介绍的队列都满足 FIFO 的特性,在执行出队列时返回的都是在队列中存活时间最长的元素。对于 PriorityBlockingQueue 而言,结点的顺序则按照优先级进行编排,所以这里获取元素的操作返回的是队列中优先级最高的结点。
针对获取元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#poll
、PriorityBlockingQueue#peek
和 PriorityBlockingQueue#take
方法。其中 PriorityBlockingQueue#peek
方法仅获取最小堆堆顶结点元素值,而不移除该结点,实现上比较简单。方法 PriorityBlockingQueue#take
相对于 PriorityBlockingQueue#poll
的区别在于,当队列为空时该方法会无限期阻塞,直到有其它线程往队列中插入新的元素,或者该线程被中断。实现层面,二者大同小异,所以下面以 PriorityBlockingQueue#poll
方法为例展开分析从 PriorityBlockingQueue 中获取元素操作的具体实现。
PriorityBlockingQueue 针对 PriorityBlockingQueue#poll
方法定义了两个版本,区别在于当队列为空时是立即返回还是阻塞等待一段时间,而在实现思路上是一致的。这里以不带超时参数的版本为例展开分析,实现如下:
1 | public E poll() { |
对于优先级队列而言,出队列操作获取到的是队列中优先级最高的元素,因为底层依赖于最小堆实现,所以只需要移除最小堆堆顶结点,并返回结点元素即可。但是因为这样破坏了堆的结构,所以需要调用 shiftDown*
方法从上往下进行调整,以再次满足最小堆结构的约束。
移除元素:remove
针对移除元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#remove
方法,并提供了有参和无参的版本,其中无参版本实际上是委托给 PriorityBlockingQueue#poll
方法执行的。下面来分析一下有参版本的实现,如下:
1 | public boolean remove(Object o) { |
如果待删除的元素是优先级最低的元素,则只需要将底层数组末尾结点置为 null 即可,否则,对于其它优先级的元素来说,在执行删除之后需要调整堆结构以满足最小堆定义。
其它操作:size & contains
方法 PriorityBlockingQueue#contains
接收一个参数,用于判断队列中是否包含值等于参数的结点。
方法 PriorityBlockingQueue#size
用于返回当前队列中包含的结点个数,因为 PriorityBlockingQueue 已经定义了 PriorityBlockingQueue#size
字段,用于对队列中的结点进行计数,所以该方法只需要返回字段值即可。
总结
本文分析了 PriorityBlockingQueue 的设计与实现。PriorityBlockingQueue 是优先级队列 PriorityQueue 的线程安全版本,基于最小堆实现元素的优先级排列。不同于前面几篇我们介绍的队列的 FIFO 特性,PriorityBlockingQueue 出队列的元素并非是在队列中存活时间最长的元素,而是优先级最高的元素。
PriorityBlockingQueue 的底层虽然依赖于数组作为存储结构,但因为容量上限足够大,所以几乎可以视为无界的。当底层数组存满时,PriorityBlockingQueue 并不会让入队列的线程阻塞等待,而是转去扩容底层数组,以继续容纳新的元素。当然,如果容量到达上限,无法继续扩容时,入队列操作会触发 OOM 异常。
参考
- JDK 1.8 源码