The ConcurrentHashMap source has been analyzed widely on the web. This article focuses on the transfer method, attempting a careful analysis of its implementation; if there are any mistakes, corrections are welcome.
The transfer method of ConcurrentHashMap drives capacity expansion and bucket reorganization. When the number of elements in the internal array exceeds the threshold, a resize is triggered, and transfer is the main implementation of this process.
1. Related concepts
2. The transfer process
1. Related concepts
1.1 ConcurrentHashMap reuses a single field for multiple purposes, such as threshold control, internal Node[] array state control, and resize thread control. That field is sizeCtl.
```java
/**
 * <pre>
 * Table initialization and resize control.
 * Negative: the table is being initialized or resized:
 *   -1 means initialization;
 *   -(1 + n) means n active resizing threads, so the first resizing
 *   thread sets the low bits to 2, because 1 (i.e. -1) is reserved
 *   for initialization.
 * When the table is null, holds the size specified at creation,
 * or 0 for the default. After initialization, holds the next
 * threshold at which to resize the table.
 * </pre>
 */
private transient volatile int sizeCtl;
```
1.2 ConcurrentHashMap reorganizes buckets much like HashMap, but uses an internal array field, nextTable, for concurrency control during the resize. Other parts, such as linked-list and tree-bin reorganization, follow a similar process to HashMap.
1.3 The reorganization in ConcurrentHashMap borrows the idea of segmenting the table: the array is divided into disjoint ranges, and threads that enter during a resize can each claim a range and help with the conversion.
1.4 transferIndex is an internal field of ConcurrentHashMap used during the resize phase to mark the portion of the array not yet converted, namely table[0] ~ table[transferIndex - 1].
1.5 The concurrent conversion in ConcurrentHashMap borrows the idea of a semaphore: only a thread that obtains a permit may join the conversion, and the permits live in the low bits of sizeCtl. A helping thread does sizeCtl + 1 (the first thread, which starts the conversion, does sizeCtl + 2, since a low value of 1 is reserved). Note that the base value is negative; as threads join, sizeCtl grows toward an upper bound, and by default at most 65534 threads may participate in the conversion (this bound comes from MAX_RESIZERS and is adjustable). The boundary checks against rs + 1 and rs + MAX_RESIZERS therefore decide whether a thread may still join. The code that makes sizeCtl negative and establishes the stamp is:
```java
resizeStamp(n) << RESIZE_STAMP_SHIFT
```
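A minimal sketch of why this shifted stamp is negative and how its low 16 bits count resizing threads; the formula and the constants mirror the JDK's, but the class and names here are illustrative only.

```java
public class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // leading zeros of n, with bit 15 set so the shifted stamp's sign bit is 1
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int n = 16;
        int base = resizeStamp(n) << RESIZE_STAMP_SHIFT;
        System.out.println(base < 0);             // true: sizeCtl is negative while resizing
        // The first resizing thread CASes sizeCtl to base + 2; each helper
        // adds 1, so the low 16 bits hold (active threads + 1).
        System.out.println((base + 2) & 0xFFFF);  // 2
    }
}
```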
1.6 During the conversion, the two auxiliary fields nextTable and transferIndex are shared across threads, so they are updated with a "spin loop + CAS" pattern to stay safe under concurrency.
2. The transfer process
2.1 Every call path into transfer is guarded externally by a "spin + CAS" on sizeCtl. Under concurrency, even if multiple threads try to start a resize, only one succeeds; the others enter the helping path. The checks before entering the expansion method look like this:
```java
private final void addCount(long x, int check) {
    // ... some code omitted ...
    // nt -> nextTable; n -> tab.length; sc -> sizeCtl
    Node<K,V>[] tab, nt; int n, sc;
    // Enter only if the element count s has reached the threshold stored in
    // sizeCtl and the table is below the maximum capacity.
    while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
           (n = tab.length) < MAXIMUM_CAPACITY) {
        // resizeStamp shifts left so the low 16 bits are free to count
        // threads. The high bits carry little information about n itself
        // (resizeStamp(4), resizeStamp(5), resizeStamp(6), resizeStamp(7)
        // all produce the same result).
        int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
        if (sc < 0) {
            // Bounds on the number of helper threads:
            // sc == rs + 1            -> lower bound: no resize to help
            //                            (a running resize starts at rs + 2)
            // sc == rs + MAX_RESIZERS -> upper bound (65535 by default)
            // nextTable == null or transferIndex <= 0 -> the resize is
            // already finished, so there is nothing to help with.
            if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                (nt = nextTable) == null || transferIndex <= 0)
                break;
            // Join as a helper: one more thread, low bits + 1
            if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                transfer(tab, nt);
        }
        // sc >= 0: no resize in progress yet; the winning thread starts one,
        // setting the low bits to 2 (1 is reserved for initialization)
        else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
            transfer(tab, null);
        s = sumCount();
    }
}
```
Many comparisons occur here, which can be confusing, but the key thing to remember is: **most of these comparisons in ConcurrentHashMap are boundary checks.** Keeping this in mind, comparisons such as sc == rs + MAX_RESIZERS and sc == rs + 1 are simply the upper and lower bounds on the number of threads, beyond which a thread cannot join the helping conversion.
2.2 ConcurrentHashMap converts the array concurrently in segments, dividing it by a stride; each thread then completes the conversion of the segment it claims. So where is the exit from the conversion? The whole conversion exits only when every thread has finished and all permits recorded in sizeCtl have been returned. The default minimum stride is MIN_TRANSFER_STRIDE = 16, meaning a thread processes at least 16 buckets.
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride ==> n / 8 / NCPU on multi-core, at least MIN_TRANSFER_STRIDE
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // the initiating thread creates the new table
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // All outer calls reach this method through CAS on sizeCtl, so this
        // assignment is guaranteed correct even under multithreading.
        nextTable = nextTab;
        transferIndex = n;            // nothing converted yet
    }
    // ...
}
```
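To make the stride formula above concrete, here is a small sketch that extracts it into a standalone helper (the class and method names are illustrative, not JDK code):

```java
public class StrideDemo {
    static final int MIN_TRANSFER_STRIDE = 16;

    // stride = n / 8 / NCPU on multi-core, never below the minimum;
    // a single CPU takes the whole table.
    static int stride(int n, int ncpu) {
        int s = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        return (s < MIN_TRANSFER_STRIDE) ? MIN_TRANSFER_STRIDE : s;
    }

    public static void main(String[] args) {
        System.out.println(stride(16, 4));    // 16: small tables fall back to the minimum
        System.out.println(stride(4096, 4));  // 128: 4096 / 8 / 4
        System.out.println(stride(4096, 1));  // 4096: one CPU, one segment
    }
}
```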
2.3 After entering the conversion method, the first steps are to compute the per-thread stride, initialize nextTable (if necessary), and initialize the auxiliary field transferIndex = n = table.length.
2.4 What follows looks like an infinite loop nested inside another infinite loop. The outer loop declares the local variables i and bound; each thread gets its own copies on entering the method, and they are assigned inside the inner loop. Once a range has been claimed, the outer loop effectively becomes a bounded traversal.
2.5 Look first at the inner loop, which is driven by the advance variable. advance says whether to step on to the next element; once i and bound no longer satisfy the claimed range, advance becomes false and the thread stops stepping with --i. In other words, once a thread enters, three variables control its execution: bound and i delimit the slice of the array the thread handles, and advance controls its movement within that slice.
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // ...
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;    // whether to step on to the next element
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;   // f -> current node; fh -> its hash
        // The inner loop's main job is to carve out this thread's range.
        while (advance) {
            int nextIndex, nextBound;
            // --i still within the claimed range (or we are finishing):
            // stop stepping and go process element i in the outer loop.
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;        // nothing left to claim
                advance = false;
            }
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // CAS moves transferIndex down by one stride; this thread
                // now owns slots bound ~ (nextIndex - 1), working from the
                // tail of the table toward the head.
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // ...
    }
}
```
As you can see, the inner loop's main job is partitioning by stride. Even a single thread executes partition by partition, and partitions are claimed from the tail of the array toward the head. CAS guarantees that claiming a partition is thread-safe; if the CAS fails, the loop simply retries.
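The tail-to-head claiming can be simulated outside the JDK. The sketch below (illustrative names, not JDK code) uses an AtomicInteger standing in for transferIndex and prints each claimed range in the order a thread would obtain it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionDemo {
    // Claim [nextBound, nextIndex - 1] ranges by CASing the shared index
    // down one stride at a time, mirroring the inner loop of transfer().
    static List<String> claims(int n, int stride) {
        AtomicInteger transferIndex = new AtomicInteger(n);
        List<String> out = new ArrayList<>();
        int nextIndex;
        while ((nextIndex = transferIndex.get()) > 0) {
            int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                out.add("[" + nextBound + ", " + (nextIndex - 1) + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        // A 64-slot table with stride 16 is claimed tail-first:
        System.out.println(claims(64, 16));
        // [[48, 63], [32, 47], [16, 31], [0, 15]]
    }
}
```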
2.6 After partitioning, what remains is the per-thread work. It consists of two parts: normal element processing, and boundary control, i.e. the exit.
For each element, the thread first checks whether it has reached the exit. An ordinary helper thread exits by simply returning its permit and cleaning up after itself, while the last thread additionally re-checks the whole table, publishes the reorganized result by assigning nextTable to table, and sets sizeCtl to a threshold of 0.75 times the new array size.
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // ...
    // i < 0 || i >= n || i + n >= nextn is the boundary check: the thread
    // has run out of range to process. A thread still inside its claimed
    // range never triggers this branch.
    if (i < 0 || i >= n || i + n >= nextn) {
        int sc;
        // The resize exit.
        // nextTable is published to the table pointer only when finishing
        // is true, and finishing becomes true only after all threads are done.
        if (finishing) {
            nextTable = null;
            table = nextTab;
            // 2n - n/2 = 1.5n, i.e. 0.75 times the new array size
            sizeCtl = (n << 1) - (n >>> 1);
            return;
        }
        // Return this thread's permit: low bits of sizeCtl - 1
        if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
            // If other threads are still working, just leave.
            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                return;
            // Only the last thread reaches here, so finishing can become
            // true only after all threads are done.
            finishing = advance = true;
            i = n; // recheck before commit
        }
    }
    // ...
}
```
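The threshold arithmetic at the exit is compact enough to deserve a quick check. The sketch below (illustrative, not JDK code) verifies that (n << 1) - (n >>> 1) equals 1.5n, which is exactly 0.75 of the doubled table size:

```java
public class ThresholdDemo {
    public static void main(String[] args) {
        int n = 16;                     // old table length
        int sc = (n << 1) - (n >>> 1);  // 2n - n/2 = 1.5n
        System.out.println(sc);         // 24 == 0.75 * 32 (the new length)
    }
}
```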
Having covered the boundary exit, what remains is the ordinary processing, which involves the following cases:
If a slot of the old array is null during the conversion, CAS replaces it with a ForwardingNode (whose hash is MOVED), marking it as migrated. If an external put operation then hits this slot, it enters the helping path based on the check (hash == MOVED). In other words, put operations that run during the reorganization will join the helping conversion.
If the hash is MOVED, that slot has already been migrated by another thread, so the thread advances to the next element.
Finally, we reach the linked-list and tree-bin reorganization, which follows the same logic as HashMap; after it succeeds, advance = true and processing continues with --i. This goes one step further than HashMap by marking the slot in the old array as processed.
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // ... inside the main loop, after the boundary check ...
    else if ((f = tabAt(tab, i)) == null)
        advance = casTabAt(tab, i, null, fwd); // empty slot: mark as processed
    else if ((fh = f.hash) == MOVED)
        advance = true; // already processed by another thread
    else {
        // Enter the conversion, locking the bin head
        synchronized (f) {
            if (tabAt(tab, i) == f) {
                Node<K,V> ln, hn;
                // An ordinary list node has a non-negative hash, while
                // special nodes (tree bins, etc.) have hash < 0.
                // The rehash is the same as HashMap: the length is a power
                // of two, so one bit decides whether an element stays low
                // or moves high.
                if (fh >= 0) {
                    // ... split the linked list into ln / hn ...
                }
                else if (f instanceof TreeBin) {
                    // ... split the tree, then publish both halves ...
                    setTabAt(nextTab, i, ln);
                    setTabAt(nextTab, i + n, hn);
                    // Mark the old slot as processed (the old array no
                    // longer holds the data)
                    setTabAt(tab, i, fwd);
                }
                else if (f instanceof ReservationNode) {
                    // ...
                }
            }
        }
    }
}
```
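The HashMap-style split mentioned above can be shown in isolation: because the table length n is a power of two, the single bit (hash & n) decides whether a node stays at index i (the "low" list) or moves to index i + n (the "high" list) in the doubled table. The class, method, and values below are illustrative only:

```java
public class SplitDemo {
    // Destination of a node with the given hash when a table of length n
    // doubles and the node currently sits in bucket i.
    static int newIndex(int hash, int n, int i) {
        return ((hash & n) == 0) ? i : i + n;
    }

    public static void main(String[] args) {
        int n = 16, i = 5;
        // hashes 5, 21, 37, 53 all map to bucket 5 when the length is 16
        for (int h : new int[] {5, 21, 37, 53})
            System.out.println(h + " -> " + newIndex(h, n, i));
        // 5 -> 5, 21 -> 21, 37 -> 5, 53 -> 21
    }
}
```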
At this point, the whole ConcurrentHashMap conversion process has been covered. The analysis above reflects my own understanding; if there are any errors, corrections are welcome.
To close, here is a summary of the points that are easy to overlook or hard to understand:
- Most of the judgments that seem complicated and confusing are really boundary checks
- The whole approach uses a "spin loop + CAS" to control concurrency
- Dividing the array range handled by each thread into strides is likewise done with "spin loop + CAS"
- If an exception during execution under multithreading causes a thread to skip the exit logic, its permit is never returned, and ConcurrentHashMap's sizeCtl could be left in a state where the conversion cannot finish (unverified)
- Controlling the number of threads is really a reuse of sizeCtl: the stamp is kept in the high bits after a left shift, and the low 16 bits are left free to count the threads
The flow chart is as follows: