ConcurrentHashMap source code analysis, please note the source
If you want to learn more about the inner workings of ConcurrentHashMap, you can check out the HashMap source code I wrote earlier to learn about hash algorithms, array expansion, red-black tree conversion, and so on.
initTable
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) // At this point, another thread has acquired execution rights and sleeps for a while
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0)? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])newNode<? ,? >[n];// Initialize the array
table = tab = nt;
sc = n - (n >>> 2); }}finally {
sizeCtl = sc;
}
break; }}return tab;
}
Copy the code
SizeCtl: a volatile shared variable that competes to initialize or expand array execution through swap comparisons. When the thread finds that sizeCtl is smaller than 0, it cedes execution. When the thread successfully competes with -1, it acquires execution, so it can initialize the array. When positive, the next element count to resize the Map is saved
Addressing the Unsafe exchange comparison method
* @param o Specifies the object whose property needs to be modified. * @param l Specifies the pointer address of the Field object. @return */ public final Native Boolean @param i1 public final Native Boolean CompareAndSwapInt (java.lang.Object O, long L, int I, int i1);Copy the code
Put method parsing
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap does not allow the key value to be null, because the existence of the key cannot be determined by getting the key in concurrent cases
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // Use hash to get the index of the array. If the index is null, use swap comparison to set it
if (casTabAt(tab, i, null.new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // This indicates that the array is expanding and the linked list and black mangrove need to be migrated
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock&& fh == hash && ((fk = f.key) == key || (fk ! =null&& key.equals(fk))) && (fv = f.val) ! =null)
return fv;
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) { // The head node does not change, indicating that the process of acquiring the lock, no thread expansion
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if(e.hash == hash && ((ek = e.key) == key || (ek ! =null && key.equals(ek)))) {
oldVal = e.val;
if(! onlyIfAbsent) e.val = value;break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break; }}}else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) ! =null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update"); }}if(binCount ! =0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if(oldVal ! =null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
Copy the code
Put business logic:
-
Check whether the table array is empty and unlocked, and call initTable to initialize it
-
Compute the key hash value, hash the index in the array, if the index happens to be empty, set it by swap comparison. When multiple threads set the subscript through the array, when the setting fails, it will not meet the null subscript situation and enter to obtain the head node lock
-
At this point, the node has a value and the hash value is -1, indicating that the array is being expanded. Multithreaded helpTransfer method is called to copy the array
-
If only the linked list or red-black root node is locked, check whether the root node is equal to the object obtaining the lock first, because the lock object may have been offset due to array expansion. If the offset is already carried out and the root node is inserted, hash calculation errors will occur and get cannot obtain the value. This is very important for the security check.
-
Fh >= 0 because ConcurrentHashMap Node hash has special meaning
- int MOVED = -1; Expanding the array and preparing for migration
- Int TREEBIN = -2 Red and black root node
- int RESERVED = -3; Temporary reserved node
-
If the value of binCount is greater than 0, we know that this is a linked list. If we iterate from the beginning to the end, we know the length of the linked list. Each time the nodes are traversed, the key is equal, overwriting the old value, or being inserted at the end of the list.
-
The logical insertion of a red-black tree is similar to that of a linked list. PutTreeVal is used to traverse and insert a node. The node object is returned only if the same key node is found.
-
BinCount is the list length. If the length is greater than the list length threshold (default 8), it is converted to a red-black tree. If oldVal is not empty, the node will be overridden. You do not need to perform accumulation, but simply return the node.
-
AddCount simply adds to the map size, because in concurrent cases it is not possible to add one to the size method, which violates the weight and fineness of the lock. The actual situation is more copy, array expansion logic is also implemented in this method, the following specific analysis.
The size () method
Before we look at map size, let’s see how the size method gets the length, which will help explain addCount.
public int size(a) {
long n = sumCount();
return ((n < 0L)?0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount(a) {
CounterCell[] cs = counterCells;
long sum = baseCount;
if(cs ! =null) {
for (CounterCell c : cs)
if(c ! =null)
sum += c.value;
}
return sum;
}
Copy the code
The size length is accumulated mainly through the baseCount + CounterCell array. BaseCount only uses baseCount when a thread is using a map. When multiple threads operate on a map at the same time, baseCount is discarded and the number of operations per thread is placed in the CounterCell array. CounterCell is just a calculation box. It uses some algorithms to put the number of operations of different threads into the specified index position. It can isolate threads from competing for the same number of changes, similar to the hash method of putting subscripts into arrays, which can reduce the number of conflicts.
AddCount analysis
private final void addCount(long x, int check) {
CounterCell[] cs; long b, s;
if((cs = counterCells) ! =null| |! U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
boolean uncontended = true;
// At this point, cs is not initialized, or cs has just been initialized, the length is still 0, and the subscript obtained from thread random number is just empty
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[ThreadLocalRandom.getProbe() & m]) == null| |! (uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) { fullAddCount(x, uncontended);// Add the cumulative value to the array
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) { // 0 does not judge array expansion
Node<K,V>[] tab, nt; int n, sc;
// The following logic is performed only when size is equal to or greater than the size threshold
while (s >= (long)(sc = sizeCtl) && (tab = table) ! =null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) { // The current TAB array is used for expansion
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
transfer(tab, null); s = sumCount(); }}}Copy the code
So let’s start with if, if countCells is not empty, let’s ignore the second condition, go to countCells and set the summation. If the first condition is not true, baseCount accumulation is performed. If the success condition is not true, the if logic is not entered. FullAddCount is needed to initialize countCells, add an x value to an array, or add multiple subscripts to an array. As mentioned above, if the size ectL is a positive integer, it is the array capacity expansion threshold. If the map size reaches or exceeds the threshold, the loop will be entered for array expansion. When sizeCtl > 0, multiple threads are expanding the array and competing for execution rights. How does the specific analysis of expansion conditions set up
resizeStamp(n) << RESIZE_STAMP_SHIFT;
The method is to move the low zero complement of array N to the left by 16, which is equivalent to saving the length of the array to the high 16 bits. SizeCtl +1 for each thread participating in a concurrent process, 16 bits lower, is used to store the number of participating arrays.
-
sc > 0
- There are no threads to expand the TAB array yet
-
sz < 0
- There are already threads sizeCtl+1 and transfer() is called to allow the current thread to participate in the expansion.
Let’s analyze the judgment in while
- Sc == RS + MAX_RESIZERS are already at their maximum number of threads and there is no point in adding new threads
- Sc == RS + 1 Transfer will be called as long as RS + 1 is successful, but now rs value has been modified by other threads, the accumulation has failed, there is no need to perform a exchange comparison.
- (nt = nextTable) == NULL The capacity expansion has been completed and new threads do not need to enter the capacity expansion method
- TransferIndex <= 0 Indicates the number of unallocated migrations in the transferIndex TAB array. In this case, 0 indicates that the number of expanded threads has reached the maximum and no new threads need to be used
Fine speak fullAddCount method ThreadLocalRandom. GetProbe () can be as simple as each thread hash value, but the value is not fixed, in can’t find the slot array is, can be recalculated.
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) { / / initialization
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
CounterCell[] cs; CounterCell c; int n; long v;
if((cs = counterCells) ! =null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSetInt(this, CELLSBUSY, 0.1)) { // If the contest succeeds, you can modify the array
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if((rs = counterCells) ! =null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true; }}finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if(! wasUncontended)// The CAS has failed and continues to spin to wait for expansion
wasUncontended = true; // Continue after rehash
else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x)) // Add successfully, exit spin
break;
// When the array length exceeds the number of cpus, it should not expand, but continue to spin until the sum is successful
else if(counterCells ! = cs || n >= NCPU) collide =false; // At max size or stale
else if(! collide)// If the race continues to fail, the spin will continue without expansion
collide = true;
else if (cellsBusy == 0 &&
// The first competition fails
U.compareAndSetInt(this, CELLSBUSY, 0.1)) {
try {
if (counterCells == cs) // Expand table unless stale
counterCells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// Re-generate probe because CAS insertion failed. Try replacing the other slots
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == cs &&
U.compareAndSetInt(this, CELLSBUSY, 0.1)) {// Initialize the array
boolean init = false;
try { // Initialize table
if (counterCells == cs) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true; }}finally {
cellsBusy = 0;
}
if (init)
break;
}
// The array is not initialized at this time. If there are multiple threads competing, baseCount can be used to accumulate the array
else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base}}Copy the code
The cellsBusy property is very similar to the one we talked about with sizeCtl, which controls the ability to modify arrays. When cellsBusy =0, the array can be inserted and expanded. The thread only needs to set cellsBusy to 1 to obtain the modification permission, and then change cellsBusy to 0. If you have seen LongAdder source code, do you think it is very familiar? I can’t say very familiar, but it is exactly the same, even the variable name is copied over. In fact, the reason WHY I am willing to talk about this detail is related to the design idea of LongAdder. The value value is separated into an array, and when multithreading accesses, it is counted by a number mapped to it through the hash algorithm
transfer
How does map expand in multiple threads
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// Calculate the maximum number of arrays to be migrated by a thread
if ((stride = (NCPU > 1)? (n >>>3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])newNode<? ,? >[n <<1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; // Array subscript move flag
boolean finishing = false; // A thread is responsible for whether all elements in the array have been migrated
// I indicates the number of bytes left in our last bound array
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// This is just for i-1, or all the array elements have been migrated
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {// The array is completely annotated
i = -1;
advance = false;
}
// The thread is assigned the number of migrations, and if the allocation is successful, the wile loop will be executed below
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false; }}// If any of the three conditions is true, the old data has been fully migrated
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// If sc +2 is added to addCount, sc +2 will be added. If sc +2 is added to addCount, sc +2 will be added. If sc +2 is added to addCount, sc +2 will be added
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit}}// Set advance to true for each element and enter while again
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// Indicates that the node is already being processed by another thread
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else { // To migrate this element, lock it directly
synchronized (f) {
if (tabAt(tab, i) == f) { // Double check
Node<K,V> ln, hn; // Split the list into two ln to represent the low hn high
if (fh >= 0) { As mentioned above, any hash greater than 0 is a linked list
int runBit = fh & n;
Node<K,V> lastRun = f;
for(Node<K,V> p = f.next; p ! =null; p = p.next) { // Find the last element first
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// Split the list into two
for(Node<K,V> p = f; p ! = lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// Marks the old array. This subscript is no longer usable
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // The red black tree is almost the same as the one above
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// Traverse the red-black tree using the linked list subscript to find the last one
for(Node<K,V> e = t.first; e ! =null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null.null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
elsehiTail.next = p; hiTail = p; ++hc; }}// The newly generated red-black tree linked list does not meet the threshold and is converted to a linked listln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc ! =0)?newTreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc ! =0)?new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
}
}
}
Copy the code
ForwardingNode This is a special node that will be labeled as this object when the array is migrated. Prevents insertion into an old array during migration. Encapsulate a find method that looks for the key value in the new array, as used in the get method below. How to support concurrency, assign tasks to individual threads. Each thread entry is assigned a task in the while of the double-loop. The nextIndex is still the length of the array. When the race starts to modify the transferIndex, set the thread that succeeded. < span style = “max-width: 100%; clear: both; clear: both; I’m going to break out of the while loop, otherwise every time I go into the while, I’m going to put the subscript -1. The thread that loses the race reenters the WHLE loop and continues to fetch the allocated amount from transferIndex. When the transferIndex is not empty, it indicates that the array still has tasks assigned and continues to compete for them.
If (I < 0 | | I > = n | | I + n > = nextn) {when I < 0 was founded according to the current thread has finished the copy task, I > = n | | I + n > = nextn are associated with the following I = n, for the current array bounds checking. SizeCtl 16 bits lower represents the amount of current thread capacity expansion, allowing a thread to complete a task in sizeCtl -1. SizeCtl +2 calls Transfer during capacity expansion. If sizeCtl +2 calls Transfer during capacity expansion, there are still other threads in the copy process. If capacity expansion is not completed, the threads that have completed capacity expansion will exit automatically. Let the last thread finish up, turning nextTab into a TAB.
The get method
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if((tab = table) ! =null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) ! =null) {
if ((eh = e.hash) == h) { // The array element is the current key
if((ek = e.key) == key || (ek ! =null && key.equals(ek)))
return e.val;
}
else if (eh < 0) // Here is a red-black tree or ForwardingNode, using the internal encapsulation method to query
return(p = e.find(h, key)) ! =null ? p.val : null;
while((e = e.next) ! =null) {
if(e.hash == h && ((ek = e.key) == key || (ek ! =null && key.equals(ek))))
returne.val; }}return null;
}
Copy the code
The GET method does not use locks, supports maximum concurrent reads, and presents no security issues. When eh < 0 and it is a red-black tree, the red-black tree traversal method is called to operate the node. If a ForwardingNode, even though the node’s data has been migrated to the new array, encapsulates find to look in the new array. The corresponding key value can be found regardless of which node.
Delete methods
public V remove(Object key) {
return replaceNode(key, null.null);
}
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode()); // Computes the hash value
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null) // Unable to find the corresponding key node, exit the loop
break;
else if ((fh = f.hash) == MOVED) // See put above
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) { // Lock the array subscript node
if (tabAt(tab, i) == f) {
if (fh >= 0) { // This is a list structure
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if(e.hash == hash && ((ek = e.key) == key || (ek ! =null && key.equals(ek)))) { // The corresponding element has been found in the linked list
V ev = e.val;
if (cv == null|| cv == ev || (ev ! =null && cv.equals(ev))) {
oldVal = ev;
if(value ! =null) // Replace the old value
e.val = value;
// If the value of the previous node is null, delete the current node and change the guidance
else if(pred ! =null)
pred.next = e.next;
else
// If the front node is empty, the first node is to be deleted
setTabAt(tab, i, e.next); //
}
break;
}
pred = e;
if ((e = e.next) == null) // Exit the loop by iterating over the last one
break; }}else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if((r = t.root) ! =null &&
(p = r.findTreeNode(hash, key, null)) != null) { // Use the red-black tree method to find
V pv = p.val;
if (cv == null|| cv == pv || (pv ! =null && cv.equals(pv))) {
oldVal = pv;
if(value ! =null)
p.val = value;
else if (t.removeTreeNode(p)) // Remove the node from the black mangrovesetTabAt(tab, i, untreeify(t.first)); }}}else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update"); }}if (validated) {
if(oldVal ! =null) { // If the old value does not exist, the node data will be deleted
if (value == null)
addCount(-1L, -1); //size -1
return oldVal;
}
break; }}}return null;
}
Copy the code
Data reference
Xilidou.com/2018/11/27/…
Cloud.tencent.com/developer/a…
Blog.csdn.net/zzu_seu/art…