深入理解 JUC:LinkedBlockingQueue
上一篇我们分析了无界非阻塞线程安全队列 ConcurrentLinkedQueue 的设计与实现,本篇我们继续分析线程安全队列 LinkedBlockingQueue 的实现机制。由 Blocking 字样可以推断出 LinkedBlockingQueue 是阻塞队列,前面我们介绍过阻塞队列和非阻塞队列在实现上的区别,知道阻塞队列一般是基于锁机制来保证线程安全,本文我们就一起来分析一下 LinkedBlockingQueue 是如何基于锁构建线程安全队列的。
同样由 Linked 关键字我们可以推断出 LinkedBlockingQueue 底层依赖于链表实现,在 LinkedBlockingQueue 的内部实现了一个单链表,用以存放队列元素。其中,结点 Node 类定义如下:
1 | static class Node<E> { |
其中 next 指针的指向分为 3 种情况:
- 指向某个具体的后继结点。
- 指向自己,意味着后继结点为
head.next
。 - 指向 null,说明当前结点是队列的尾结点,没有后继结点。
LinkedBlockingQueue 定义了 head 和 last 指针分别指向队列的头结点和尾结点。此外,LinkedBlockingQueue 还定义了如下字段:
1 | public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable { |
由上述字段定义可以看出,LinkedBlockingQueue 限制了队列的容量上限,并使用 AtomicInteger 类型字段对队列中的元素个数进行计数。虽然 LinkedBlockingQueue 底层依赖于链表实现,理论上是无界的,但是 LinkedBlockingQueue 在实现上却限制了队列的容量上限(默认为 Integer.MAX_VALUE
)。
此外,针对出队列和入队列操作,LinkedBlockingQueue 分别设置了一把独占可重入锁,即 takeLock 和 putLock,从而保证同一时间只有一个线程执行出队列操作,只有一个线程执行入队列操作,且出队列的线程与入队列的线程彼此之间不相互影响。针对一些阻塞版本的出队列入队列方法,如果队列为空,则出队列线程会被记录到条件队列 notEmpty 中进行等待,如果队列已满,则入队列线程会被记录到条件队列 notFull 中进行等待。
BlockingQueue 接口
BlockingQueue 接口继承自 Queue 接口,用于描述阻塞队列。当队列无法及时响应用户请求时,例如当我们尝试从空队列中获取元素,或者继续往已满的有界队列中添加元素,BlockingQueue 定义了以下 4 种响应形式:
- 抛出异常。
- 立即返回特殊值,例如 null 或 false。
- 无限期阻塞当前请求,直到队列状态变为可用。
- 超时阻塞当前请求,直到队列状态变为可用。
BlockingQueue 接口的定义如下:
1 | public interface BlockingQueue<E> extends Queue<E> { |
针对各方法的含义说明如下:
offer
:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则返回 false,而不是抛出异常。BlockingQueue 同时还声明了超时版本的 offer 方法。add
:往队列中添加元素,如果成功则返回 true,对于有界队列来说,如果队列已满则抛出 IllegalStateException 异常。put
:往队列中添加元素,对于有界队列来说,如果队列已满则阻塞当前请求,期间支持响应中断。poll
:移除队列头结点,并返回结点元素值,如果队列为空则等待指定时间,并在超时时返回 null,期间支持响应中断。take
:仅获取头结点元素值而不删除结点,如果队列为空则阻塞等待,期间支持响应中断。remove
:接收一个参数,从队列中删除值等于该参数的结点,如果存在多个结点满足要求,则删除第一个。contains
:接收一个参数,判断队列中是否存在值等于该参数的结点。remainingCapacity
:返回队列的剩余容量,如果是无界队列,则返回Integer.MAX_VALUE
。drainTo
:从队列中移除所有(或指定个数)结点,并将结点元素放入参数指定的集合中返回,相对于逐个移除更加高效。
核心方法实现
LinkedBlockingQueue 实现自 BlockingQueue 接口,下面针对核心方法的实现逐一进行分析。
添加元素:offer & add & put
针对添加元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#offer
、LinkedBlockingQueue#add
和 LinkedBlockingQueue#put
方法,其中 LinkedBlockingQueue#add
是对 LinkedBlockingQueue#offer
的封装,并在队列已满时抛出 IllegalStateException 异常。
下面主要展开分析 LinkedBlockingQueue#offer
和 LinkedBlockingQueue#put
方法的实现。首先来看一下 LinkedBlockingQueue#offer
方法,实现如下:
1 | public boolean offer(E e) { |
与 ConcurrentLinkedQueue 一样,LinkedBlockingQueue 同样不允许往其中添加 null 元素。如果队列已满,则上述方法会直接返回 false,表示添加失败,否则创建待添加元素对应的结点对象,并继续执行:
- 加锁,保证同一时间只有一个线程在执行添加操作;
- 再次校验队列是否已满,如果已满则跳转至步骤 5,否则执行
LinkedBlockingQueue#enqueue
方法往队列末端插入结点; - 结点个数计数加 1;
- 如果在完成本次添加操作之后,队列仍然未满,则尝试唤醒一个之前因为队列已满而等待的线程;
- 释放锁;
- 如果本次成功添加了一个元素,则调用
LinkedBlockingQueue#signalNotEmpty
方法尝试唤醒一个之前因为队列为空而等待的线程; - 返回。
其中 LinkedBlockingQueue#signalNotEmpty
方法的实现比较简单,读者可以参考源码实现。这里简单提一下 LinkedBlockingQueue#enqueue
方法,实现如下:
1 | private void enqueue(Node<E> node) { |
在 LinkedBlockingQueue 对象被构造出来时,head 和 last 指针均指向一个元素值为 null 的标记结点。由上述方法的实现可以看出当执行入队列操作时,是将结点赋值给 last 结点的 next 指针,并没有移除队列头部的 null 结点,下文在介绍出队列操作时返回的都是 head.next
结点元素值,理解了上述插入操作的执行过程也就不会疑惑为什么出队列时不是直接返回 head 结点的元素值。
LinkedBlockingQueue 还定义了超时版本的 LinkedBlockingQueue#offer(E, long, TimeUnit)
方法,当队列已满时,该方法会阻塞等待指定的时间。
下面再来看一下 LinkedBlockingQueue#put
方法,相对于上面介绍的 LinkedBlockingQueue#offer
方法,对于有界队列而言,如果队列已满则该方法将无限期阻塞,方法实现如下:
1 | public void put(E e) throws InterruptedException { |
由上述实现可以看出,相对于 LinkedBlockingQueue#offer
方法在队列已满时的直接返回,方法 LinkedBlockingQueue#put
会将当前线程添加到条件队列中等待其它线程释放队列空间。
获取元素:poll & peek & take
针对获取元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#poll
、LinkedBlockingQueue#peek
和 LinkedBlockingQueue#take
方法,其中 LinkedBlockingQueue#peek
方法仅获取队列头结点元素值,而不移除头结点,实现上比较简单。下面展开分析 LinkedBlockingQueue#poll
和 LinkedBlockingQueue#take
方法的实现机制。
首先来看一下 LinkedBlockingQueue#poll
方法,LinkedBlockingQueue 针对该方法定义了两个版本,区别在于当队列为空时是立即返回还是阻塞等待一段时间,而在实现思路上是一致的。这里以不带超时参数的版本为例展开分析,实现如下:
1 | public E poll() { |
如果队列为空则上述方法会直接返回 null,否则继续执行:
- 加锁,保证同一时间只有一个线程在执行获取操作;
- 再次校验队列是否为空,如果为空则跳转至步骤 5,否则执行
LinkedBlockingQueue#dequeue
方法移除队列头结点,并返回结点元素值; - 结点个数计数减 1;
- 如果在完成本次移除操作之后,队列仍然非空,则尝试唤醒一个之前因为队列为空而等待的线程;
- 释放锁;
- 如果本次成功移除了一个元素,则调用
LinkedBlockingQueue#signalNotFull
方法尝试唤醒一个之前因为队列已满而等待的线程; - 返回。
其中 LinkedBlockingQueue#signalNotFull
方法的实现比较简单,读者可以参考源码实现。前面我们分析了入队列 LinkedBlockingQueue#enqueue
方法,下面来看一下出队列方法,实现如下:
1 | private E dequeue() { |
理解了前面入队列的过程,则上述出队列的实现也就一目了然,只要清楚队列的头结点一直是一个值为 null 的结点,而真正有效的队列头结点是该结点的 next 结点。
LinkedBlockingQueue 还定义了超时版本的 LinkedBlockingQueue#poll(long, TimeUnit)
方法,当队列为空时,该方法会阻塞等待指定的时间。
下面再来看一下 LinkedBlockingQueue#take
方法,相对于上面介绍的 LinkedBlockingQueue#poll
方法,对于有界队列而言,如果队列为空则该方法将无限期阻塞,方法实现如下:
1 | public E take() throws InterruptedException { |
由上述实现可以看出,相对于 LinkedBlockingQueue#poll
方法在队列为空时的直接返回,方法 LinkedBlockingQueue#take
会将当前线程添加到条件队列中等待其它线程添加新的队列元素。
移除元素:remove
针对移除元素的操作,LinkedBlockingQueue 实现了 LinkedBlockingQueue#remove
方法,并提供了有参和无参的版本,其中无参版本实际上是委托给 LinkedBlockingQueue#poll
方法执行的。下面来分析一下有参版本的实现,如下:
1 | public boolean remove(Object o) { |
上述方法接收一个参数,并执行删除元素值等于该参数的结点,如果存在多个满足条件的结点,则删除第一个。在执行删除操作之前会获取 putLock 和 takeLock 两把锁,以防止删除期间有其它线程执行出队列或入队列操作。
其它操作:size & contains
最后来看一下 LinkedBlockingQueue#contains
和 LinkedBlockingQueue#size
方法的实现,前者用于检查队列是否包含值等于参数的结点,实现如下:
1 | public boolean contains(Object o) { |
方法 LinkedBlockingQueue#size
用于返回队列的结点个数,前面已经介绍了 LinkedBlockingQueue 定义了一个 AtomicInteger 类型的字段用于计数队列的结点个数,所以 LinkedBlockingQueue#size
方法能够精确的返回,且几乎没有性能开销,同时在实现上非常简单,如下:
1 | public int size() { |
既然定义一个 AtomicInteger 类型的计数变量有这么多优势,那么不禁要思考为什么上一篇介绍的 ConcurrentLinkedQueue 没有这么做呢?这主要还是因为 ConcurrentLinkedQueue 是非阻塞队列,基于 CAS 机制来保证线程安全,但是 CAS 的缺点在于无法像锁一样同时保证多个操作的原子性,所以无法引入计数原子变量。
总结
本文分析了 LinkedBlockingQueue 的设计与实现,LinkedBlockingQueue 底层依赖于单链表作为存储结构,并基于可重入锁 ReentrantLock 保证线程安全,同时为入队列和出队列分别设置了一把锁以提升操作性能,减少阻塞开销。
相对于上一篇介绍的 ConcurrentLinkedQueue 而言,LinkedBlockingQueue 与其功能相同,但是底层却是两套不同的设计与实现。此外,JUC 还提供了以数组作为底层存储结构的有界阻塞线程安全队列 ArrayBlockingQueue,该组件与本文介绍的 LinkedBlockingQueue 在设计思路上是一致的,同样基于 ReentrantLock 锁保证线程安全,并支持在构造时使用公平锁策略。
总而言之,在线程安全队列的使用上 JUC 给我们提供了多种选择,具体开发时如何确定使用哪个组件还是要取决于具体的应用场景,甚至有必要进行一些压测,以结果引导决策。
参考
- JDK 1.8 源码