Design goals

ConcurrentHashMap is designed to minimize update contention while maintaining concurrent readability. The secondary goal is to make space consumption consistent with or less than a HashMap, and to ensure multithreaded initial insert rates for empty tables.

The overall structure

ConcurrentHashMap similar in structure to HashMap, usually a bin hash table (bucketed). Maintains an internal array to hold Node nodes. Each position in the array represents a bucket bit corresponding to a specific hash value. ConcurrentHashMap uses the zipper method to resolve hash conflicts when inserted data occurs. It maintains a Node list at the location where the hash conflict occurs to store the data. When there is too much data on the linked list, it is necessary to traverse too many elements to query the data of the current position, which will reduce the query efficiency. Therefore, when the length of the linked list is too large, the structure of the linked list will be transformed into a red-black tree to improve the query efficiency.

Concurrency control

ConcurrentHashMap combines CAS operation with synchronized to ensure thread safety. When the updated data is NULL in the table, the CAS operation is performed using the Unsafe class to save the data. When a Node Node already exists at the updated location, synchronized locks the first Node at the location to ensure thread safety. As shown in the figure above, Node1 Node will be locked first before related operations when Node3 is inserted.

A constructor

/**
     * Creates a new, empty map with an initial table size based on
     * the given number of elements ({@code initialCapacity}), table
     * density ({@code loadFactor}), and number of concurrently
     * updating threads ({@code concurrencyLevel}).
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements,
     * given the specified load factor.
     * @param loadFactor the load factor (table density) for
     * establishing the initial table size
     * @param concurrencyLevel the estimated number of concurrently
     * updating threads. The implementation may use this value as
     * a sizing hint.
     * @throwsIllegalArgumentException if the initial capacity is * negative or the load factor or concurrencyLevel are * nonpositive * /
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
        // Check parameters
        if(! (loadFactor >0.0 f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // Set the minimum initialization value based on the concurrency parameter
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        // the capacity is calculated by the loadFactor loadFactor. ConcurrentHashMap differs from HashMap, where the load factor parameter is passed, but is only used to calculate the actual capacity in this method. Instead of using this load factor for capacity expansion, the default load factor 0.75 is used.
        long size = (long) (1.0 + (long)initialCapacity / loadFactor);
        //tableSizeFor Converts the size calculated in the previous step to the minimum power 2 (n) greater than or equal to size.
        // Then save this value to sizeCtl, you can see that there is no actual initialization of the storage space in the constructor, just a calculation of the capacity to be used during initialization.
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
}
Copy the code

The get method

/**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // Computes the hash value of the key
        int h = spread(key.hashCode());
        // Table is not null and the hash position is not null, and the first Node of the current position is obtained
        if((tab = table) ! =null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) ! =null) {
            // The hash value of this Node is the same as the hash value of the key. If the keys are the same, this value is returned
            if ((eh = e.hash) == h) {
                if((ek = e.key) == key || (ek ! =null && key.equals(ek)))
                    return e.val;
            }
            // If the hash value of the first Node is negative, it indicates that the Node is being expanded. This Node is a ForwardingNode object, which can be found in nextTable using find.
            else if (eh < 0)
                return(p = e.find(h, key)) ! =null ? p.val : null;
            // Traverse the linked list for this location to find the mapped data
            while((e = e.next) ! =null) {
                if(e.hash == h && ((ek = e.key) == key || (ek ! =null && key.equals(ek))))
                    returne.val; }}return null;
    }
Copy the code

Put method

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // Computes the hash value of the key
        int hash = spread(key.hashCode());
        // Initialize binCount to store the number of nodes in the current bucket to determine whether to convert to red-black tree storage
        int binCount = 0;
        / / spin
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // If the table is not initialized, call the initTable method to initialize it
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // Hash location. If the current location is null, you can directly save the value to this location by cas operation.
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null.new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // If a Node exists in the current location and the hash value of the Node is -1, expansion is being performed.
            else if ((fh = f.hash) == MOVED)
                // Call the helpTransfer method to try to help expand together and avoid invalid waits
                tab = helpTransfer(tab, f);
            // A Node exists in the current location and no capacity expansion operation is performed
            else {
                V oldVal = null;
                // Try to lock the current node
                synchronized (f) {
                    // The location of the node may change after the lock is successfully performed during node deletion and capacity expansion
                    if (tabAt(tab, i) == f) {
                        // The current bucket storage structure is a linked list, insert this data into the list
                        if (fh >= 0) {
                            binCount = 1;
                            // Iterate over the list and record the number of nodes in the list
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // This key is already mapped
                                if(e.hash == hash && ((ek = e.key) == key || (ek ! =null && key.equals(ek)))) {
                                     // If the data is added when it does not exist, the data is not updated; If the mapping of this key is not updated to the value passed in.
                                    oldVal = e.val;
                                    if(! onlyIfAbsent) e.val = value;break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break; }}}// Insert this data into the red-black tree
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) ! =null) {
                                oldVal = p.val;
                                if(! onlyIfAbsent) p.val = value; }}}}// If the number of nodes is not 0, the update is successful. Instead, it's spinning
                if(binCount ! =0) {
                    // The current data structure is a linked list, and the number of nodes has reached the threshold to become a red-black tree
                    if (binCount >= TREEIFY_THRESHOLD)
                        // Convert to red-black tree storage
                        treeifyBin(tab, i);
                    if(oldVal ! =null)
                        return oldVal;
                    break; }}}// The current capacity is increased by 1. If the capacity expansion threshold is reached, capacity expansion is required
        addCount(1L, binCount);
        return null;
    }
Copy the code

Counting principle and expansion analysis

The ConcurrentHashMap counts in the same way as the LongAdder class does, updating the baseCount field when there is no race. If there is a multi-thread race, maintain an array of counterCells on updating the baseCount operation to avoid a full block. It is used to store multiple values, and when a contention occurs, the thread uses its probe value to determine which value in the counterCells array it is updating, thereby reducing the contention. Calculate the total by adding the values in baseCount and the array counterCells.

/** * Adds to count, and if table is too small and not already * resizing, initiates transfer. If already resizing, helps * perform transfer if work is available. Rechecks occupancy * after a transfer to see if another resize is already Needed * Because resizings are lagging. * Increase quantity to count. If count reaches the capacity expansion threshold and no capacity expansion is started, initialize the capacity expansion operation. If other threads are already expanding, try to help with the expansion. After capacity expansion is complete, check whether the capacity needs to be expanded. *@paramX The count to add Number of entries *@param check if <0, don't check resize, if <= 1 only check if uncontended
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // If the CounterCell array is not empty, or cas fails to update the baseCount field, the number of updates is competing with multiple threads.
        if((as = counterCells) ! =null| |! U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // If the CounterCell array is not initialized, or the current thread's CountCell is null
            if (as == null || (m = as.length - 1) < 0 ||
                // Get the corresponding CountCell from the thread hash.
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                // The array is initialized and the CountCell corresponding to the thread is not null. Try CAS to update this CountCell. If this fails, execute fullAddCount! (uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended);return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // If check is greater than 0, check whether expansion is required
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // If the total number reaches the capacity expansion threshold, expand the capacity
            while (s >= (long)(sc = sizeCtl) && (tab = table) ! =null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // Calculate the flag bit of the table with size n
                int rs = resizeStamp(n);
                // If sizeCtl is smaller than 0, capacity expansion is in progress
                if (sc < 0) {
                    // Determining whether the expansion is over is buggized in Java8. Sc == RS + 1 and SC == RS + MAX_RESIZERS can never be true.
                    //sizeCtl values change stage:
                    // Initial expansion: sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2
                    SizeCtl = sizeCtl + 1;
                    SizeCtl = sizeCtl -1;
                    SizeCtl = (rs << RESIZE_STAMP_SHIFT) + 1, which is the initial capacity expansion value -1
                    // Five: sizeCtl stores the new capacity expansion threshold, which becomes positive
                    // When sizeCtl is less than 0, sizeCtl has these stages
                    SizeCtl =(rs << RESIZE_STAMP_SHIFT) + 2
                    SizeCtl =(rs << RESIZE_STAMP_SHIFT) + 1 + Number of expansion threads
                    // So the judgment here should be
                    SizeCtl = (rs << RESIZE_STAMP_SHIFT) + 1;
                    SizeCtl = (RS << RESIZE_STAMP_SHIFT) + MAX_RESIZERS = (RS << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
                    // In both cases, the current thread does not need to participate in the expansion.Look further down if nextTable==nullOr transferIndex < =0It also means the expansion has been completed, directlybreak
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // Help with capacity expansion, CAS update sizeCtl flags current threads to participate in capacity expansion
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //sizeCtl not smaller than 0 indicates that the current capacity is not expanded
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null); s = sumCount(); }}}// Returns a unique expansion flag corresponding to n capacity
    // Convert the information about n to the number of preceding 0's, using the number of preceding 0's to distinguish different n's
    //1 << (resize_stamp_bits-1), obtain a number with the 16th digit as 1 and the rest zeros, ensuring that the sign bit is 1 after moving 16 bits to the left, that is, a negative number
    // Then perform the or operation, add the information of n, obtain the mark value for expansion calculation
    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
Copy the code

Transfer method and helperTransfer method

Expansion of ConcurrentHashMap Expansion is accomplished by adding a temporary array and transferring all nodes in the original array to the new array. The minimum unit of capacity expansion is a node in the table. Each thread tries to participate in capacity expansion when it finds that the node it operates is being moved. It is controlled by the transferIndex field. Each thread updates the transferIndex before capacity expansion to mark the scope of nodes it processes. When the transferIndex turns to 0, it means that all nodes have threads processing capacity expansion transfer or have finished processing.

/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1)? (n >>>3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // If nextTab is null, capacity expansion is not initialized
        if (nextTab == null) {            // initiating
            try {
                // Create an array with twice the current size
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])newNode<? ,? >[n <<1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                // The maximum length is exceeded
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            // The number to be moved is n
            transferIndex = n;
        }
        // The new array length
        int nextn = nextTab.length;
        // Create a transfer node
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // Go through the mark of the scope shared by this thread. If it is false, it means that the current allocated scope is complete, continue to get the task, or complete
        boolean advance = true;
        // The capacity expansion completed mark is used to check all nodes again after capacity expansion is complete
        I can't think of any situations where this check would make sense :)
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // Continue the capacity expansion task for the current scope
            while (advance) {
                int nextIndex, nextBound;
                // The expansion subscript is less than or equal to the boundary, or it is marked complete, and the loop is not continued
                if (--i >= bound || finishing)
                    advance = false;
                //(nextIndex = transferIndex) <= 0 Indicates that the expansion is complete. I =-1 is used to determine the end of the expansion
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // Cas updates nextIndex to indicate that the current thread has received the capacity expansion task for the current range and adjusts the current boundary
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false; }}// If I is less than 0, transferIndex<=0 indicates that the capacity expansion task of this thread has been completed and there are no unallocated capacity expansion tasks
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // Mark as done
                if (finishing) {
                    // Table is set to nextTab and nextTable is cleared
                    nextTable = null;
                    table = nextTab;
                    // The expansion threshold changes to 1.5 times the current value
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // If the thread is marked as incomplete, there are still threads that have not completed their assignment, and the attempt is sizecl-1, indicating that the current thread is completed.
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    If it is true, all threads have completed the task. If it is not equal to, there are still threads that have not completed the task. The expansion logic of this thread returns and waits for other threads to complete.
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    Change the finishing tag to true and iterate over all nodes to make sure that all nodes have been expanded.
                    finishing = advance = true;
                    // We assign I to n. The above while logic executes -- I every time in the loop, so the value of I is n to 0, and then performs the following node checking logic.
                    i = n; // recheck before commit}}// The current capacity expansion task of this thread is not completed
            The current node is null, the CAS node value is FWD transfer node, and the hash value of the transfer node is MOVED
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //2. Check whether the node has been transferred (for checking again after all nodes are completed)
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                //3. The node has not been transferred to nextTab
                // Transfer nodes have a recalculation of subscripts, and since the length of nextTable is twice that of TAB, hash indices may change after transfer.
                // We know that since the length of TAB is always 2 to the n, the subscript is calculated by the following formula
                //int index = hash & (length-1); (That is, modulo n of hash.)
                // When length is 2*length, int newIndex = hash & (leng-1);
                // If newIndex is equal to index, we can get the following conclusion
                // We assume that the length binary x bit is 1, and when x equals 6 the binary is 00100000,
                // Then the binary representation of 2*length is 01000000. The difference between 2*length and 2* leng-1 is that the x-th bit of 2*length is 1, while the x-th bit of length is 0, so the difference between the hash and the two results is the x-th bit. If the x bit of hash is 0, then newIndex=index; If the x-th bit of hash is 1, then newIndex = index + 2^x; So newIndex = index + length;
                // Therefore, during the transfer process, there will be some nodes whose subscripts are larger than the original, the difference is length, that is, in the lower part of the array, we divide the first half into the low part, the second half is called the high part.
                synchronized (f) {
                    // Check whether the value of the current position has changed after the lock is complete. If it has changed, it indicates that another thread has changed the value
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            // From the above analysis, we just need to find the value of the x bit of the hash to determine whether the subscript has changed
                            // Since n = 2 ^ x, the hash value and n are the x bits. If n = 1, the index is n more than the original index.
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            // Traverses the list of current nodes, recording the last node whose subscript changed
                            for(Node<K,V> p = f.next; p ! =null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            // reverse traversal the list, where the judgment condition p! = lastRun
                            // In order to avoid creating unnecessary new Node objects, there are links in the list that can be used directly to migrate to the new table as a whole, which is the purpose of the first forward traversal above.
                            // We know from the above analysis that the whole list is not reversed, it is possible to reuse the original chain. Comments at the beginning of source code are more likely to have to be treated. As follows:
                            //On average, only about one-sixth of them need cloning when a table doubles.
                            for(Node<K,V> p = f; p ! = lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;
                                // The index does not change, the node is placed in the low
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    // The subscript is changed and the node is placed high
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // CAS updates the list of low level nodes
                            setTabAt(nextTab, i, ln);
                            // CAS updates the list of high level nodes
                            setTabAt(nextTab, i + n, hn);
                            // The location of the original TAB sets the transfer node to mark that this node has been transferred with the value nextTable
                            setTabAt(tab, i, fwd);
                            // Proceed to the next node in the current expansion range
                            advance = true;
                        }
                        // Migration of red-black tree nodes
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for(Node<K,V> e = t.first; e ! =null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null.null);
                                // Handle the low level
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                // Handle high
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    elsehiTail.next = p; hiTail = p; ++hc; }}// Check whether the migrated quantity storage format needs to be changed to a linked list
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                // Set the root node of the tree(hc ! =0)?new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                // Set the root node of the tree(lc ! =0)?new TreeBin<K,V>(hi) : t;
                            // Update the low value
                            setTabAt(nextTab, i, ln);
                            // Update high
                            setTabAt(nextTab, i + n, hn);
                            // The location of the original TAB sets the transfer node to mark that this node has been transferred with the value nextTable
                            setTabAt(tab, i, fwd);
                            // Proceed to the next node in the current expansion range
                            advance = true;
                        }
                    }
                }
            }
        }
    }
Copy the code
/** * Help expand */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // Check whether the current expansion is complete
    if(tab ! =null && (f instanceofForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) ! =null) {
        // Get the expansion tag value related to the length of the table
        int rs = resizeStamp(tab.length);
        // Check again that expansion is underway
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // If capacity expansion has been completed, return
            if((sc >>> RESIZE_STAMP_SHIFT) ! = rs || sc == rs +1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //cas updates the number of threads to add itself
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // Execute transfer to perform expansion operations with other threads
                transfer(tab, nextTab);
                break; }}return nextTab;
    }
    return table;
}
Copy the code