曾经写过一篇《基于锁分段机制的 ConcurrentHashMap 实现内幕 》的文章,介绍了在 JDK 1.7 之前 ConcurrentHashMap 的实现机制。文章的结尾我们提及到在 JDK 1.8 之后,ConcurrentHashMap 在实现上抛弃了锁分段机制,转而采用 CAS(Compare-And-Swap) 策略,并和 HashMap 一样引入了红黑树的支持。本文我们将基于 JDK 1.8 源码,分析基于 CAS 机制的 ConcurrentHashMap 实现。
CAS 机制简述
CAS 属于原子操作的一种,能够保证一次读写操作是原子的。CAS 通过将内存中的值与期望值进行比较,只有在两者相等时才会对内存中的值进行修改。一个 CAS 操作的伪代码实现如下(引用自 WIKI ):
1 2 3 4 5 6 7 function cas(p : pointer to int, old : int, new : int) returns bool { if *p ≠ old { return false } *p <- new return true }
Java 中的 CAS 实现位于 sun.misc.Unsafe
类中,该类中定义了大量的 native 方法,CAS 的实现也不例外:
1 2 3 public final native boolean compareAndSwapObject (Object o, long offset, Object expected, Object x) ;public final native boolean compareAndSwapInt (Object o, long offset, int expected, int x) ;public final native boolean compareAndSwapLong (Object o, long offset, long expected, long x) ;
仅仅从 java 源码层面我们只能看到对应的 native 定义,而具体实现需要依赖于操作系统,这里对方法的参数进行说明:
o :目标操作对象。
offset :目标操作数内存偏移地址。
expected :期望值。
x :更新值。
CAS 是支撑 JUC 的基础,除了本文介绍的 ConcurrentHashMap 实现外,典型的应用场景就是 java 中的原子类,例如 AtomicInteger,其中运用了大量的 CAS 操作。CAS 能够在保证性能的同时提供并发场景下的线程安全性,以 AtomicInteger#getAndSet
方法为例:
1 2 3 public final int getAndSet (int newValue) { return unsafe.getAndSetInt(this , valueOffset, newValue); }
该方法原子性的将当前 AtomicInteger 类型的变量值设置为 newValue,并返回修改之前的值。整个过程无需加锁,实现上依赖于 Unsafe#getAndSetInt
方法,其中 valueOffset 变量是当前 AtomicInteger 类型变量值的内存偏移地址:
1 2 3 4 5 6 7 public final int getAndSetInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var4)); return var5; }
方法 Unsafe#getAndSetInt
首先会获取当前 AtomicInteger 类型变量的值,然后基于 CAS 更新变量值为 newValue。
CAS 机制虽然无需加锁、安全且高效,但也存在一些缺点,概括如下:
循环检查的时间可能较长,不过可以限制循环检查的次数。
只能对一个共享变量执行原子操作。
存在 ABA 问题。
所谓 ABA 问题是指在 CAS 两次检查操作期间,目标变量的值由 A 变为 B,又变回 A,但是 CAS 看不到这中间的变换,对它来说目标变量的值并没有发生变化,一直是 A,所以 CAS 操作会继续更新目标变量的值。大部分时候该问题并不会对结果产生实质性影响,如果确实需要关心该问题(例如 lock-free 算法),可以为目标变量引入版本特性,例如 AtomicStampedReference 工具类通过为引用建立类似版本号(stamp)的方式,来解决 ABA 问题。
ConcurrentHashMap 实现内幕
JDK 1.7 之前,ConcurrentHashMap 通过加锁保证线程安全,并引入锁分段机制以减小加锁的粒度,从而提升性能。JDK 1.8 中的 ConcurrentHashMap 实现则引入了 CAS 机制以尽量避免加锁操作,虽然仍然有部分同步代码,不过锁的粒度相对于分段锁而言更加细。另外一个重要的设计就是在结点个数达到阈值时会自动将链表转换成红黑树,从而进一步提升性能。
存储结构设计
在存储结构设计上,新的 ConcurrentHashMap 相对于之前看起来更加的简洁。如下图,在一个 Node 类型的数组(下文如不做特殊说明,均使用 table 指代该数组)上挂载着多个链表和红黑树(下文如不做特殊说明,均使用 bin 指代一个完整的链表或红黑树):
在结点类型上主要包含:
Node<K, V>
:基本结点数据结构,用于存储 key、value,以及结点的哈希值。
ForwardingNode<K, V>
:扩容节点,哈希值始终为 -1,在扩容过程中作为一个占位符表示当前结点为 null,或正在迁移。
ReservationNode<K, V>
:同样是一个占位符结点,哈希值始终为 -3,用于 computeIfAbsent
和 compute
操作。
TreeNode<K, V>
:红黑树结点,除了包含基本的 key、value,以及结点哈希值外,还定义了红黑树结点特有的指针,以及结点颜色标记。
TreeBin<K, V>
:封装红黑树相关的操作。
ConcurrentHashMap 针对 ForwardingNode、ReservationNode,以及树根结点都定义了特定的哈希值:
1 2 3 4 5 6 7 8 static final int MOVED = -1 ; static final int TREEBIN = -2 ; static final int RESERVED = -3 ;
基本方法实现
工具方法
ConcurrentHashMap 主要定义了 3 个工具方法:tabAt、casTabAt 和 setTabAt。
tabAt :用于获取 table 上下标为 i 的头结点,实现上依赖 Unsafe 类。
1 2 3 static final <K, V> Node<K, V> tabAt (Node<K, V>[] tab, int i) { return (Node<K, V>) U.getObjectVolatile(tab, ((long ) i << ASHIFT) + ABASE); }
casTabAt :基于 CAS 尝试更新 table 上下标为 i 的结点的值为 v。
1 2 3 static final <K, V> boolean casTabAt (Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) { return U.compareAndSwapObject(tab, ((long ) i << ASHIFT) + ABASE, c, v); }
setTabAt :用于设置 table 上下标为 i 的结点为 v,相对于 casTabAt 方法的区别在于不关注历史值。
1 2 3 static final <K, V> void setTabAt (Node<K, V>[] tab, int i, Node<K, V> v) { U.putObjectVolatile(tab, ((long ) i << ASHIFT) + ABASE, v); }
构造方法
ConcurrentHashMap 的构造方法存在多个重载版本,对应的最底层版本实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f ) || initialCapacity < 0 || concurrencyLevel <= 0 ) { throw new IllegalArgumentException(); } if (initialCapacity < concurrencyLevel) { initialCapacity = concurrencyLevel; } long size = (long ) (1.0 + (long ) initialCapacity / loadFactor); int cap = (size >= (long ) MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int ) size); this .sizeCtl = cap; }
该构造方法允许我们指定初始化容量(initialCapacity)、负载因子(loadFactor),以及并行度(concurrencyLevel),并依据这些参数计算 sizeCtl 值。类实例变量 sizeCtl 是一个核心变量,用于控制 table 的初始化和扩容策略,该变量的值定义了几种不同的语义:
-1:表示正在初始化。
-N:表示有 N-1 个线程正在执行扩容操作。
0:表示还未执行初始化。
N:表示初始化或下次扩容的大小。
添加或更新键值对
方法 put 用于往 ConcurrentHashMap 中添加或更新键值对,这是 map 集合的基础操作,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 public V put (K key, V value) { return this .putVal(key, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) { throw new NullPointerException(); } int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K, V>[] tab = table; ; ) { Node<K, V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) { tab = this .initTable(); } else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<>(hash, key, value, null ))) { break ; } } else if ((fh = f.hash) == MOVED) { tab = this .helpTransfer(tab, f); } else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K, V> e = f; ; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) { e.val = value; } break ; } Node<K, V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K, V> p; binCount = 2 ; if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) { p.val = value; } } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) { this .treeifyBin(tab, i); } if (oldVal != null ) { return oldVal; } break ; } } } this .addCount(1L , binCount); return null ; }
上述方法的执行流程可以概括为:
计算 key 的哈希值;
如果 table 为空,则执行初始化;
否则,计算 key 哈希值对应的下标,并获取 table 中对应下标的头结点;
如果头结点为 null,则基于 CAS 尝试添加头结点;
否则,如果头结点不为 null,但是头结点的哈希值为 MOVED,说明目前正在执行扩容 table 的操作,则帮助扩容;
否则,如果头结点不为 null,且未处于扩容状态,则尝试添加或更新结点;
判断当前 bin 范围内结点数目是否大于阈值,如果大于阈值则执行扩容 table 的操作。
下面就流程中的一些关键点展开作进一步分析。
初始化 table
Table 的初始化采用延迟策略,在我们构造 ConcurrentHashMap 对象时只是初始化了一些参数值,并没有对 table 进行构造,而 table 的初始化发生在第一次使用 table 时,例如这里 put 方法。
初始化 table 的过程位于 ConcurrentHashMap#initTable
方法中,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private final Node<K, V>[] initTable() { Node<K, V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) { Thread.yield(); } else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }
Table 本质上就是一个 Node 数组,其初始化过程也就是对 Node 数组的初始化过程,方法中使用了 CAS 策略控制线程初始化操作之间的竞争。初始化 table 的执行流程可以概括为:
判断 sizeCtl 值是否小于 0,如果小于 0 则表示 ConcurrentHashMap 正在执行初始化操作,所以需要先等待一会,如果其它线程初始化失败还可以顶替上去;
如果 sizeCtl 值大于等于 0,则基于 CAS 策略抢占标记 sizeCtl 为 -1,表示 ConcurrentHashMap 正在执行初始化,然后构造 table,并更新 sizeCtl 的值。
协助扩容
在 put 过程中,如果当前头结点的哈希值为 MOVED,则说明 ConcurrentHashMap 正在对结点执行扩容操作,此时可以让当前线程加入到扩容工作中协助扩容。该过程由 ConcurrentHashMap#helpTransfer
方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) { Node<K, V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) { break ; } if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { this .transfer(tab, nextTab); break ; } } return nextTab; } return table; }
该方法的主要作用是基于 CAS 尝试添加一个线程去协助扩容操作,如果能够成功加入则将 sizeCtl 值加 1。方法 ConcurrentHashMap#transfer
是真正执行扩容操作的地方,并在多个步骤中被触发,这里先了解一下该方法的参数,后文再对具体实现展开分析。
该方法接收 2 个参数,其中第一个参数 tab 是当前需要被扩容的 table,而第二个参数 nextTab 则表示扩容之后的 table,容量上是之前的两倍。上述方法传递的 nextTab 是一个非 null 值,因为触发 helpTransfer 的前提就是当前已经处于扩容阶段。
链表转红黑树
ConcurrentHashMap 在设计上并不是一上来就在 table 上建立红黑树数据结构作为 bin,而是先建立一个链表,并在链表长度与 table 长度均达到一定的阈值时才执行转换,即将链表转换成红黑树:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private final void treeifyBin (Node<K, V>[] tab, int index) { Node<K, V> b; int n, sc; if (tab != null ) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) { this .tryPresize(n << 1 ); } else if ((b = tabAt(tab, index)) != null && b.hash >= 0 ) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K, V> hd = null , tl = null ; for (Node<K, V> e = b; e != null ; e = e.next) { TreeNode<K, V> p = new TreeNode<>(e.hash, e.key, e.val, null , null ); if ((p.prev = tl) == null ) { hd = p; } else { tl.next = p; } tl = p; } setTabAt(tab, index, new TreeBin<>(hd)); } } } } }
上述方法默认会在链表长度大于等于 8 时触发,此时并没有直接执行转换操作,而是先判断当前 table 的长度是否小于 64,如果小于则先尝试扩容操作,否则才会将链表转换成红黑树。如果是扩容的话会基于 CAS 尝试将 sizeCtl 的值设置为 (rs << RESIZE_STAMP_SHIFT) + 2
,然后调用 ConcurrentHashMap#transfer
方法执行扩容,该过程由 ConcurrentHashMap#tryPresize
方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private final void tryPresize (int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1 ) + 1 ); int sc; while ((sc = sizeCtl) >= 0 ) { Node<K, V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0 ) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n]; table = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) { break ; } else if (tab == table) { int rs = resizeStamp(n); if (sc < 0 ) { Node<K, V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) { break ; } if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { this .transfer(tab, nt); } } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) { this .transfer(tab, null ); } } } }
上述方法的核心操作在于添加 transfer 任务,并设置 sizeCtl 值。该方法第一次调用 transfer 方法时 sizeCtl 一定是大于等于 0 的,所以会尝试将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2
,这是一个大负数,并执行 transfer(tab, null)
操作。后面的循环 sizeCtl 均小于 0,所以会执行 transfer(tab, nt)
,并将 sizeCtl 加 1。注意整个过程中 sizeCtl 值的变化,在一次扩容操作中第一次调用 transfer 方法时将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2
,并在扩容过程再次调用 transfer 方法时将 sizeCtl 加 1,这对于下一节理解扩容操作什么时候结束至关重要。
扩容操作
扩容操作简单地说就是新建一个长度翻倍的 nextTable,然后将之前 table 上的结点重新哈希迁移到新的 nextTable 上,并在迁移完成之后使用 nextTable 替换原先的 table。对于一个 table 而言,上面分布着 n 个 bin 结点,而结点迁移的过程可以是并发的,这样可以提升迁移的效率。ConcurrentHashMap 使用了一个 stride 变量用于指定将 stride 个 bin 结点组成一个任务单元由一个线程负责处理,在单核 CPU 下 stride 的值为 table 的长度 n,在多核 CPU 下为 (n >>> 3) / NCPU
,最小值为 16。
ConcurrentHashMap 定义了一个类实例变量 transferIndex,用于指定任务的边界。任务划分的过程在 table 上是从后往前进行的,例如现在有 n 个结点,则编号 (n-1-stride, ..., n-1)
的任务交给第 1 个线程进行处理,编号 (n-1-2*stride, ..., n-1-stride)
的任务交给第 2 个线程进行处理,以此类推。当有新的线程加入时可以依据 transferIndex 值知道接下去应该分配哪一块的 bin 结点给当前线程。
整个扩容的核心实现位于 ConcurrentHashMap#transfer
方法中,我们在前面的分析中多次提及到该方法,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 private final void transfer (Node<K, V>[] tab, Node<K, V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) { stride = MIN_TRANSFER_STRIDE; } if (nextTab == null ) { try { @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K, V> fwd = new ForwardingNode<>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ; ; ) { Node<K, V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) { advance = false ; } else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt( this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { return ; } finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) { advance = casTabAt(tab, i, null , fwd); } else if ((fh = f.hash) == MOVED) { advance = true ; } else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K, V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K, V> lastRun = f; for (Node<K, V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K, V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) { ln = new Node<>(ph, pk, pv, ln); } else { hn = new Node<>(ph, pk, pv, hn); } } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K, V> t = (TreeBin<K, V>) f; TreeNode<K, V> lo = null , loTail = null ; TreeNode<K, V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K, V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K, V> p = new TreeNode<>(h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) { lo = p; } else { loTail.next = p; } loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) { hi = p; } else { hiTail.next = p; } hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new TreeBin<>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin<>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } }
方法的实现很复杂,不过整体流程可以概括为 2 大步骤:
如果 nextTable 未初始化,则先执行初始化操作,新的 table 容量翻倍;
执行迁移任务。
其中步骤 1 比较简单,不过需要注意的是并不是所有触发 transfer 方法都需要执行初始化 table 的操作,只有主动触发扩容的线程需要执行该操作,对于后来加入“帮忙”的线程会跳过步骤 1,直接进入步骤 2。
步骤 2 通过 transferIndex 实例变量协调任务的分配,并为每个线程分配 stride 个结点进行迁移,任务分配的过程实际上就是确定当前线程迁移结点的上下界的过程,该过程位于 while 循环中(即代码注释 2.1)。该循环整体上就是一个 CAS 操作,如果迁移任务已经完成,或者没有剩余的结点可以迁移(实例变量 transferIndex 小于等于 0),则退出 CAS,否则尝试为本次任务分配新的上下界,同时更新 transferIndex 值。
接下来正式开始迁移工作,整体流程可以概括为:
检查整体迁移任务是否完成,如果完成则更新 table 和 sizeCtl 值;
否则,检查当前任务是否已经完成,如果完成则退出本次任务;
对于仍在进行中的任务会继续执行迁移操作,如果当前结点是一个空结点,则在该位置设置一个空的 ForwardingNode 结点,用于标记当前结点正在迁移中;
否则,如果当前结点是一个 ForwardingNode 结点,即当前结点正在迁移中,进入下一轮任务分配;
否则,对当前结点执行迁移操作。
下面针对流程中的一些关键点进行说明,首先来看一下步骤 2 相关的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { return ; } finishing = advance = true ; i = n; }
前面我们曾提到当新增一个线程支持迁移任务时会执行 U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)
操作,并且在扩容操作开始前会设置 sizeCtl 的值为 (rs << RESIZE_STAMP_SHIFT) + 2
,而这里在完成一个任务的时候会执行 U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)
操作将 sizeCtl 的值减 1。上面的代码会判定当前 sizeCtl 值是否等于 (rs << RESIZE_STAMP_SHIFT) + 2
,如果相等则说明整体扩容任务完成,否则仅说明当前任务完成,将线程任务数减 1。
接下来我们继续来看一个结点迁移的过程,迁移区分链表和红黑树,不过基本思想是想通的,这里以链表为例进行说明,相关实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 if (fh >= 0 ) { int runBit = fh & n; Node<K, V> lastRun = f; for (Node<K, V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K, V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) { ln = new Node<>(ph, pk, pv, ln); } else { hn = new Node<>(ph, pk, pv, hn); } } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; }
这一段代码如果希望更好的理解,建议自己模拟一个 table,并 debug 一下执行流程。其实也不难,这段代码的工作就是将一个链表拆分成两个链表,并将它们插入到新 table 适当的位置。假设老的 table 长度为 16,那么上面的实现有一个巧妙的地方在于对链表中所有结点的哈希值执行 p.hash & n
操作,其结果不是 0 就是 16(老 table 的长度)。所以我们可以依据 p.hash & n
的值将一个链表拆分成两个链表,其中值均为 0 的结点构成的链表仍然放置在新 table 的当前位置 i,而值均为 16 的结点构成的链表则放置在新的位置,即 i + 16。变量 lastRun 所表示的结点实际上是最后几个具备相同 p.hash & n
值的连续结点的最左边结点,因为这样可以减少该结点右边几个结点的迁移工作,因为它们具备相同的 p.hash & n
值,自然也就位于同一个链表上。
获取指定键值
方法 put 的执行流程可以加深我们对于 ConcurrentHashMap 存储结构的理解,而理解了 ConcurrentHashMap 的存储结构,那么分析 get 方法的运行机制也是水到渠成的事情,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public V get (Object key) { Node<K, V>[] tab; Node<K, V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) { return e.val; } } else if (eh < 0 ) { return (p = e.find(h, key)) != null ? p.val : null ; } while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { return e.val; } } } return null ; }
上述方法首先依据相同的实现计算 key 的哈希值,然后定位 key 在 table 中的 bin 位置。如果 bin 结点存在,则依据当前 bin 类型(链表或红黑树)检索目标值。
参考
JDK 1.8 源码
Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析