Author: Vivo Game Technology team
1. Overview
ConcurrentHashMap (hereinafter referred to as C13Map) is one of the most prominent data structures in concurrent programming. A large number of concurrent scenarios are backed by C13Map, and it is also the largest component in the JUC package by code size (more than 6,000 lines). Oracle has done a lot of work on it since JDK 8.
Starting from basic HashMap knowledge, this article analyzes the implementation of each component of C13Map one by one and how its safety is guaranteed.
2. Basics of HashMap
Before analyzing C13Map, you need to know the following HashMap facts and conventions:
- The length of a hash table is always a power of 2, so that hashCode % tableSize == hashCode & (tableSize - 1).
- The hashCode is run through a perturbation function that XORs the high 16 bits into the low 16 bits, so that the high bits also take part in bucket selection.
- If the number of elements exceeds tableSize * 0.75, the hash table is expanded to twice its size.
- Bucket, hash bucket, and bin all refer to the same concept: one slot (column) of the hash table.
- When the old table is being migrated, the node in slot i lands in either slot i or slot i + tableSize of the new table, determined by the bit of its hash value at the tableSize position.
- Hash collisions may occur in each slot; below a certain threshold the bin is a linked list, and once the threshold is reached it is upgraded to a red-black tree.
- HashMap itself is not designed for multithreaded environments; never use HashMap directly in a concurrent environment. C13Map does not have this safety issue.
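The masking and resize-split conventions above can be checked with a few lines of arithmetic. This is an illustrative sketch (class and method names are ours, not JDK code):

```java
// Sketch (not JDK source): demonstrates two HashMap conventions listed above.
// 1) For power-of-two table sizes, h % size == h & (size - 1).
// 2) During a resize from size n to 2n, a node in slot i moves to slot i
//    or slot i + n depending on the bit of the hash at position n.
public class HashBasics {
    static int indexFor(int h, int tableSize) {
        return h & (tableSize - 1); // equals h % tableSize when size is a power of 2
    }

    static int newSlot(int h, int oldSlot, int oldSize) {
        // If the hash bit at the old size is 0, the node stays in slot i;
        // otherwise it moves to slot i + oldSize in the doubled table.
        return (h & oldSize) == 0 ? oldSlot : oldSlot + oldSize;
    }

    public static void main(String[] args) {
        int h = 123456789, n = 16;
        if (indexFor(h, n) != Math.floorMod(h, n)) throw new AssertionError();
        int i = indexFor(h, n);
        // The new slot must equal the index computed directly against the doubled table.
        if (newSlot(h, i, n) != indexFor(h, 2 * n)) throw new AssertionError();
        System.out.println("ok");
    }
}
```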
3. Field definition of C13Map
C13Map field definition
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static final int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int MOVED     = -1; // hash of a ForwardingNode
static final int TREEBIN   = -2; // hash of a TreeBin head node
static final int RESERVED  = -3; // hash of a ReservationNode placeholder
static final int HASH_BITS = 0x7fffffff; // usable bits of an ordinary Node hash
transient volatile Node<K,V>[] table;
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
private transient volatile int sizeCtl;
// lower bound of the next segment claimed by CAS during concurrent transfer
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
4. Operating safely on Node[]
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}
The Unsafe helper class is used here to read and write the element at any index of Node<K,V>[]. The volatile declaration on table itself only makes the array reference volatile; it does not give the individual elements of the array volatile memory semantics.
Unsafe is therefore needed to guarantee atomicity and visibility for the read/write/CAS operations on Node<K,V>[] elements in a multi-core, concurrent environment.
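Application code cannot use Unsafe directly; since JDK 9 the supported equivalent is a VarHandle, which offers the same acquire/release/CAS access to individual array elements. A minimal sketch mirroring tabAt/casTabAt/setTabAt (class name and the String[] element type are ours):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Sketch: the supported (non-Unsafe) way to get per-element acquire/release/CAS
// semantics on an array, mirroring tabAt/casTabAt/setTabAt above.
public class ArrayAccess {
    static final VarHandle AA = MethodHandles.arrayElementVarHandle(String[].class);

    static String tabAt(String[] tab, int i) {
        return (String) AA.getAcquire(tab, i);
    }
    static boolean casTabAt(String[] tab, int i, String c, String v) {
        return AA.compareAndSet(tab, i, c, v);
    }
    static void setTabAt(String[] tab, int i, String v) {
        AA.setRelease(tab, i, v);
    }

    public static void main(String[] args) {
        String[] tab = new String[4];
        if (!casTabAt(tab, 0, null, "head")) throw new AssertionError();
        if (casTabAt(tab, 0, null, "lost")) throw new AssertionError(); // slot no longer null
        setTabAt(tab, 1, "next");
        if (!"head".equals(tabAt(tab, 0)) || !"next".equals(tabAt(tab, 1)))
            throw new AssertionError();
        System.out.println("ok");
    }
}
```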
5. Why get is thread-safe
To be clear: C13Map reads are generally lock-free (apart from TreeBin's read/write lock), and reads can run in parallel with writes; C13Map writes require the synchronized mutex on the bin's head node, which guarantees that at most one thread updates a bin at a time. Within a bin this is a single-writer/multi-reader concurrency problem.
Get method of C13Map
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
1. If the current hash table is null
Return null if the hash table has not been initialized or is still being initialized. Although other threads may have done a great deal between reading table and returning, at least at the moment tab == null was observed, the key definitely did not exist.
2. If the bin header is null
If no node exists in the slot, null is returned.
3. If bin is a linked list
The head node is an ordinary Node.
(1) If the list is being treeified into a red-black tree, the treeify does not break the structure of the old linked-list bin itself; only once the entire treeify is complete is the head node replaced in a single step with the newly created TreeBin, so the list remains safe to read.
(2) If a resize is in progress and the bin is being transferred, the transfer likewise does not destroy the old linked-list structure; only after the whole bin has been transferred is the head node replaced in one step with a ForwardingNode, so the list remains safe to read.
(3) If other threads are operating on the linked list, an add/replace/remove may happen at any point while the current thread traverses it:
- For an add: since JDK 8 new nodes are appended at the tail, so at worst the traversal misses a node appended after it started.
- For a remove: a node currently being visited may be unlinked by another thread, isolating it from the list. But because its next reference is unchanged, the list is not broken from the reader's point of view, and traversal can continue to the tail as usual.
- For a replace: the list structure is unchanged, only a node's value changes; there is no safety problem.
**Conclusion:** For a linear data structure like a linked list, as long as there is a single writer and inserts are appended at the tail, concurrent reads are safe: there will be no misreads, no missed reads from a broken link, and no reading of a cyclic list.
4. If bin is a red-black tree
The head node is a TreeBin node.
(1) If an untreeify from red-black tree back to linked list is in progress, it does not destroy the old red-black tree structure; only after the entire untreeify completes is the head node replaced with a newly created ordinary Node, so reads remain safe.
(2) If a resize is in progress and the bin is being transferred, the transfer does not destroy the old red-black tree structure; only after the whole bin has been transferred is the head node replaced with a ForwardingNode, so reads remain safe.
(3) If other threads are operating on the red-black tree, then at any point while the current thread traverses it a single other thread may be performing an add/replace/remove or a red-black tree rotation; see the read/write lock implementation below.
Read/write lock implementation in TreeBin
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock

private final void lockRoot() {
    // try once to CAS in the write lock; on failure, spin or park in contendedLock
    if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))
        contendedLock(); // offload to separate method
}

private final void unlockRoot() {
    lockState = 0;
}

private final void contendedLock() {
    boolean waiting = false;
    for (int s;;) {
        // if every bit except WAITER is clear, no read or write lock is held
        if (((s = lockState) & ~WAITER) == 0) {
            if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) {
                if (waiting)
                    waiter = null;
                return;
            }
        }
        else if ((s & WAITER) == 0) {
            // set the WAITER bit to announce a thread waiting for the write lock
            if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {
                waiting = true;
                waiter = Thread.currentThread();
            }
        }
        else if (waiting)
            LockSupport.park(this);
    }
}

final Node<K,V> find(int h, Object k) {
    if (k != null) {
        for (Node<K,V> e = first; e != null; ) {
            int s; K ek;
            // If a waiter or writer is present, fall back to linear search: when
            // the red-black tree replaced the linked list it kept the list
            // structure (next pointers), and by the earlier analysis reading
            // that list is safe even though its query performance is worse.
            // This avoids waiting too long for a writer to release the lock,
            // and avoids piling up read locks that would make a waiting writer
            // wait too long. Each step re-checks the lock state, so the moment
            // contention disappears the faster tree search is used instead.
            if (((s = lockState) & (WAITER | WRITER)) != 0) {
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                e = e.next;
            }
            // otherwise take a shared (read) lock and search the tree
            else if (U.compareAndSetInt(this, LOCKSTATE, s, s + READER)) {
                TreeNode<K,V> r, p;
                try {
                    p = ((r = root) == null ? null :
                         r.findTreeNode(h, k, null));
                } finally {
                    Thread w;
                    // release the read lock; if this was the last reader and a
                    // waiter is registered, wake it up
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                        (READER | WAITER) && (w = waiter) != null)
                        LockSupport.unpark(w);
                }
                return p;
            }
        }
    }
    return null;
}

final TreeNode<K,V> putTreeVal(int h, K k, V v) {
    Class<?> kc = null;
    boolean searched = false;
    for (TreeNode<K,V> p = root;;) {
        int dir, ph; K pk;
        // ...
        else {
            lockRoot(); // writes require the exclusive (write) lock
            try {
                root = balanceInsertion(root, x);
            } finally {
                unlockRoot(); // release the exclusive lock
            }
        }
        break;
    }
    assert checkInvariants(root);
    return null;
}
The red-black tree has a built-in set of read/write lock logic. It defines a 32-bit int variable lockState: bit 1 is the write-lock flag, bit 2 is the write-lock-waiter flag, and bits 3 through 32 form the shared (read) lock counter.
Reads and writes are mutually exclusive: multiple threads may read at the same time, but reads cannot run in parallel with a write, and only one thread may write at any moment. So at any point in time the structure is a valid red-black tree, and reading it is safe overall.
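The bit layout of lockState can be illustrated with plain int arithmetic. The sketch below (class name is ours) encodes the same WRITER/WAITER/READER constants and replays the checks used in contendedLock and find:

```java
// Sketch of TreeBin's lockState encoding: bit 1 = write lock, bit 2 = waiter,
// bits 3..32 = reader count (each reader adds READER = 4).
public class LockStateDemo {
    static final int WRITER = 1, WAITER = 2, READER = 4;

    static int readers(int lockState)         { return lockState >>> 2; }
    static boolean writeLocked(int lockState) { return (lockState & WRITER) != 0; }
    static boolean hasWaiter(int lockState)   { return (lockState & WAITER) != 0; }

    public static void main(String[] args) {
        int s = 0;
        s += READER; s += READER; s += READER;    // three concurrent readers
        if (readers(s) != 3 || writeLocked(s)) throw new AssertionError();
        s |= WAITER;                              // a writer registers as waiting
        // the check used in find(): is any writer or waiter present?
        if ((s & (WAITER | WRITER)) == 0) throw new AssertionError();
        s -= 3 * READER;                          // all readers release
        if ((s & ~WAITER) != 0) throw new AssertionError(); // only the waiter bit remains
        System.out.println("ok");
    }
}
```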
Some readers may wonder why releasing the write lock does not wake the waiter. Suppose thread A enters the waiting state and thread B acquires the write lock and then releases it by simply setting lockState = 0:
does thread A then have no chance of being woken, left to wait for the next read-lock release?
C13Map has no such omission. Closer observation shows that the callers surrounding the red-black tree mutations, i.e. the putValue/replaceNode layer, already hold the synchronized mutex on the bin's head node, so only one writer thread can enter the TreeBin methods at a time. A writer that finds waiter non-empty is therefore itself the only writer and can acquire the write lock without worrying about never being woken.
TreeBin makes a trade-off between linear search, which performs poorly but is concurrency-safe, and tree search, which needs concurrency control and may cause lock contention. The designers use linear search to avoid lock contention from read/write collisions as much as possible, but as soon as the race is judged to have disappeared they switch to tree search to regain performance, striking an excellent balance between safety and speed. See the find method and its comments for the detailed strategy.
The linear-search fallback, together with the synchronized lock on the bin's head node that admits only one writer thread into the whole TreeBin code block, keeps the overall design of TreeBin's read/write lock much simpler than ReentrantReadWriteLock: for example, there is no queue of threads to wake, and reader threads spin instead of blocking. Think of it as a simplified ReadWriteLock for these specific conditions.
5. If bin is a ForwardingNode
Bin has been migrated. Call its find method to read data in nextTable.
ForwardingNode’s find method
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null);
        this.nextTable = tab;
    }

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}
ForwardingNode stores a reference to nextTable and redirects the search into the next hash table, but nextTable is not guaranteed to be the current table, because highly concurrent inserts can expand the hash table several times in a very short period. It is quite possible that a chain of hash tables exists in memory, linked together by the ForwardingNodes sitting in bin head slots, and that a thread which started on table1 traverses along this chain.
There are three cases when eh<0:
- If it’s ForwardingNode, go through the next hash table.
- In the case of TreeBin, its find method is called into the TreeBin read-write lock’s protected area to read the data.
- If it is a ReserveNode, the bin is currently a placeholder for an in-progress compute, and null is returned.
6. If the bin head is a ReserveNode
ReserveNode is used by compute/computeIfAbsent for atomic computation: when the bin's head is null and the computation has not yet finished, a ReserveNode is placed in the bin head as a placeholder.
A read operation that encounters a ReserveNode returns null; a write operation blocks on the ReserveNode's mutex, is woken once the compute has completed, and then retries.
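The guarantee just described is observable at the public API level: even under contention, the mapping function passed to computeIfAbsent runs at most once for a given absent key. A small usage sketch (class name and key/value choices are ours):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Usage sketch: the "check null + compute + insert" sequence is one atomic step,
// so the mapping function for a given key runs exactly once even if many
// threads race on the same absent key.
public class ComputeIfAbsentDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        AtomicInteger calls = new AtomicInteger();

        Runnable task = () -> map.computeIfAbsent("key", k -> {
            calls.incrementAndGet();   // counts how many times the function body ran
            return 42;
        });

        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) (threads[i] = new Thread(task)).start();
        for (Thread t : threads) t.join();

        if (calls.get() != 1) throw new AssertionError("function ran " + calls.get() + " times");
        if (map.get("key") != 42) throw new AssertionError();
        System.out.println("ok");
    }
}
```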
6. Write safety in putValue/replaceNode
Typical programming paradigms are as follows:
PutValue method of C13Map
Node<K,V>[] tab = table;   // copy the table reference from the heap into a local variable
Node<K,V> f = tabAt(tab, i);
if (f == null) {
    // empty slot: CAS the new node in directly
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
        break;
}
else if (f.hash == MOVED) {
    // the bin has been forwarded: help with the transfer first
    helpTransfer(tab, f);
}
else {
    // ordinary (non-ForwardingNode) head: lock it, then doubleCheck it is still the head
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            // ... various write operations
        }
    }
}
1. If the head node of the current slot is null, write directly with CAS
Some may question: if a resize has already completed and table has been swapped to nextTable, could the CAS write into a bin of the old table and lose the data?
This possibility does not exist. Once a table's resize completes, every one of its bins is marked with a ForwardingNode; picture every slot planted with a red flag. Because the CAS here compares against null, it guarantees that at least at the instant the CAS atomic operation took place, the slot (and hence the table) had not changed.
2. If the head node of the current slot is not null
Here a small trick is used: first lock the head node of slot i and enter the synchronized block, then doubleCheck whether the head node of slot i has changed.
The doubleCheck after entering the synchronized block is necessary: although the head node f obtained at the start was not a ForwardingNode, before f's monitor was acquired another thread may have locked f first, completed the transfer of this bin, and marked the head of slot i as a ForwardingNode, leaving f as a stale bin head.
However, since the marking and the transfer are executed together inside a synchronized block, if the doubleCheck finds that the head of the slot is still f, then at least at this moment the slot has not been transferred to the new table (even if a transfer is in progress), and put/remove/replace operations on the bin are safe.
As long as no transfer or treeify is in progress, linked-list inserts are appended at the tail, and the head node, once confirmed, is not changed lightly. This append-only update mode ensures that locking the head node is equivalent to locking the whole bin.
Without the doubleCheck, the slot may already have been transferred and the write would go into the old table's bin, losing the written data. It is also possible that, before f's lock was acquired, another thread treeified the bin and replaced the head with a TreeBin, so the write would land in the stale linked list instead of the new red-black tree.
3. Does doubleCheck have an ABA problem?
One might wonder if there is an ABA problem if another thread removes/puts the current bin, introduces a new head Node, and the JVM frees and reallocations so that the new Node references the same address as the old one.
This can be refuted by contradiction. In a garbage-collected environment the ABA problem cannot occur here: the current thread holds a reference to the head node f and is still alive, so there is no possibility of f's memory being reclaimed by the GC and reallocated.
As for whether the main hash table could change during the write so that the write lands in a bin of the old table and is lost, this can also be refuted by contradiction: table is swapped to nextTable (that is, the resized hash table is committed) only after every slot has been transferred successfully. If even one bin has not yet been transferred, the current table has not changed, and at this moment it is safe to write into that bin of the table.
4. How to write safely
It can be concluded that all write operations on a bin are safe once either a CAS on the table slot has succeeded with null as the compare value, or the slot's non-ForwardingNode head node has been locked and the doubleCheck has confirmed the head has not changed.
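The role of the doubleCheck can be shown with a tiny sketch (plain arrays and objects stand in for the table and nodes; this is an illustration, not the JDK code). Locking the old head f does not stop the slot from being repointed, so the identity re-check inside the synchronized block is what detects a stale head:

```java
// Sketch: why "synchronized (f)" alone is not enough and the identity
// re-check tabAt(tab, i) == f is needed. Plain arrays stand in for the table.
public class DoubleCheckDemo {
    public static void main(String[] args) {
        Object[] tab = new Object[1];
        Object f = new Object();      // the head node read before locking
        tab[0] = f;
        // simulate another thread migrating the bin and installing a new head
        // (e.g. a ForwardingNode) before we manage to acquire f's monitor
        tab[0] = new Object();
        synchronized (f) {            // we hold a lock, but on a stale head
            boolean stale = (tab[0] != f);
            if (!stale) throw new AssertionError("doubleCheck should detect the stale head");
            // a real writer would now retry: re-read the slot and lock the new head
        }
        System.out.println("ok");
    }
}
```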
7. Atomic computation methods
Atomic computing includes four methods: computeIfAbsent, computeIfPresent, compute, and merge.
1. Comparison of several methods
The main differences are as follows:
(1) computeIfAbsent inserts only when it detects that the key is absent; the null check and the insert form one atomic operation. The functional interface supplied is a one-argument Function that accepts the key and returns the value; if the result is null, no insert is performed.
(2) computeIfPresent updates only when the value mapped to the key is non-null; the non-null check and the update form one atomic operation. The functional interface supplied is a two-argument BiFunction that accepts the key and the old value and returns the new value; if the new value is null, the node for the key is deleted.
(3) compute's functional interface is also a two-argument BiFunction accepting the key and the old value and returning the new value; if the old value does not exist, null is passed into the computation, and if the new value is null, the node for the key is guaranteed not to exist afterwards.
(4) merge's functional interface is a two-argument BiFunction accepting oldValue and newValue and returning the merged value. If the old value does not exist, newValue is used directly as the final result; otherwise the merged result is used. If the final result is null, the node for the key is guaranteed not to exist.
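The four behaviors can be contrasted in a few lines against the public API (class name and key/value choices are ours):

```java
import java.util.concurrent.ConcurrentHashMap;

// Usage sketch contrasting the four atomic-computation methods described above.
public class AtomicComputeDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> m = new ConcurrentHashMap<>();

        m.computeIfAbsent("a", k -> 1);            // inserts: key absent
        m.computeIfAbsent("a", k -> 99);           // no-op: key already present
        if (m.get("a") != 1) throw new AssertionError();

        m.computeIfPresent("a", (k, v) -> v + 10); // updates: key present
        m.computeIfPresent("b", (k, v) -> 99);     // no-op: key absent
        if (m.get("a") != 11 || m.containsKey("b")) throw new AssertionError();

        m.compute("a", (k, v) -> null);            // new value null: mapping removed
        if (m.containsKey("a")) throw new AssertionError();

        m.merge("c", 5, Integer::sum);             // old value absent: newValue used directly
        m.merge("c", 5, Integer::sum);             // old value present: merged result
        if (m.get("c") != 10) throw new AssertionError();
        System.out.println("ok");
    }
}
```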
2. When ReserveNode is used as a placeholder
One approach is to create the result node r directly and insert it with casTabAt(tab, i, null, r); the null compare value guarantees concurrency safety.
The other approach is to create a placeholder ReserveNode, lock it, CAS it in as the bin's head node, and then perform the atomic computation. Either way the CAS may fail, and the operation must spin and retry.
(1) Why only computeIfAbsent/compute methods use placeholders
computeIfPresent starts its atomic computation only when the bin is non-empty, so there is naturally no need for a ReserveNode placeholder; it simply locks the existing head node.
When the bin is empty, computeIfAbsent/compute must run a user-supplied Function or BiFunction whose execution time cannot be predicted. If casTabAt(tab, i, null, r) were used directly with the computed result and another thread updated the bin first, the new head node would have to be locked and the computation redone (C13Map does not cache the previous result, because the inputs of the computation may have changed), which is a relatively large cost.
The ReserveNode approach does not have to wait for the computation to finish before claiming ownership of the bin: the placeholder is installed first, which blocks other concurrent writer threads.
(2) Why does merge method not need placeholder
The reason is that when the bin is empty, merge's strategy says the old value is null and newValue becomes the result directly, so the merge computation between old and new values is skipped and costs almost nothing. Therefore casTabAt(tab, i, null, r) can insert the result node in one step, avoiding the extra cost of the two CAS updates that locking a ReserveNode would require.
Compute method for C13Map
public V compute(K key,
                 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
    if (key == null || remappingFunction == null)
        throw new NullPointerException();
    int h = spread(key.hashCode());
    V val = null;
    int delta = 0;
    int binCount = 0;
    for (Node<K, V>[] tab = table;;) {
        Node<K, V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
            // empty slot: create a placeholder node and lock it first
            Node<K, V> r = new ReservationNode<K, V>();
            synchronized (r) {
                if (casTabAt(tab, i, null, r)) {
                    binCount = 1;
                    Node<K, V> node = null;
                    try {
                        if ((val = remappingFunction.apply(key, null)) != null) {
                            delta = 1;
                            node = new Node<K, V>(h, key, val, null);
                        }
                    } finally {
                        // replace the placeholder with the result (or null)
                        setTabAt(tab, i, node);
                    }
                }
            }
            if (binCount != 0)
                break;
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        // ... linked-list branch omitted
                    }
                    else if (f instanceof TreeBin) {
                        // ... red-black tree branch omitted
                    }
                }
            }
        }
    }
    if (delta != 0)
        addCount((long) delta, binCount);
    return val;
}
3. How is atomicity guaranteed
For computeIfAbsent/computeIfPresent, the null check and the computation together form one atomic operation. As analyzed above, this is achieved either through the casTabAt(tab, i, null, r) atomic operation, or by installing and locking a ReserveNode placeholder, or by locking the bin's existing head node.
In other words, the whole bin is always locked: once the thread has observed whether the target key's value is null, no other thread can change that value, so the null check and the computation are naturally atomic.
casTabAt(tab, i, null, r) itself is backed by a hardware-level atomic instruction, which guarantees that no other instruction can modify the same memory location between the compare and the set.
8. Concurrent transfer during resize
There are three places in C13Map that will trigger the call of transfer method, namely addCount, tryPresize and helpTransfer functions.
- addCount checks the element count after a write completes; if it exceeds the sizeCtl threshold, it triggers a resize expansion and the transfer of the old table to the new one.
- tryPresize is putAll's pre-check before inserting a whole collection at once; if the collection is large, it triggers the resize expansion and transfer in advance.
- helpTransfer: during a write, the bin's head node is found to be a ForwardingNode, and helpTransfer is called to help with the migration.
1. Start the check before transfer
Take the check logic in addCount as an example:
Transfer check in addCount
Node<K, V>[] tab, nt; int n, sc;
// the element count has exceeded the sizeCtl threshold and the table is below the maximum size
while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
       (n = tab.length) < MAXIMUM_CAPACITY) {
    int rs = resizeStamp(n);
    if (sc < 0) {
        // a resize is already in progress: check whether it is possible to join
        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
            transferIndex <= 0)
            break;
        // join the transfer: increment the thread count in the low 16 bits
        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
            transfer(tab, nt);
    }
    // start a new resize: stamp in the high 16 bits, first thread encoded as 2
    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                 (rs << RESIZE_STAMP_SHIFT) + 2))
        transfer(tab, null);
    s = sumCount();
}
Multiple CAS operations are applied to the sizeCtl variable, a global control variable.
private transient volatile int sizeCtl;
- An initial value of 0 indicates that the hash table has not been initialized
- A value of -1 indicates that initialization is under way, and only one thread is allowed to enter the initialization code block
- After initialization or a resize succeeds, sizeCtl = loadFactor * tableSize, the positive-integer threshold that triggers the next expansion
- During a resize, sizeCtl is a negative integer: the high 16 bits hold the resize stamp bound to the current tableSize, and the low 16 bits hold the number of transfer threads plus one
In the method body, table, sizeCtl, and nextTable are each copied into local variables, ensuring the latest value is read and that the value stays stable while the logic runs.
The transfer is skipped if the high 16-bit stamp in sizeCtl does not match the current tableSize, if the number of transfer threads has reached the maximum, if all transfer threads have already exited (they exit only after every slot has been covered, otherwise they keep looping), or if nextTable has already been cleared.
If sizeCtl >= 0, a new resize is started and the encoded thread count is initialized to 2; if sizeCtl < 0, a resize is already in progress and the count is incremented by 1. Once the CAS succeeds, the thread begins transferring.
The initial value for the first thread is 2 rather than 1 because every exiting thread decrements the count in sizeCtl by 1 via CAS; with an initial value of 1, the count would drop to 0 when that thread exits.
Other threads observing 0 could then not tell "no thread ever started the transfer" apart from "all transfer threads have finished and exited", and so could not decide whether to join in.
It is important to note that the check "sc == rs + 1 || sc == rs + MAX_RESIZERS" is a well-known bug in JDK 8: rs is missing the left shift by RESIZE_STAMP_SHIFT (16), so neither comparison can ever be true. JDK 12 fixed this issue.
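The stamp encoding can be replayed with the same formula the JDK uses (resizeStamp is numberOfLeadingZeros(n) with the top bit of the 16-bit stamp forced on). The sketch below (class name is ours) also shows why the unshifted comparison in the bug can never be true:

```java
// Sketch of the resize-stamp encoding described above (mirrors the JDK formula).
public class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        // numberOfLeadingZeros(n) uniquely identifies the power-of-two table size;
        // the forced high bit of the 16-bit stamp makes sizeCtl negative once shifted.
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int n = 16;
        int rs = resizeStamp(n);
        int sc = (rs << RESIZE_STAMP_SHIFT) + 2; // first resizer: stamp high, count+1 low
        if (sc >= 0) throw new AssertionError();                 // negative signals "resizing"
        if ((sc >>> RESIZE_STAMP_SHIFT) != rs) throw new AssertionError(); // stamp recoverable
        if ((sc & 0xffff) != 2) throw new AssertionError();      // one active transfer thread
        // The JDK 8 bug: comparing sc to rs + 1 can never be true, because sc's
        // high bits hold the shifted stamp; the fixed check shifts rs first.
        if (sc == rs + 1) throw new AssertionError();
        System.out.println("ok");
    }
}
```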
2. Concurrent handling process and exit mechanism
Transfer method of C13Map
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
    int n = tab.length, stride;
    // each thread claims at least MIN_TRANSFER_STRIDE (16) slots per round
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    if (nextTab == null) {
        // the first transfer thread is responsible for initializing nextTable
        try {
            Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // one shared ForwardingNode is planted in every migrated slot
    ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    for (int i = 0, bound = 0;;) {
        Node<K, V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // still inside the claimed stride (or in the final sweep): just step i down
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // claim the next stride [nextBound, nextIndex) via CAS on transferIndex
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                                         nextBound = (nextIndex > stride ?
                                                      nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // commit the new table
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // not the last exiting thread
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // the last thread pushes i back to the highest slot, forcing one
                // final full-table sweep to confirm every slot holds a ForwardingNode
                finishing = advance = true;
                i = n;
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            // empty slot: plant a ForwardingNode directly
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            // already migrated (seen during the final sweep)
            advance = true;
        else {
            // lock the head node and migrate the bin
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K, V> ln, hn;
                    if (fh >= 0) {
                        // ... linked-list migration omitted: split the list in two
                        // and place the halves in slots i and i + n of nextTable
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // mark the old slot as migrated
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // ... red-black tree migration omitted: split the tree in two
                        // (untreeifying a half that shrinks below the threshold) and
                        // place the halves in slots i and i + n of nextTable
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
If multiple threads transfer concurrently, the first thread initializes nextTable. Then, with the help of the global transferIndex variable, the table is scanned and migrated from slot n - 1 downward; each CAS on transferIndex claims one stride of slots (16 by default). Once transferIndex reaches the bottom no new stride can be claimed, the thread exits, and on exit it decrements the thread count in sizeCtl by one. The last thread to exit pushes the scan coordinate i back to the highest slot, forcing one final full-table sweep as a safety net.
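The stride-claiming loop over transferIndex can be sketched with an AtomicInteger standing in for the CAS on the field (class and method names are ours):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of how transfer threads claim work: each CAS on transferIndex takes
// one stride of slots [nextBound, nextIndex), scanning from high slots down.
public class StrideClaimDemo {
    static final int MIN_TRANSFER_STRIDE = 16;

    // returns {bound, i} for the claimed segment, or null when nothing is left
    static int[] claim(AtomicInteger transferIndex, int stride) {
        int nextIndex, nextBound;
        do {
            if ((nextIndex = transferIndex.get()) <= 0) return null;
            nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
        } while (!transferIndex.compareAndSet(nextIndex, nextBound));
        return new int[]{nextBound, nextIndex - 1};
    }

    public static void main(String[] args) {
        AtomicInteger transferIndex = new AtomicInteger(64); // a table of 64 slots
        int[] seg1 = claim(transferIndex, MIN_TRANSFER_STRIDE);
        int[] seg2 = claim(transferIndex, MIN_TRANSFER_STRIDE);
        // first claim covers slots 48..63, second 32..47, and so on downward
        if (seg1[0] != 48 || seg1[1] != 63) throw new AssertionError();
        if (seg2[0] != 32 || seg2[1] != 47) throw new AssertionError();
        claim(transferIndex, MIN_TRANSFER_STRIDE);
        claim(transferIndex, MIN_TRANSFER_STRIDE);
        if (claim(transferIndex, MIN_TRANSFER_STRIDE) != null)
            throw new AssertionError(); // work exhausted
        System.out.println("ok");
    }
}
```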
3. Read and write security analysis in transfer process
(1) First, can the global hash table be resized again while a transfer is still in progress, leaving a thread working on a stale table?
Looking at the code that commits nextTable to table, the table is replaced only after all transfer threads have finished and exited; as long as even one thread is still inside the transfer block, table cannot be swapped. So there is no risk of the table going stale.
(2) Is there any security risk when there are concurrent write operations?
Because both transfer operation and write operation compete with syncronized lock of bin’s head node, they are mutually exclusive serial. When the writer thread gets the lock, it doubleCheck, finds out it’s not the original head node, doesn’t do anything, finds out it’s a forwardingNode, joins the move until the new table is committed, and then goes directly to the new table.
nextTable is committed only after every slot has been moved and marked with a ForwardingNode, so once the new table is committed the old table can no longer be written. This effectively prevents data from being written into a dead table.
Reasoning chain: a write acquires the bin head node's synchronized lock ----> the transfer of that bin cannot have finished ----> the new table cannot have been committed ----> the write must land in the current table.
This means the old table and the new table can never be written at the same moment; once a table has been superseded, it can only ever be read, never written.
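This guarantee can be exercised directly: hammer a map that starts at minimum capacity with concurrent writers so that many transfers happen mid-write, and verify that nothing is lost. This is a sanity check of the behavior described above, not a proof:

```java
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentResizeWriteDemo {
    // returns the final size after `threads` writers each put `perThread`
    // disjoint keys into a map that starts at minimum capacity
    static int run(int threads, int perThread) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(1);
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    map.put(base + i, base + i); // many resizes happen underneath
            });
            ts[t].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return map.size();
    }

    public static void main(String[] args) {
        // every write lands despite concurrent transfers: no lost updates
        System.out.println(run(4, 50_000)); // 200000
    }
}
```

Running the same experiment against a plain HashMap would lose entries (or worse); here the head-node lock and the ForwardingNode redirection make every write land in exactly one live table.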
(3) Are concurrent read operations safe during transfer?
The transfer operation does not destroy the old bin structure. If a bin's transfer has not started, the old bin is traversed as usual; if the transfer has completed, the find method of the ForwardingNode is called to query the new table recursively, as described in the ForwardingNode section above.
9. Traverser
Given generic APIs such as iterator() and containsValue(), and business scenarios that genuinely need to walk the entire map, it makes sense to design a traversal mechanism that is both safe and performant.
The difficulty for C13Map's traverser is that reads and transfers may run in parallel; the problem is how to handle a ForwardingNode encountered while scanning a bin.
Because of the concurrent transfer mechanism, meeting a ForwardingNode in a slot only means that this particular slot has been moved; it says nothing about whether the subsequent slots have been moved or not. In other words, the slots that follow are in an indeterminate state.
The solution is a mechanism similar to a method call stack: on jumping to nextTable, the current table and slot index are recorded and pushed; the traverser then visits slots i and i + n of nextTable, and if it meets a ForwardingNode again it pushes again, repeating the cycle.
Whenever slot i + n reaches the right boundary of its segment and is about to overflow, the traverser pops the stack by the same rule, returning to the previous context, and ultimately back to the original table, i.e. the node in the initial table.
Traverser component of C13Map
static class Traverser<K,V> {
    Node<K,V>[] tab;              // current table; updated if resized
    Node<K,V> next;               // the next entry to use
    TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
    int index;                    // index of bin to use next
    int baseIndex;                // current index of initial table
    int baseLimit;                // index bound for initial table
    final int baseSize;           // initial table size

    Traverser(Node<K,V>[] tab, int size, int index, int limit) {
        this.tab = tab;
        this.baseSize = size;
        this.baseIndex = this.index = index;
        this.baseLimit = limit;
        this.next = null;
    }

    /**
     * Returns the next valid node, or null if none.
     */
    final Node<K,V> advance() {
        Node<K,V> e;
        if ((e = next) != null)
            e = e.next;
        for (;;) {
            Node<K,V>[] t; int i, n;  // locals guarantee stability in the checks
            if (e != null)
                return next = e;
            if (baseIndex >= baseLimit || (t = tab) == null ||
                (n = t.length) <= (i = index) || i < 0)
                return next = null;
            if ((e = tabAt(t, i)) != null && e.hash < 0) {
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    e = null;
                    pushState(t, i, n);   // save context before jumping to nextTable
                    continue;
                }
                else if (e instanceof TreeBin)
                    e = ((TreeBin<K,V>)e).first;
                else
                    e = null;
            }
            if (stack != null)
                recoverState(n);          // pop back to the saved table if possible
            else if ((index = i + baseSize) >= n)
                index = ++baseIndex;      // visit upper slots if present
        }
    }

    /**
     * Saves traversal state upon encountering a ForwardingNode.
     */
    private void pushState(Node<K,V>[] t, int i, int n) {
        TableStack<K,V> s = spare;        // reuse if possible
        if (s != null)
            spare = s.next;
        else
            s = new TableStack<K,V>();
        s.tab = t;
        s.length = n;
        s.index = i;
        s.next = stack;
        stack = s;
    }

    /**
     * Possibly pops traversal state.
     */
    private void recoverState(int n) {
        TableStack<K,V> s; int len;
        while ((s = stack) != null && (index += (len = s.length)) >= n) {
            n = len;
            index = s.index;
            tab = s.tab;
            s.tab = null;
            TableStack<K,V> next = s.next;
            s.next = spare;               // save for reuse
            stack = next;
            spare = s;
        }
        if (s == null && (index += baseSize) >= n)
            index = ++baseIndex;
    }
}
Suppose the initial table of the whole traversal is initialTable = table1, and the largest table reached by the end of the traversal is table5; that is, four expansions happened during the traversal, the most complex combination of traversal and expansion.
The whole traversal is then anchored on initialTable as its reference table, with slots i and i + n of the higher-order tables serving as ForwardingNode jump targets, like particle fission radiating from the lowest-order table up to the highest-order table.
The Traverser may never walk every slot of an intermediate table directly, but every slot of a lower-order table has projections in the highest-order table; for example, one slot in table1 projects onto 16 slots in table5.
The Traverser guarantees that the set of slots it visits, projected onto the highest-order table, covers that table without leaving anything out.
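The practical consequence is that C13Map iterators are weakly consistent: they never throw ConcurrentModificationException, even if the map is modified and resized mid-iteration. A minimal single-threaded illustration (modifying through the map, not the iterator, while iterating):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeaklyConsistentIterDemo {
    static int run() {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(1);
        for (int i = 0; i < 4; i++) map.put(i, i);
        int seen = 0;
        Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            it.next();
            seen++;
            // grow the map mid-iteration, forcing resizes; a HashMap iterator
            // would fail fast here with ConcurrentModificationException
            if (seen <= 4) map.put(100 + seen, seen);
        }
        return seen; // at least the 4 original entries are visited
    }

    public static void main(String[] args) {
        System.out.println(run() >= 4); // true
    }
}
```

Entries added during iteration may or may not be seen, which is exactly the "projection, no omission of what existed" guarantee described above.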
10. Concurrency counts
Unlike HashMap, which simply maintains a size field, obtaining the total element count of a C13Map must certainly not traverse the whole data structure; C13Map designed the CounterCell[] array to solve the concurrent-counting problem.
The CounterCell[] mechanism does not care about table replacement: whether an operation hits the new table or the old one makes no essential difference to counting. CounterCell[] only cares about increments and decrements of the total.
1. Memory alignment from LongAdder to CounterCell
C13Map borrows the counting mechanism of LongAdder and Striped64 from JUC, and large parts of the code are duplicated from them. The core problem is making increments of a 64-bit long safe in a multi-core environment. volatile plus CAS makes the operation concurrency-safe, but when many cores hammer the same memory word, each CPU's local caches (L1, L2, registers, and so on) must constantly be kept coherent.
Because of the MESI cache-coherence protocol, the resulting frequent invalidation of those local caches hurts performance. A better scheme is for each CPU to update only its own cache-line-aligned region of memory, with the total obtained by summation at read time.
This improves throughput, but the final value is an estimate obtained by summation, so it does not fit every scenario: summing the CounterCells is not atomic and cannot represent the exact value at a single instant, so atomic operations such as compareAndSet on the total cannot be supported.
2. CounterCell[], cellsBusy, baseCount
CounterCell[] may be expanded during concurrent operation, always doubling so its length stays a power of two. Once its length reaches the number of CPU cores it stops expanding, because no more than that many threads can be updating cells simultaneously. During initialization, expansion, and cell creation, cellsBusy acts as a spin lock. baseCount is the base counter.
When contention is low and CAS never fails, counting goes directly through the baseCount variable. Once a CAS fails, contention has been detected, and initialization or expansion of CounterCell[] is considered; while initialization has not yet completed, baseCount still serves as the fallback counter.
So the final total is: sum = baseCount + the values of all CounterCells.
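A stripped-down model of the baseCount + CounterCell[] scheme, using an AtomicLongArray in place of padded CounterCell objects (so it shows the counting structure, but not the @Contended false-sharing mitigation, and it skips the lazy initialization and expansion):

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

public class StripedCounter {
    private final AtomicLong baseCount = new AtomicLong();
    private final AtomicLongArray cells = new AtomicLongArray(8); // power of two

    void add(long x) {
        // fast path: CAS on baseCount, like addCount's first attempt
        long b = baseCount.get();
        if (baseCount.compareAndSet(b, b + x))
            return;
        // contention: pick a cell via a per-thread random probe and add there
        int idx = ThreadLocalRandom.current().nextInt() & (cells.length() - 1);
        cells.addAndGet(idx, x);
    }

    long sum() {
        // like sumCount(): baseCount plus every cell; NOT an atomic snapshot
        long s = baseCount.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }

    public static void main(String[] args) {
        StripedCounter c = new StripedCounter();
        for (int i = 0; i < 1000; i++) c.add(1);
        System.out.println(c.sum()); // 1000 (single-threaded, so exact)
    }
}
```

Under concurrency the sum() result is only exact once all writers are quiescent, which is precisely why C13Map's size() is documented as an estimate.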
C13Map addCount method
private final void addCount(long x, int check) {
    CounterCell[] cs; long b, s;
    // count directly on baseCount until the first CAS failure,
    // or until a CounterCell[] array already exists
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            // hash the thread's probe value into one of the cells
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
            fullAddCount(x, uncontended);   // fall back to the slow path
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            if (sc < 0) {
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    (nt = nextTable) == null || transferIndex <= 0)
                    break;
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
ThreadLocalRandom is a per-thread random number generator: unaffected by other threads, it improves random-number performance under concurrency. The initialization and expansion of CounterCell[] are always triggered after a CAS failure, that is, only once multi-thread contention has been clearly detected.
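The getProbe()/advanceProbe() calls used above are package-private internals; in application code the same per-thread generator is reached through the public API:

```java
import java.util.concurrent.ThreadLocalRandom;

public class ThreadLocalRandomDemo {
    public static void main(String[] args) {
        // current() returns this thread's own generator, avoiding the
        // shared-seed CAS contention of a single java.util.Random instance
        int slot = ThreadLocalRandom.current().nextInt(16); // value in [0, 16)
        System.out.println(slot >= 0 && slot < 16); // true
    }
}
```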
C13Map fullAddCount method
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();          // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                    // is there a fresh conflict?
    for (;;) {
        CounterCell[] cs; CounterCell c; int n; long v;
        if ((cs = counterCells) != null && (n = cs.length) > 0) {
            if ((c = cs[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {           // try to attach a new cell
                    CounterCell r = new CounterCell(x); // optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {                   // recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;               // slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)           // CAS already known to fail
                wasUncontended = true;          // continue after rehash
            else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
                break;
            else if (counterCells != cs || n >= NCPU)
                // past the CPU core count, or counterCells was already
                // expanded by another thread: clear the conflict state
                collide = false;
            else if (!collide)
                collide = true;                 // a genuine conflict; retry once more
            else if (cellsBusy == 0 &&
                     U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == cs)     // doubleCheck, then expand
                        counterCells = Arrays.copyOf(cs, n << 1);
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                       // retry CAS on the expanded array
            }
            h = ThreadLocalRandom.advanceProbe(h); // rehash to a new slot
        }
        else if (cellsBusy == 0 && counterCells == cs &&
                 U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                               // initialize the array for the first time
                if (counterCells == cs) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
            break;                              // fall back to baseCount
    }
}
On each retry a new probe value is generated, mapping the thread to a new slot for the counting CAS. A single CAS failure does not immediately trigger expansion; the array is expanded only after repeated consecutive failures.
The CounterCell[] scheme as a whole is fairly self-contained and only loosely coupled to C13Map, so it can be regarded as a mature, high-performance counting technique reusable in other scenarios.
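Indeed, outside of C13Map the same scheme is available directly as java.util.concurrent.atomic.LongAdder:

```java
import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {
    static long run(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    adder.increment(); // contended increments spread across cells
            });
            ts[t].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // sum() folds base + cells; exact here because all writers have finished
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100_000)); // 400000
    }
}
```

Under high write contention LongAdder outperforms a single AtomicLong at the price of sum() being only an estimate while writers are still active.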
11. Stream-like bulk operation support
1. Subclasses of BulkTask
All batch task execution classes are subclasses of BulkTask. BulkTask has a built-in Traverser-like implementation for walking the C13Map, and it is itself a descendant of ForkJoinTask, so batch tasks can be executed with the fork/join machinery.
Because ForkJoinTask is not the focus of this article, only a few representative batch methods and their corresponding task implementations are listed here.
2. Several representative batch methods
Batch tasks of C13Map
// traverse every (key, value), transform it, and pass the result to the consumer
public <U> void forEach(long parallelismThreshold,
                        BiFunction<? super K, ? super V, ? extends U> transformer,
                        Consumer<? super U> action);

// return the first non-null transformed result found
public <U> U search(long parallelismThreshold,
                    BiFunction<? super K, ? super V, ? extends U> searchFunction);

// transform every (key, value), then fold the results with the reducer
public <U> U reduce(long parallelismThreshold,
                    BiFunction<? super K, ? super V, ? extends U> transformer,
                    BiFunction<? super U, ? super U, ? extends U> reducer);

// transform every value, then fold the results with the reducer
public <U> U reduceValues(long parallelismThreshold,
                          Function<? super V, ? extends U> transformer,
                          BiFunction<? super U, ? super U, ? extends U> reducer);
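A usage sketch of these methods (parallelismThreshold = 1 permits maximal fork/join parallelism; Long.MAX_VALUE would force sequential execution):

```java
import java.util.concurrent.ConcurrentHashMap;

public class BulkOpsDemo {
    static ConcurrentHashMap<String, Integer> sample() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1); map.put("b", 2); map.put("c", 3);
        return map;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = sample();

        // reduce: transform each (key, value), then fold with the reducer
        long total = map.reduce(1L, (k, v) -> (long) v, Long::sum);
        System.out.println(total); // 6

        // search: the first non-null transformed result wins
        String found = map.search(1L, (k, v) -> v == 2 ? k : null);
        System.out.println(found); // b

        // reduceValues: like reduce, but over values only
        int max = map.reduceValues(1L, v -> v, Integer::max);
        System.out.println(max); // 3
    }
}
```

As with the Traverser, these operations are weakly consistent: they may or may not observe entries modified concurrently with the computation.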
Each of the batch methods above has its own dedicated batch-task execution class, all implemented on top of fork/join.
3. Implementation of batch tasks
Take MapReduceMappingsTask, which backs the reduce method listed in section 2 above, as an example; the implementation details of fork/join are outside the scope of this article and are not discussed further.
The C13Map MapReduceMappingsTask
static final class MapReduceMappingsTask<K,V,U> extends BulkTask<K,V,U> {
    final BiFunction<? super K, ? super V, ? extends U> transformer;
    final BiFunction<? super U, ? super U, ? extends U> reducer;
    U result;
    MapReduceMappingsTask<K,V,U> rights, nextRight;

    MapReduceMappingsTask(BulkTask<K,V,?> p, int b, int i, int f,
                          Node<K,V>[] t,
                          MapReduceMappingsTask<K,V,U> nextRight,
                          BiFunction<? super K, ? super V, ? extends U> transformer,
                          BiFunction<? super U, ? super U, ? extends U> reducer) {
        super(p, b, i, f, t);
        this.nextRight = nextRight;
        this.transformer = transformer;
        this.reducer = reducer;
    }

    public final U getRawResult() { return result; }

    public final void compute() {
        final BiFunction<? super K, ? super V, ? extends U> transformer;
        final BiFunction<? super U, ? super U, ? extends U> reducer;
        if ((transformer = this.transformer) != null &&
            (reducer = this.reducer) != null) {
            // fork the right half of the range until the batch is small enough
            for (int i = baseIndex, f, h; batch > 0 &&
                 (h = ((f = baseLimit) + i) >>> 1) > i;) {
                addToPendingCount(1);
                (rights = new MapReduceMappingsTask<K,V,U>
                 (this, batch >>>= 1, baseLimit = h, f, tab,
                  rights, transformer, reducer)).fork();
            }
            // process this task's own slice of elements
            U r = null;
            for (Node<K,V> p; (p = advance()) != null; ) {
                U u;
                if ((u = transformer.apply(p.key, p.val)) != null)
                    r = (r == null) ? u : reducer.apply(r, u);
            }
            result = r;
            // fold in the results of completed right-hand subtasks
            CountedCompleter<?> c;
            for (c = firstComplete(); c != null; c = c.nextComplete()) {
                @SuppressWarnings("unchecked")
                MapReduceMappingsTask<K,V,U>
                    t = (MapReduceMappingsTask<K,V,U>)c,
                    s = t.rights;
                while (s != null) {
                    U tr, sr;
                    if ((sr = s.result) != null)
                        t.result = (((tr = t.result) == null) ? sr :
                                    reducer.apply(tr, sr));
                    s = t.rights = s.nextRight;
                }
            }
        }
    }
}
12. Summary
Since JDK8, C13Map has abandoned JDK7's Segment-based scheme and refined lock granularity down to each bin: smaller lock granularity, stronger concurrency. The ReentrantLock mutex was replaced with the synchronized keyword; thanks to the extensive synchronized optimizations in JDK8, this achieves performance no worse than ReentrantLock.
A concurrent transfer mechanism was introduced to support multi-threaded moving, so write operations and transfer operations can proceed in parallel on different bins. ForwardingNode was introduced so that reads can proceed in parallel with transfer, and to support traversing the hash chains that may exist mid-transfer. ReserveNode was introduced to reserve a slot for compute operations that may take a long time, guaranteeing atomicity and avoiding duplicate computation.
The red-black tree was introduced to optimize lookup under heavy hash collision, with a lightweight read/write lock guaranteeing read/write safety and an intelligent switch between linear and tree search, striking an excellent balance between performance and safety. The CounterCell mechanism was introduced to optimize counting in multi-core scenarios and to solve the false-sharing problem.
ForkJoinTask subclasses were introduced to optimize bulk computation performance. Throughout its implementation, C13Map uses volatile to guarantee visibility and CAS to guarantee atomicity, making it a model implementation of a locally lock-free data structure.
Unlike HashMap under single-threaded use, where the data read is stable until the next write and forms a stable snapshot between writes, the data in C13Map changes rapidly under concurrency: a read is only guaranteed correct as of some point in time, and a write is only guaranteed safe as of some point in time. That is why discussions of C13Map generally speak of safety rather than real-time consistency; it makes programming harder, and it is the essential difference between single-threaded and concurrent data structures.