线程在读取缓存数据 data 之前需要获取读锁(ReadLock),如果判断缓存数据 data 未被初始化,则尝试获取写锁(WriteLock)以初始化缓存数据。因为读锁是共享的,所有上述示例允许多个线程并发读取 data 数据,但是一旦某个线程检测到 data 未被初始化,则尝试获取写锁并更新缓存数据,期间其它线程需要阻塞等待初始化过程的完成。
privatevoidprocessCachedDataWithoutDowngrade(int newValue){ readLock.lock(); if (cacheInvalid) { readLock.unlock(); writeLock.lock(); try { if (cacheInvalid) { value = newValue; System.out.println("<thread-" + Thread.currentThread().getName() + "> update value as " + value); cacheInvalid = true; } } finally { writeLock.unlock(); } } try { /* * 模拟处理数据的过程,期间 cacheInvalid 可能会被其它线程修改以期望修改数据, * 没有锁降级保证,其它线程期间可能获取到写锁并更改数据,导致当前线程之前看到的数据发生变化 */ TimeUnit.SECONDS.sleep(5); System.out.println("<thread-" + Thread.currentThread().getName() + "> read value is " + value); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock.getReadHoldCount() > 0) { readLock.unlock(); } } }
执行结果如下:
1 2 3 4 5 6
<thread-A> start to run after 1 seconds <thread-A> update value as 11 <thread-B> start to run after 2 seconds <thread-B> update value as 12 <thread-A> read value is 12 <thread-B> read value is 12
<thread-A> start to run after 1 seconds <thread-A> update value as 43 <thread-B> start to run after 2 seconds <thread-A> read value is 43 <thread-B> update value as 48 <thread-B> read value is 48
staticfinalclassThreadLocalHoldCounterextendsThreadLocal<HoldCounter> { @Override public HoldCounter initialValue(){ returnnew HoldCounter(); } }
staticfinalclassHoldCounter{ int count = 0; // 重入次数 // Use id, not reference, to avoid garbage retention finallong tid = getThreadId(Thread.currentThread()); // 线程 ID }
ThreadLocalHoldCounter 覆盖实现了 ThreadLocal#initialValue 方法,当某个线程第一次访问 Sync#readHolds 属性时会为该线程创建一个 HoldCounter 对象(下文简称线程计数器),用于记录当前线程 ID 和重入次数。
protectedfinalinttryAcquireShared(int unused){ /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for lock wrt state, so ask if it should block because of queue policy. * If not, try to grant by CASing state and updating count. * Note that step does not check for reentrant acquires, which is postponed to full version * to avoid having to check hold count in the more typical non-reentrant case. * 3. If step 2 fails either because thread apparently not eligible or CAS fails or count saturated, * chain to version with full retry loop. */
finalintfullTryAcquireShared(Thread current){ /* * This code is in part redundant with that in tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between retries and lazily reading hold counts. */ HoldCounter rh = null; for (; ; ) { // 获取 state 状态值 int c = this.getState(); // 当前写锁已被持有 if (exclusiveCount(c) != 0) { // 如果持有写锁的线程不是当前线程,则尝试加读锁失败 if (this.getExclusiveOwnerThread() != current) { return -1; } // else we hold the exclusive lock; blocking here would cause deadlock. } // 对于公平锁而言,前面存在等待的线程 elseif (this.readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 更新计数器,如果当前线程未持有读锁,则移除计数器 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) { readHolds.remove(); } } } if (rh.count == 0) { return -1; } } }
// 基于 CAS 修改 state 状态值 for (; ; ) { int c = this.getState(); int nextc = c - SHARED_UNIT; if (this.compareAndSetState(c, nextc)) { /* * Releasing the read lock has no effect on readers, * but it may allow waiting writers to proceed if both read and write locks are now free. */ return nextc == 0; // 如果 nextc 为 0,则说明当前锁(读锁、写锁)未被任何线程持有 } } }
protectedfinalbooleantryAcquire(int acquires){ /* * Walkthrough: * 1. If read count nonzero or write count nonzero and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if it is either a reentrant acquire or * queue policy allows it. If so, update state and set owner. */
// 获取当前线程对象 Thread current = Thread.currentThread(); // 获取 state 状态值 int c = this.getState(); // 获取写锁的重入次数 int w = exclusiveCount(c); // 如果不为 0,则说明当前锁(读锁、写锁)已被线程持有 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0// 写锁重入次数为 0,说明读锁已被持有,获取写锁失败 || current != this.getExclusiveOwnerThread()) { // 写锁已经被其它线程持有,获取写锁失败 returnfalse; } // 写锁重入次数达到上限 if (w + exclusiveCount(acquires) > MAX_COUNT) { thrownew Error("Maximum lock count exceeded"); } // 更新 state 状态值 this.setState(c + acquires); returntrue; }
// 当前锁对象未被线程持有,对于非公平锁则抢占式获取锁对象,对于公平锁则等待前面的线程优先获取锁 if (this.writerShouldBlock() || !this.compareAndSetState(c, c + acquires)) { returnfalse; } // 记录当前持有写锁的线程对象 this.setExclusiveOwnerThread(current); returntrue; }
上述方法尝试获取写锁资源的执行流程可以概括为:
获取当前线程对象、state 状态值,以及写锁的重入次数;
如果 state 状态值不为 0,但写锁重入次数为 0,说明当前读锁已被持有,获取写锁失败;
如果 state 状态值不为 0,且写锁重入次数也不为 0,说明当前写锁已被持有,如果持有该写锁的线程不是当前线程,则获取写锁失败;
否则,说明当前线程已经持有该写锁资源,本次加锁相当于重入,此时需要校验写锁重入次数是否已达上限,是则抛出异常,否则说明获取写锁成功,更新 state 状态值;
如果 2 中检测到 state 状态值为 0,说明当前锁对象未被任何线程持有,对于公平锁而言需要考虑公平性,优先让排在前面的线程先获取锁;
尝试更新 state 状态值,如果成功则说明加锁成功,需要记录当前持有写锁的线程对象并返回 true,否则返回 false。