深入理解 JUC:Semaphore

前面我们分析了同步器 CountDownLatch 和 CyclicBarrier 的设计和实现,这两个同步器在使用上都有一个共同的特点,就是在构造时需要指定参与的线程数目,然后对计数器执行减值操作。本文将要介绍的 Semaphore 信号量同样在构造时需要指定一个 int 类型 permits 参数,不过该参数并不用于指定参与的线程数目,相反,Semaphore 并不限制参与的线程数,该参数用于限制同一时间最大允许执行的线程数目上限。

参与到 Semaphore 中的线程如果希望继续运行,需要从 Semaphore 那里申请获取一个或多个令牌,只有成功拿到令牌的线程才允许继续执行,否则需要阻塞等待,并在执行完成之后需要归还令牌。参数 permits 可以理解为令牌的总数,只要 Semaphore 手上有可用的令牌,就允许有新的线程过来申请。一个线程一次性可以申请一个或多个令牌,只要令牌的数量足够多,Semaphore 就允许同一个时间有多个线程并行执行。

Semaphore 示例

下面以一个排队就餐的例子来演示 Semaphore 的基本使用,假设一个餐厅一次性最多只能容纳 5 个人同时就餐,但是因为菜品口味极佳,所以生意非常好,来就餐的人多于餐厅能够同时容纳的人数上限,所以超出的人需要在外面排队等待叫号。假设今天有 20 个人前来就餐,那么叫号的过程可以实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static final int MAX_COUNT = 5;

private static class Person implements Runnable {

private Semaphore semaphore;

public Person(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is waiting.");
semaphore.acquire();
System.out.println("Thread " + Thread.currentThread().getName() + " is eating.");
TimeUnit.SECONDS.sleep(RandomUtils.nextInt(1, 3));
System.out.println("Thread " + Thread.currentThread().getName() + " ate up.");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}

}
}

public static void main(String[] args) {
// 使用公平锁,保证叫号尽量的公平
Semaphore semaphore = new Semaphore(MAX_COUNT, true);
for (int i = 0; i < 20; i++) {
new Thread(new Person(semaphore), String.valueOf(i)).start();
}
}

上述示例中当一个顾客到达时需要调用 Semaphore#acquire 方法申请获取令牌(即询问是否有空位),如果没有空闲的令牌则需要等待。当一个顾客就餐完毕之后需要归还之前申请到的令牌(执行 Semaphore#release 方法),此时允许下一位顾客申请令牌进入餐厅就餐。

Semaphore 实现内幕

下面来看一下 Semaphore 的设计与实现。Semaphore 同样基于 AQS 实现,其内部类 Sync 继承自 AbstractQueuedSynchronizer,并派生出 FairSync 和 NonfairSync 两个子类,分别表示公平锁和非公平锁,这些设计与前面文章中介绍的基于 AQS 实现的 ReentrantLock 和 ReentrantReadWriteLock 如出一辙。Semaphore 在构造时允许我们通过参数指定是使用公平锁还是非公平锁,默认为非公平锁,如下:

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore 中定义的方法在实现上均委托给 Sync 对象执行,并复用 AQS 的 state 字段记录当前剩余可用的令牌数。下面重点来分析一下 Semaphore 申请和归还令牌的方法实现,即 Semaphore#acquireSemaphore#release 方法。首先来看一下令牌申请的过程,Semaphore 提供了多个版本的 Semaphore#acquire 方法实现,包括:

  • Semaphore#acquire():申请 1 个令牌,如果成功则立即返回,否则阻塞等待,期间支持响应中断请求。
  • Semaphore#acquire(int):相对于 Semaphore#acquire() 的区别在于一次性申请多个令牌。
  • Semaphore#acquireUninterruptibly():申请 1 个令牌,如果成功则立即返回,否则阻塞等待,期间忽略中断请求。
  • Semaphore#acquireUninterruptibly(int):相对于 Semaphore#acquireUninterruptibly() 的区别在于一次性申请多个令牌。
  • Semaphore#tryAcquire():尝试申请 1 个令牌,不管成功还是失败都会立即返回,成功则返回 true,失败则返回 false。
  • Semaphore#tryAcquire(int):相对于 Semaphore#tryAcquire() 的区别在于一次性申请多个令牌。
  • Semaphore#tryAcquire(long, TimeUnit):尝试申请 1 个令牌,相对于 Semaphore#tryAcquire() 引入了超时等待机制。
  • Semaphore#tryAcquire(int, long, TimeUnit):相对于 Semaphore#tryAcquire(long, TimeUnit) 的区别在于一次性申请多个令牌。

这些申请令牌的方法在实现上大同小异,下面以 Semaphore#acquire() 为例分析一下具体的执行过程。方法实现如下:

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

上述方法直接委托给 AQS 的 AbstractQueuedSynchronizer#acquireSharedInterruptibly 方法执行,申请令牌单位为 1。前面在分析 AQS 时已经介绍了该方法的运行机制,下面重点来看一下 Sync 对于模板方法 AbstractQueuedSynchronizer#tryAcquireShared 的实现(以 NonfairSync 为例):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// NonfairSync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return this.nonfairTryAcquireShared(acquires);
}

// Sync#nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
// 获取 state 状态值
int available = this.getState();
// 计算剩余可用的资源数
int remaining = available - acquires;
if (remaining < 0 // 当前没有可用的资源
|| this.compareAndSetState(available, remaining)) { // 当前有可用的资源,且获取资源成功
return remaining;
}
}
}

申请令牌的执行流程可以总结为:

  1. 获取当前剩余可用的令牌数,即 state 值;
  2. 如果剩余可用的令牌数小于本次申请的数目,则返回差值(负值);
  3. 否则,更新 state 值,如果更新成功则说明获取令牌成功,返回差值(非负值)。

AbstractQueuedSynchronizer#acquireSharedInterruptibly 方法的实现我们知道,如果上述过程返回负值,则会将当前线程添加到同步队列中阻塞等待。

再来看一下令牌归还的过程,Semaphore 同样提供了多个版本的 Semaphore#release 方法实现,包括:

  • Semaphore#release():归还 1 个令牌。
  • Semaphore#release(int):归还指定数目的令牌。

下面以 Semaphore#release() 方法为例分析一下令牌归还的执行过程,实现如下:

1
2
3
public void release() {
sync.releaseShared(1);
}

上述方法直接委托给 AQS 的 AbstractQueuedSynchronizer#releaseShared 方法执行,归还令牌单位为 1。前面在分析 AQS 时同样已经介绍了该方法的运行机制,下面重点来看一下 Sync 对于模板方法 AbstractQueuedSynchronizer#tryReleaseShared 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
// 获取 state 状态值
int current = this.getState();
// 计算释放之后剩余的资源数
int next = current + releases;
if (next < current) { // overflow
// 溢出
throw new Error("Maximum permit count exceeded");
}
// 更新 state 状态值
if (this.compareAndSetState(current, next)) {
return true;
}
}
}

令牌归还的执行过程如上述代码注释,比较简单,但是有一点疑问的是什么情况下 next 值会溢出?

一般来说线程在归还令牌之前必须先申请令牌,这样就能够保证空闲令牌的数量始终不会大于我们在构造 Semaphore 时指定的初始值,然而在上述方法实现中,我们并没有看到有任何逻辑限定在调用 Semaphore#release 方法之前必须调用 Semaphore#acquire 方法。实际上 Semaphore 在实现时也的确没有添加这一限制,也就说任何线程都可以调用 Semaphore#release 方法归还令牌,即使它之前从来没有申请过令牌,这样就会导致令牌的数量溢出。官方文档中有如下说明:

There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link #acquire}. Correct usage of a semaphore is established by programming convention in the application.

也就是说,Semaphore 并不要求线程在归还令牌之前一定要先申请获取令牌,具体由应用程序自己决定。

总结

本文我们分析了 Semaphore 信号量的设计与实现,了解到 Semaphore 同样是基于 AQS 实现的同步器组件。Semaphore 通过令牌机制以限定参与的线程在同一时间执行的线程数目不能超过令牌的个数,在语义和实现上都比较简单,但功能却很强大。最后还需要注意的一点就是,Semaphore 并不要求在归还令牌之前一定要先申请获取令牌,开发者可以结合自身业务逻辑来灵活应用这一点。

参考

  1. JDK 1.8 源码
  2. The java.util.concurrent Synchronizer Framework