深入理解 JUC:SynchronousQueue
本文我们一起来分析一下 SynchronousQueue 的设计与实现。不同于前面介绍的一系列线程安全队列,SynchronousQueue 从真正意义上来说并不能算是一个队列,而将其理解为一个用于线程之间通信的组件更为恰当。SynchronousQueue 没有容量的概念,一个线程在执行完入队列操作之后,必须等待另外一个线程与之匹配完成出队列后方可继续再次入队列,反之亦然。此外,有别于我们通常理解的队列中的结点只承载元素,SynchronousQueue 中的结点还需要附着对应的操作线程,这些线程在对应的结点上等待被匹配(fulfill)。
SynchronousQueue 实现自 BlockingQueue 接口,底层基于 LockSupport 工具类 实现线程的阻塞和唤醒操作,并依赖 CAS 保证线程安全。在构造 SynchronousQueue 对象时,允许通过参数指定是否启用公平模式。SynchronousQueue 基于 Dual Stack 数据结构实现非公平的线程通信,基于 Dual Queue 数据结构实现公平的线程通信。SynchronousQueue 的公平模式因为减少了线程之间的冲突,在竞争频繁的场景下反而具备更高的性能,而非公平模式能够更好的维持线程局部性(thread locality),减少线程上下文切换的开销。
SynchronousQueue 示例
本小节我们以“生产者-消费者”示例演示 SynchronousQueue 的基本使用,在示例中我们设置了一个生产者和两个消费者,以展示 SynchronousQueue 公平性特征。示例实现如下(省略了异常处理):
1 | private static BlockingQueue<Integer> queue = new SynchronousQueue<>(true); |
运行输出如下:
1 | Producer produce: 0 |
可以看到,当生产者往 SynchronousQueue 中插入一个元素之后,生产者线程会等待消费者完成消费,而消费者线程在完成消费之后会等待生产者生产。SynchronousQueue 的公平性特性尽可能保证了消费者 A 和 B 能够交替执行消费操作。
在上述示例中,如果我们将 Producer 入队列的方法由 put 改为 offer,那么在 Consumer 入队列成功之前,Producer 始终不能入队列成功,这对于一般的队列而言显得有些奇怪。实际上,这里说的不能成功入队列不够准确,要知道 offer 是一类带有超时机制的方法,也就是说当 Producer 在将某个元素执行入队列之后,它希望有一个 Consumer 能够在自己期望的时间内与该元素进行匹配,否则就只能返回 false,从表象上来看就是没有入队列成功。实际应用中我们需要考虑此类表象是否符合自己的业务场景,如果不满足则可以考虑使用 put 方法执行入队列操作。
核心方法实现
SynchronousQueue 实现自 BlockingQueue 接口,但并未对接口中声明的方法全部支持,例如 SynchronousQueue 的 SynchronousQueue#peek
方法就始终返回 null,在使用时推荐先阅读 API 文档,避免影响程序的正确性。本文主要分析 SynchronousQueue 的实现机制,所以下面重点来看一下 SynchronousQueue 已实现的出队列和入队列操作。
前面我们提及到 SynchronousQueue 内部基于 Dual Stack 和 Dual Queue 数据结构实现,在 SynchronousQueue 中定义了一个 Transferer 抽象类,该类抽象了 Dual Stack 和 Dual Queue 数据结构的实现,定义如下:
1 | abstract static class Transferer<E> { |
SynchronousQueue 的出队列和入队列操作均委托给 Transferer#transfer
方法执行(如下),该方法接收 3 个参数,其中参数 e 表示待添加到队列中的元素值,对于出队列操作来说,e 始终等于 null;参数 timed 用于设置当前操作是否具备超时策略,如果是则需要使用参数 nanos 参数指定超时时间。
SynchronousQueue#put(E e)
->transferer.transfer(e, false, 0)
SynchronousQueue#offer(E)
->transferer.transfer(e, true, 0)
SynchronousQueue#offer(E, long, TimeUnit)
->transferer.transfer(e, true, unit.toNanos(timeout))
SynchronousQueue#take
->transferer.transfer(null, false, 0)
SynchronousQueue#poll()
->transferer.transfer(null, true, 0)
SynchronousQueue#poll(long, TimeUnit)
->transferer.transfer(null, true, unit.toNanos(timeout))
针对 Dual Stack 和 Dual Queue 数据结构,SynchronousQueue 分别定义了 TransferStack 和 TransferQueue 实现类,下面的小节将针对这两个类的实现机制展开分析。
在开始之前,我们先对 匹配 一词在 SynchronousQueue 中的含义进行解释,在下面的章节中将多次提及匹配的概念。我们大致已经了解到 SynchronousQueue 在内部基于栈或队列实现线程间的交互,以“生产者-消费者”为例,如果使用的是栈结构(队列亦如此),当生产者往 SynchronousQueue 中插入一个元素时,该生产者线程在插入成功之后并不会立即返回,而是等待消费者前来消费。当消费者执行消费时发现栈上正好有生产者在等待,于是执行消费逻辑,也称为开始执行匹配(fulfill)进程,将当前消费者与生产者匹配成一对儿纷纷出栈。
Dual Stack
针对 Dual Stack 数据结构,SynchronousQueue 实现了 TransferStack 类。TransferStack 继承自 Transferer 抽象类,并定义了 SNode 类描述栈上的结点。针对结点的运行模式,TransferStack 定义了 3 个 int 类型的常量字段予以描述,如下:
- REQUEST:标识未匹配的消费者结点。
- DATA:标识未匹配的生产者结点。
- FULFILLING:标识结点正在执行匹配操作。
栈在运行期间要么为空,要么存放着一个或多个未匹配的消费者结点或生产者结点,对应的消费者或生产者线程依附在具体的结点上等待。一个栈上不可能同时共存未匹配的消费者结点和未匹配的生产者结点,也就是说同一时间栈上所有结点的运行模式(即 SNode#mode
字段值)都应该是一致的,除了栈顶结点可能会因为正在执行匹配进程而附加 FULFILLING 状态。
SNode 类的字段定义如下:
1 | static final class SNode { |
各字段的含义如代码注释,我们将在下面分析 TransferStack#transfer
方法实现时一并分析 SNode 中定义的方法,并对各个字段的含义结合具体场景做进一步介绍。
前面在介绍 Transferer 抽象类时,我们知道该抽象类仅声明了一个方法,即 Transferer#transfer
方法,该方法也是整个 SynchronousQueue 中最核心的实现。在开始分析 TransferStack 之于该方法的实现之前,我们先从整体出发,感知一下 TransferStack 的运行流程。
以“生产者-消费者”为例,假设当前有 3 个生产者依次执行往 SynchronousQueue 中插入元素,执行的顺序为 1 -> 2 -> 3
,则入栈之后得到的栈结构如下:
1 | 3 -> 2 -> 1 -> null |
入栈后的 3 个生产者线程将在栈对应结点上等待。如果来了一个消费者执行出队列操作,此时消费者将与 head 结点上的生产者进行匹配,匹配成功之后得到的栈结构如下:
1 | 2 -> 1 -> null |
此时剩下的生产者线程将继续等待,期间可以允许新的消费者出队列,也可以允许新的生产者入队列。
上述过程就是 TransferStack#transfer
方法的核心执行逻辑,对此有了一个大概的感知之后,下面来深入分析 TransferStack#transfer
方法的具体实现。实际上在 TransferStack#transfer
方法的开头,作者已经对整个方法的运行流程给出了直观的概括,摘录如下:
If apparently empty or already containing nodes of same mode, try to push node on stack and wait for a match, returning it, or null if cancelled.
If apparently containing node of complementary mode, try to push a fulfilling node on to stack, match with corresponding waiting node, pop both from stack, and return matched item. The matching or unlinking might not actually be necessary because of other threads performing action 3:
If top of stack already holds another fulfilling node, help it out by doing its match and/or pop operations, and then continue. The code for helping is essentially the same as for fulfilling, except that it doesn’t return the item.
方法 TransferStack#transfer
实现如下:
1 | E transfer(E e, boolean timed, long nanos) { |
上述实现中 for 循环内部的 if ... else if ... else
控制结构分别对应作者给出的 3 段注释(已在代码中标出),其中场景 3 主要是对场景 2 的辅助,下面重点分析场景 1 和场景 2 的实现和执行流程。
首先来看一下 场景 1 ,此时栈为空,或者栈中等待的线程运行模式与当前线程的运行模式相同,此时需要将结点入栈,并让当前线程在结点上等待。执行流程可以概括为:
- 如果设置了超时且已经到期,则顺带判断 head 结点是否被取消,如果是则后移 head 指针并进入下一轮循环,否则返回 null;
- 否则新建一个包含待添加元素 e 的结点入栈,并执行
TransferStack#awaitFulfill
方法让当前线程在该结点上等待匹配(或被取消); - 如果在等待期间被取消,则清理栈上的无效结点,并返回 null;
- 否则说明结点被成功匹配,如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值。
下面利用图示演示上述执行流程。假设当前操作线程是一个生产者,期望将元素 3 插入到 SynchronousQueue 中,并且当前栈中已经包含两个处于等待状态的生产者(如下图 1 所示)。因为当前线程与栈中等待的线程模式相同(均为 DATA),所以新建一个元素值为 3 的结点入栈(如下图 2 所示),并让当前线程在结点上等待。
继续分析让线程等待的 TransferStack#awaitFulfill
方法,线程会阻塞(或自旋)在该方法上等待被匹配,实现如下:
1 | SNode awaitFulfill(SNode s, boolean timed, long nanos) { |
上述方法首先会依据是否设置超时来计算剩余的到期时间和自旋次数,然后执行:
- 判断等待期间是否被中断,如果是则取消当前结点,即将结点的 match 指针指向自己;
- 判断结点的 match 指针是否指向 null,只要不为 null 就说明当前结点被成功匹配或取消(此时 match 指针指向结点自己),返回 match 指针指向的结点;
- 否则,说明结点未被匹配或取消,如果设置了超时且已经到期,则取消当前结点,并在下一轮循环中返回;
- 在进入阻塞之前,先尝试自旋几次;
- 如果自旋几次之后仍然未完成匹配则阻塞等待,依据是否设置超时来决定是无限期等待还是超时等待,并在等待之前判断当前结点上是否有绑定线程,如果未绑定则将当前线程绑定到该结点上。
由上述实现可以看到,等待的线程并没有立即阻塞,而是先尝试自旋了几次,这主要是考虑生产者和消费者频繁交互的情况。这类场景下当生产者执行入队列操作之后马上会有消费者前来执行出队列,此时生产者线程无需被阻塞,只需要自旋几次即被匹配成功,从而避免线程阻塞和唤醒所带来的性能开销。如果生产者和消费者交互并不频繁,因为自旋的次数并不多,所以不会造成太多的 CPU 开销,几乎可以忽略。
如果结点在等待期间被取消,则上述方法会将结点的 match 指针指向自己,后续流程会基于该特征识别被取消的结点,并调用 TransferStack#clean
方法执行清理工作,该方法实现如下:
1 | void clean(SNode s) { |
清理的过程首先会确立一个哨兵(sentinel)结点,该结点是位于结点 s 之后最近一个有效(未被取消)的结点,然后从栈顶开始遍历清除那些已经被取消的结点。至于为什么需要设置一个哨兵结点,考虑在并发场景下结点 s 可能已经被其它线程移除,设置哨兵结点能够避免对整个栈进行遍历。
接着来看一下 场景 2 ,此时栈中正在等待的线程运行模式与当前线程互补(可以简单理解为栈中等待的线程都是生产者,而当前线程是消费者),并且此时没有线程正在执行匹配操作,所以进入匹配进程。本次与当前线程匹配的是 head 结点上的线程,所以首先需要从上至下在栈上找到第一个有效(未被取消)的 head 结点,然后执行:
- 创建一个结点元素为 e,附加 FULFILLING 标志的结点 s,并将结点入栈;
- 获取本次待与 s 匹配的结点 m,如果 m 为 null 则说明栈上已经没有处于等待的结点,需要退出匹配进程并继续判定接下去进入哪个场景;
- 否则,调用
SNode#tryMatch
方法执行匹配操作; - 如果匹配成功则后移 head 指针,并返回(如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值);
- 如果匹配失败,说明结点 m 已经被取消,尝试继续匹配 m 的后继结点。
下面利用图示演示上述执行流程。如下图 1 所示,假设当前操作线程是一个消费者(图中黄色结点),期望对 SynchronousQueue 执行出队列操作,并且当前栈中已经包含两个处于等待状态的生产者(图中青色结点)。因为当前线程与栈中等待的线程模式互补,所以新建一个元素值为 null 的结点入栈(如下图 2 所示),并附加 FULFILLING 标志(图中红色结点)。
然后开始执行匹配进程,设置 m 和 mn 指针,如上图 3 所示。在成功执行完 SNode#tryMatch
方法之后会将结点 m 的 match 指针指向结点 s,表示结点 m 和 s 匹配成功,如上图 4 所示。
继续来分析一下执行匹配进程的 SNode#tryMatch
方法,实现如下:
1 | boolean tryMatch(SNode s) { |
匹配的过程核心在于将待匹配结点的 match 指针指向当前操作线程对应的结点。
关于 Dual Stack 的运行机制就介绍这么多,受栈 FILO 特性的约束,基于 Dual Stack 的 SynchronousQueue 始终在栈顶执行入队列和出队列操作,后入队的线程会先被匹配,这也解释了为什么基于 Dual Stack 的 SynchronousQueue 是非公平的。基于 Dual Stack 的 SynchronousQueue 潜在的一个问题是可能导致先入队的线程长期得不到匹配而饥饿,而优点在于能够更好的维持线程局部性(thread locality),减少线程上下文切换的开销。
Dual Queue
针对 Dual Queue 数据结构,SynchronousQueue 实现了 TransferQueue 类,TransferQueue 同样继承自 Transferer 抽象类,并定义了 QNode 类描述队列上的结点。TransferQueue 定义了 TransferQueue#head
和 TransferQueue#tail
指针字段,分别指向队列的头结点和尾结点。
QNode 类的字段定义如下:
1 | static final class QNode { |
各字段的含义如代码注释,其中 QNode#isData
字段用于标识对应结点是生产者结点还是消费者结点。不同于 TransferStack 的 SNode 需要使用 SNode#mode
字段描述结点是未匹配的生产者、未匹配的消费者,或者是正在匹配中等状态,TransferQueue 因为出队列和入队列分别在 head 和 tail 结点上执行,所以无需定义专门的字段描述结点的运行模式。我们将在下面分析 TransferQueue#transfer
方法实现时一并分析 QNode 中定义的方法,并对各个字段的含义结合具体场景做进一步介绍。
在开始分析 TransferQueue 之于 Transferer#transfer
方法的实现之前,我们还是先从整体出发,感知一下 TransferQueue 的运行流程。同样以“生产者-消费者”为例,假设当前有 3 个生产者依次执行往 SynchronousQueue 中插入元素,执行的顺序为 1 -> 2 -> 3
,则入队列之后得到的队列结构如下:
1 | 1 -> 2 -> 3 -> null |
入队列后的 3 个生产者线程将在队列对应结点上等待。如果来了一个消费者执行出队列操作,此时消费者将与 head 结点上的生产者进行匹配,匹配成功之后得到的队列结构如下:
1 | 2 -> 3 -> null |
此时剩下的生产者线程将继续等待,期间可以允许新的消费者出队列,也可以允许新的生产者入队列。
上述过程就是 TransferQueue#transfer
方法的核心执行逻辑,对此有了一个大概的感知之后,下面来深入分析 TransferQueue#transfer
方法的具体实现。实际上在 TransferQueue#transfer
方法的开头,作者同样已经对整个方法的运行流程给出了直观的概括,摘录如下:
If queue apparently empty or holding same-mode nodes, try to add node to queue of waiters, wait to be fulfilled (or cancelled) and return matching item.
If queue apparently contains waiting items, and this call is of complementary mode, try to fulfill by CAS’ing item field of waiting node and dequeuing it, and then returning matching item.
方法 TransferQueue#transfer
实现如下:
1 | E transfer(E e, boolean timed, long nanos) { |
上述实现中 for 循环内部的 if ... else
控制结构分别对应作者给出的 2 段注释(已在代码中标出),在 for 循环的一开始会判断 head 或 tail 指针是否为 null,但是在 SynchronousQueue 运行期间正常是不会出现 head 或 tail 指针为 null 的情况,作者在注释中给出的解释如下:
The loop starts off with a null check guarding against seeing uninitialized head or tail values. This never happens in current SynchronousQueue, but could if callers held non-volatile/final ref to the transferer. The check is here anyway because it places null checks at top of loop, which is usually faster than having them implicitly interspersed.
下面展开分析场景 1 和场景 2 的实现和执行流程。首先来看一下 场景 1 ,此时队列为空,或者队列中等待的线程运行模式与当前线程的运行模式相同,此时需要将结点入队列,并让当前线程在结点上等待。执行流程可以概括为:
- 因为要入队列操作,所以要保证 tail 指向队列真正的尾结点;
- 如果设置了超时且已到期,则返回 null;
- 否则新建一个包含待添加元素 e 的结点入队列,如果失败进入下一轮循环重试,否则后移 tail 指针并调用
TransferQueue#awaitFulfill
方法让当前线程在该结点上等待匹配(或被取消); - 如果在等待期间被取消,则清理队列上的无效结点,并返回 null;
- 否则说明结点被成功匹配,更新 head 指针,如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值。
下面利用图示演示上述执行流程。假设当前操作线程是一个生产者,期望将元素 3 插入到 SynchronousQueue 中,并且当前栈中已经包含了两个处于等待状态的生产者(如下图 1 所示)。因为当前线程与队列中等待的线程模式相同(即 isData=true
),所以新建一个元素值为 3 的结点入队列(如下图 2 所示),并让当前线程在结点上等待。
TransferQueue 实现的让线程等待的方法 TransferQueue#awaitFulfill
与 TransferStack 中实现的 TransferStack#awaitFulfill
方法在设计和实现思路上相同,这里不再重复介绍。下面来分析一下执行清理工作的 TransferQueue#clean
方法,实现如下(其中 s 是待清理的结点,pred 是 s 的前驱结点):
1 | void clean(QNode pred, QNode s) { |
如果待删除结点 s 不是 tail 结点,则只需要简单移除 s 即可,否则暂时不能移除 s 结点,会导致 tail 为 null,影响后续入队列操作。针对这种场景,作者设计了一个 cleanMe 结点,该结点的 next 指针指向需要被移除的 s 结点(此时 s 为 tail 结点),当结点 s 后续不再是 tail 结点时,延后删除。
接着来看一下 场景 2 ,此时队列中正在等待的线程运行模式与当前线程互补,所以进入匹配进程。本次与当前线程匹配的是 head 结点的后继结点上的线程,所以首先需要从前往后在队列上找到第一个有效(未被取消)的 head 后继结点,然后执行:
- 获取 head 结点的后继结点 m;
- 如果结点 m 已经被匹配,或被取消,则后移 head 指针后进入下一轮循环重试;
- 否则,基于 CAS 尝试将结点 m 的元素值替换为 e,如果失败则说明结点 m 已经被其它线程匹配,继续后移 head 指针后进入下一轮循环重试;
- 否则,说明匹配成功,后移 head 指针,并唤醒在匹配结点 m 上等待的线程,如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值。
下面利用图示演示上述执行流程。如下图 1 所示,假设当前操作线程是一个消费者(图中黄色结点),期望对 SynchronousQueue 执行出队列操作,并且当前队列中已经包含两个处于等待状态的生产者(图中青色结点)。因为当前线程与队列中等待的线程模式互补,所以获取 head 结点的 next 结点 m 作为待匹配结点(如下图 2 所示)。基于 CAS 尝试将结点 m 的元素值修改为 null,如下图 3 所示,然后后移 head 指针指向 m 结点,并唤醒在结点 m 上等待的线程,如下图 4 所示。
关于 Dual Queue 的运行机制就介绍这么多,受队列 FIFO 特性的约束,基于 Dual Queue 的 SynchronousQueue 在队头执行出队列操作,并在队尾执行入队列操作,先入队的线程通常会先被匹配,这也解释了为什么基于 Dual Queue 的 SynchronousQueue 是公平的。基于 Dual Queue 的 SynchronousQueue 因为入队和出队的冲突相对较小,所以在竞争频繁的场景下相对于非公平模式反而具有更好的性能。
总结
本文我们分析了 SynchronousQueue 的设计与实现,相对于之前文章中介绍的一系列线程安全队列而言,SynchronousQueue 在实现和使用上有其特别之处。SynchronousQueue 没有容量的概念,入队列的线程在完成入队列操作之后会在队列上等待出队列的线程前来执行出队列操作,反之亦然。SynchronousQueue 中的结点除了承载结点元素之外,还附着着相应的操作线程,这些线程在对应的结点上等待被匹配。此外,SynchronousQueue 区分公平和非公平模式,其中公平模式基于 Dual Queue 数据结构实现,非公平模式基于 Dual Stack 数据结构实现。理解 SynchronousQueue 的核心在于理解 Dual Stack 和 Dual Queue 的设计思想。