Design goals
The primary design goal of ConcurrentHashMap is to maintain concurrent readability while minimizing update contention. Secondary goals are to keep space consumption about the same as or better than HashMap, and to support high initial insertion rates into an empty table by many threads.
The overall structure
ConcurrentHashMap is structured like HashMap: it is a hash table of bins (buckets). It maintains an internal array of Node objects, and each array slot is the bin for the hash values that map to it. Hash collisions are resolved by separate chaining: entries whose hashes collide are kept in a linked list of Nodes at that slot. When a list grows long, a lookup must traverse many elements, which degrades query performance. Therefore, once a bin's list reaches TREEIFY_THRESHOLD nodes, it is converted into a red-black tree to speed up lookups (if the table is still small, treeifyBin resizes the table instead).
Concurrency control
ConcurrentHashMap combines CAS operations with synchronized to ensure thread safety. When the target bin in the table is empty (null), the new node is stored with a CAS performed through the Unsafe class, without taking any lock. When the bin already contains a node, the thread synchronizes on the bin's first (head) node before modifying the bin. As shown in the figure above, when Node3 is inserted into the bin whose head is Node1, Node1 is locked first and only then is the insertion performed.
Constructor
/**
* Creates a new, empty map with an initial table size based on
* the given number of elements ({@code initialCapacity}), table
* density ({@code loadFactor}), and number of concurrently
* updating threads ({@code concurrencyLevel}).
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation may use this value as
* a sizing hint.
* @throwsIllegalArgumentException if the initial capacity is * negative or the load factor or concurrencyLevel are * nonpositive * /
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// Check parameters
if(! (loadFactor >0.0 f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// Set the minimum initialization value based on the concurrency parameter
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// the capacity is calculated by the loadFactor loadFactor. ConcurrentHashMap differs from HashMap, where the load factor parameter is passed, but is only used to calculate the actual capacity in this method. Instead of using this load factor for capacity expansion, the default load factor 0.75 is used.
long size = (long) (1.0 + (long)initialCapacity / loadFactor);
//tableSizeFor Converts the size calculated in the previous step to the minimum power 2 (n) greater than or equal to size.
// Then save this value to sizeCtl, you can see that there is no actual initialization of the storage space in the constructor, just a calculation of the capacity to be used during initialization.
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
Copy the code
The get method
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// Computes the hash value of the key
int h = spread(key.hashCode());
// Table is not null and the hash position is not null, and the first Node of the current position is obtained
if((tab = table) ! =null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) ! =null) {
// The hash value of this Node is the same as the hash value of the key. If the keys are the same, this value is returned
if ((eh = e.hash) == h) {
if((ek = e.key) == key || (ek ! =null && key.equals(ek)))
return e.val;
}
// If the hash value of the first Node is negative, it indicates that the Node is being expanded. This Node is a ForwardingNode object, which can be found in nextTable using find.
else if (eh < 0)
return(p = e.find(h, key)) ! =null ? p.val : null;
// Traverse the linked list for this location to find the mapped data
while((e = e.next) ! =null) {
if(e.hash == h && ((ek = e.key) == key || (ek ! =null && key.equals(ek))))
returne.val; }}return null;
}
Copy the code
Put method
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// Computes the hash value of the key
int hash = spread(key.hashCode());
// Initialize binCount to store the number of nodes in the current bucket to determine whether to convert to red-black tree storage
int binCount = 0;
/ / spin
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// If the table is not initialized, call the initTable method to initialize it
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// Hash location. If the current location is null, you can directly save the value to this location by cas operation.
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null.new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// If a Node exists in the current location and the hash value of the Node is -1, expansion is being performed.
else if ((fh = f.hash) == MOVED)
// Call the helpTransfer method to try to help expand together and avoid invalid waits
tab = helpTransfer(tab, f);
// A Node exists in the current location and no capacity expansion operation is performed
else {
V oldVal = null;
// Try to lock the current node
synchronized (f) {
// The location of the node may change after the lock is successfully performed during node deletion and capacity expansion
if (tabAt(tab, i) == f) {
// The current bucket storage structure is a linked list, insert this data into the list
if (fh >= 0) {
binCount = 1;
// Iterate over the list and record the number of nodes in the list
for (Node<K,V> e = f;; ++binCount) {
K ek;
// This key is already mapped
if(e.hash == hash && ((ek = e.key) == key || (ek ! =null && key.equals(ek)))) {
// If the data is added when it does not exist, the data is not updated; If the mapping of this key is not updated to the value passed in.
oldVal = e.val;
if(! onlyIfAbsent) e.val = value;break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break; }}}// Insert this data into the red-black tree
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) ! =null) {
oldVal = p.val;
if(! onlyIfAbsent) p.val = value; }}}}// If the number of nodes is not 0, the update is successful. Instead, it's spinning
if(binCount ! =0) {
// The current data structure is a linked list, and the number of nodes has reached the threshold to become a red-black tree
if (binCount >= TREEIFY_THRESHOLD)
// Convert to red-black tree storage
treeifyBin(tab, i);
if(oldVal ! =null)
return oldVal;
break; }}}// The current capacity is increased by 1. If the capacity expansion threshold is reached, capacity expansion is required
addCount(1L, binCount);
return null;
}
Copy the code
Counting principle and expansion analysis
ConcurrentHashMap counts its elements the same way the LongAdder class does. When there is no contention, it simply updates the baseCount field with a CAS. Under multi-threaded contention, instead of having every thread fight over baseCount, it maintains an array of CounterCell objects. Each contending thread uses its probe value (a per-thread hash) to pick a cell in counterCells and updates that cell instead, which spreads out the contention. The total size is computed by adding baseCount to the values of all cells in counterCells.
/** * Adds to count, and if table is too small and not already * resizing, initiates transfer. If already resizing, helps * perform transfer if work is available. Rechecks occupancy * after a transfer to see if another resize is already Needed * Because resizings are lagging. * Increase quantity to count. If count reaches the capacity expansion threshold and no capacity expansion is started, initialize the capacity expansion operation. If other threads are already expanding, try to help with the expansion. After capacity expansion is complete, check whether the capacity needs to be expanded. *@paramX The count to add Number of entries *@param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// If the CounterCell array is not empty, or cas fails to update the baseCount field, the number of updates is competing with multiple threads.
if((as = counterCells) ! =null| |! U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// If the CounterCell array is not initialized, or the current thread's CountCell is null
if (as == null || (m = as.length - 1) < 0 ||
// Get the corresponding CountCell from the thread hash.
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// The array is initialized and the CountCell corresponding to the thread is not null. Try CAS to update this CountCell. If this fails, execute fullAddCount! (uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended);return;
}
if (check <= 1)
return;
s = sumCount();
}
// If check is greater than 0, check whether expansion is required
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// If the total number reaches the capacity expansion threshold, expand the capacity
while (s >= (long)(sc = sizeCtl) && (tab = table) ! =null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// Calculate the flag bit of the table with size n
int rs = resizeStamp(n);
// If sizeCtl is smaller than 0, capacity expansion is in progress
if (sc < 0) {
// Determining whether the expansion is over is buggized in Java8. Sc == RS + 1 and SC == RS + MAX_RESIZERS can never be true.
//sizeCtl values change stage:
// Initial expansion: sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2
SizeCtl = sizeCtl + 1;
SizeCtl = sizeCtl -1;
SizeCtl = (rs << RESIZE_STAMP_SHIFT) + 1, which is the initial capacity expansion value -1
// Five: sizeCtl stores the new capacity expansion threshold, which becomes positive
// When sizeCtl is less than 0, sizeCtl has these stages
SizeCtl =(rs << RESIZE_STAMP_SHIFT) + 2
SizeCtl =(rs << RESIZE_STAMP_SHIFT) + 1 + Number of expansion threads
// So the judgment here should be
SizeCtl = (rs << RESIZE_STAMP_SHIFT) + 1;
SizeCtl = (RS << RESIZE_STAMP_SHIFT) + MAX_RESIZERS = (RS << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
// In both cases, the current thread does not need to participate in the expansion.Look further down if nextTable==nullOr transferIndex < =0It also means the expansion has been completed, directlybreak
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// Help with capacity expansion, CAS update sizeCtl flags current threads to participate in capacity expansion
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//sizeCtl not smaller than 0 indicates that the current capacity is not expanded
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null); s = sumCount(); }}}// Returns a unique expansion flag corresponding to n capacity
// Convert the information about n to the number of preceding 0's, using the number of preceding 0's to distinguish different n's
//1 << (resize_stamp_bits-1), obtain a number with the 16th digit as 1 and the rest zeros, ensuring that the sign bit is 1 after moving 16 bits to the left, that is, a negative number
// Then perform the or operation, add the information of n, obtain the mark value for expansion calculation
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Copy the code
The transfer and helpTransfer methods
ConcurrentHashMap resizes by allocating a temporary array twice the size of the current one (nextTable) and transferring every bin of the old array into it. The smallest unit of resize work is one bin of the table. Any thread that finds that the bin it is operating on is being moved tries to join the resize. Work is distributed through the transferIndex field: before transferring, each thread lowers transferIndex with a CAS to claim the next range of bins it will process. Once transferIndex reaches 0, every bin has either been transferred already or been claimed by some thread.
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1)? (n >>>3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// If nextTab is null, capacity expansion is not initialized
if (nextTab == null) { // initiating
try {
// Create an array with twice the current size
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])newNode<? ,? >[n <<1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// The maximum length is exceeded
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// The number to be moved is n
transferIndex = n;
}
// The new array length
int nextn = nextTab.length;
// Create a transfer node
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// Go through the mark of the scope shared by this thread. If it is false, it means that the current allocated scope is complete, continue to get the task, or complete
boolean advance = true;
// The capacity expansion completed mark is used to check all nodes again after capacity expansion is complete
I can't think of any situations where this check would make sense :)
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// Continue the capacity expansion task for the current scope
while (advance) {
int nextIndex, nextBound;
// The expansion subscript is less than or equal to the boundary, or it is marked complete, and the loop is not continued
if (--i >= bound || finishing)
advance = false;
//(nextIndex = transferIndex) <= 0 Indicates that the expansion is complete. I =-1 is used to determine the end of the expansion
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// Cas updates nextIndex to indicate that the current thread has received the capacity expansion task for the current range and adjusts the current boundary
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false; }}// If I is less than 0, transferIndex<=0 indicates that the capacity expansion task of this thread has been completed and there are no unallocated capacity expansion tasks
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// Mark as done
if (finishing) {
// Table is set to nextTab and nextTable is cleared
nextTable = null;
table = nextTab;
// The expansion threshold changes to 1.5 times the current value
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// If the thread is marked as incomplete, there are still threads that have not completed their assignment, and the attempt is sizecl-1, indicating that the current thread is completed.
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
If it is true, all threads have completed the task. If it is not equal to, there are still threads that have not completed the task. The expansion logic of this thread returns and waits for other threads to complete.
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
Change the finishing tag to true and iterate over all nodes to make sure that all nodes have been expanded.
finishing = advance = true;
// We assign I to n. The above while logic executes -- I every time in the loop, so the value of I is n to 0, and then performs the following node checking logic.
i = n; // recheck before commit}}// The current capacity expansion task of this thread is not completed
The current node is null, the CAS node value is FWD transfer node, and the hash value of the transfer node is MOVED
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//2. Check whether the node has been transferred (for checking again after all nodes are completed)
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//3. The node has not been transferred to nextTab
// Transfer nodes have a recalculation of subscripts, and since the length of nextTable is twice that of TAB, hash indices may change after transfer.
// We know that since the length of TAB is always 2 to the n, the subscript is calculated by the following formula
//int index = hash & (length-1); (That is, modulo n of hash.)
// When length is 2*length, int newIndex = hash & (leng-1);
// If newIndex is equal to index, we can get the following conclusion
// We assume that the length binary x bit is 1, and when x equals 6 the binary is 00100000,
// Then the binary representation of 2*length is 01000000. The difference between 2*length and 2* leng-1 is that the x-th bit of 2*length is 1, while the x-th bit of length is 0, so the difference between the hash and the two results is the x-th bit. If the x bit of hash is 0, then newIndex=index; If the x-th bit of hash is 1, then newIndex = index + 2^x; So newIndex = index + length;
// Therefore, during the transfer process, there will be some nodes whose subscripts are larger than the original, the difference is length, that is, in the lower part of the array, we divide the first half into the low part, the second half is called the high part.
synchronized (f) {
// Check whether the value of the current position has changed after the lock is complete. If it has changed, it indicates that another thread has changed the value
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
// From the above analysis, we just need to find the value of the x bit of the hash to determine whether the subscript has changed
// Since n = 2 ^ x, the hash value and n are the x bits. If n = 1, the index is n more than the original index.
int runBit = fh & n;
Node<K,V> lastRun = f;
// Traverses the list of current nodes, recording the last node whose subscript changed
for(Node<K,V> p = f.next; p ! =null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// reverse traversal the list, where the judgment condition p! = lastRun
// In order to avoid creating unnecessary new Node objects, there are links in the list that can be used directly to migrate to the new table as a whole, which is the purpose of the first forward traversal above.
// We know from the above analysis that the whole list is not reversed, it is possible to reuse the original chain. Comments at the beginning of source code are more likely to have to be treated. As follows:
//On average, only about one-sixth of them need cloning when a table doubles.
for(Node<K,V> p = f; p ! = lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;
// The index does not change, the node is placed in the low
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
// The subscript is changed and the node is placed high
hn = new Node<K,V>(ph, pk, pv, hn);
}
// CAS updates the list of low level nodes
setTabAt(nextTab, i, ln);
// CAS updates the list of high level nodes
setTabAt(nextTab, i + n, hn);
// The location of the original TAB sets the transfer node to mark that this node has been transferred with the value nextTable
setTabAt(tab, i, fwd);
// Proceed to the next node in the current expansion range
advance = true;
}
// Migration of red-black tree nodes
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for(Node<K,V> e = t.first; e ! =null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null.null);
// Handle the low level
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
// Handle high
else {
if ((p.prev = hiTail) == null)
hi = p;
elsehiTail.next = p; hiTail = p; ++hc; }}// Check whether the migrated quantity storage format needs to be changed to a linked list
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
// Set the root node of the tree(hc ! =0)?new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
// Set the root node of the tree(lc ! =0)?new TreeBin<K,V>(hi) : t;
// Update the low value
setTabAt(nextTab, i, ln);
// Update high
setTabAt(nextTab, i + n, hn);
// The location of the original TAB sets the transfer node to mark that this node has been transferred with the value nextTable
setTabAt(tab, i, fwd);
// Proceed to the next node in the current expansion range
advance = true;
}
}
}
}
}
}
Copy the code
/** * Help expand */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// Check whether the current expansion is complete
if(tab ! =null && (f instanceofForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) ! =null) {
// Get the expansion tag value related to the length of the table
int rs = resizeStamp(tab.length);
// Check again that expansion is underway
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// If capacity expansion has been completed, return
if((sc >>> RESIZE_STAMP_SHIFT) ! = rs || sc == rs +1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//cas updates the number of threads to add itself
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// Execute transfer to perform expansion operations with other threads
transfer(tab, nextTab);
break; }}return nextTab;
}
return table;
}
Copy the code