An overview of ConcurrentHashMap

ConcurrentHashMap is the recommended map for concurrent, multithreaded code. What is ConcurrentHashMap? What is its design idea, and how is it implemented in the source code?

What is ConcurrentHashMap

The name says it all: it is a HashMap built for handling concurrent access. A quick recap before introducing it. We learned from the previous two sections that a HashMap is unsafe under multithreaded concurrency (it can even end up in an infinite loop). Since heap memory is shared by all threads and HashMap's put method is not an atomic operation, suppose Thread1 puts a value first and then sleeps for 2 seconds, during which time the value is changed by Thread2. When Thread1 "wakes up" and gets the key again, it finds that the value is no longer the one it wrote, which causes problems.
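To make the problem concrete, here is a minimal sketch (class and key names are made up for illustration) of two threads stepping on the same HashMap; on most runs Thread1 reads back a value it never wrote:

    import java.util.HashMap;
    import java.util.Map;

    public class HashMapRaceDemo {
        public static void main(String[] args) throws InterruptedException {
            Map<String, String> map = new HashMap<>();

            Thread thread1 = new Thread(() -> {
                map.put("galen", "dollar");                 // Thread1 deposits "dollar"
                try { Thread.sleep(2000); } catch (InterruptedException ignored) { }
                // after "waking up", the value may already have been replaced by Thread2
                System.out.println("Thread1 sees: " + map.get("galen"));
            });

            Thread thread2 = new Thread(() -> map.put("galen", "yen")); // Thread2 overwrites it

            thread1.start();
            thread2.start();
            thread1.join();
            thread2.join();
        }
    }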

So how do we avoid this multithreaded "Audi turns into an Otto" situation? The common idea is to lock the HashMap's put method (synchronized) so that only one thread can write to the HashMap at a time. However, if thread 1 holds the lock for a long time and doesn't come out for half a day, every other thread that needs to operate on the HashMap has to queue at the door for just as long, which seriously hurts the user experience (this is exactly what HashTable does). Think of a bank: besides money, many banks also let you deposit valuables, which are kept in safe deposit boxes.

Treat each thread as a person, and the two designs look like this:

  • HashMap bank: our selling point is no queues — several people can change the contents of the safes at the same time. You thought you deposited dollars? What you take out is yen; bankruptcy is instantaneous.
  • HashTable bank: our selling point is safety — only one person at a time gets the chance to change the contents of the safes; everyone else can only watch. What if the person inside falls asleep and never comes out? Don't worry, sit down and play mah-jongg until he does.

Using a HashMap under multithreading is too uncertain and carries the risk of "bankruptcy", so it is out. HashTable never goes bankrupt, but the user experience is poor. So how can several people deposit valuables at the same time without affecting each other and without queuing? Someone proposes establishing a "bankers' union": open multiple HashTable-style banks, each with its own lock. The more people there are to serve, the more banks can be opened, ideally providing one-to-one service.

Here's what happens next. Galen and Yasuo both go to deposit their swords. The bankers' union tells Galen: bank No. 1 is empty, you can deposit your greatsword there without queuing. Galen goes to bank No. 1, which lets him in, pulls down the shutter, puts his greatsword into safe X in row X, and only lifts the shutter again after Galen leaves. In the same way, the union tells Yasuo: bank No. 2 is empty now, go there and you won't have to queue. Yasuo goes to bank No. 2, which lets him in and immediately pulls down the shutter. No matter how long Galen and Yasuo stay in their respective banks, they never need to worry about their swords being swapped. That is the idea behind ConcurrentHashMap, summed up in one picture:

As you can see from the figure above, each individual bank is locked, not the entire "bankers' union". The characteristics of this design:

  • A "bankers' union" made up of multiple banks
  • When someone comes to do business, the bankers' union decides which bank that person should go to
  • While that person is handling business at the assigned bank, the bank is locked; no one else can perform modifications there until the person leaves and the bank is unlocked

These basic ideas can trigger some thinking, such as:

1. How many banks should there be when the "bankers' union" is first established? How should this be designed reasonably?

The figure above does not say whether queuing is needed, because that depends on the actual situation. For example, if there are 16 banks at the start and only two people come to do business, nobody queues; but if all 16 banks are busy and a 17th person shows up, that person still has to wait in line. Since the "bankers' union" cannot know in advance how many people will come, it has to set a "standard" when it is founded, namely the initial number of banks. If many people are expected, more banks should be opened to avoid queuing; if few, fewer should be opened to avoid wasting money (what, you say you're not short of money? That still won't do).
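In code, the "initial number of banks" is the concurrencyLevel argument of the Java7 constructor (the parameter still exists in Java8, where it is only a sizing hint). A rough usage sketch, with made-up sizes:

    // default: 16 "banks" (segments)
    ConcurrentHashMap<String, String> defaultMap = new ConcurrentHashMap<>();

    // expecting many concurrent writers: ask for more segments up front
    // (initialCapacity, loadFactor, concurrencyLevel)
    ConcurrentHashMap<String, String> busyMap =
            new ConcurrentHashMap<>(256, 0.75f, 64);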

2. When someone comes to do business, how does the bankers’ union determine which bank the person goes to?

Normally, if all the banks are unlocked, anyone can go anywhere and there is no queue. If some banks are already locked, the bankers' union must not direct customers to those locked banks, otherwise it would get hammered in minutes. So the "bankers' union" needs to keep a clear head at all times, know which of its banks are idle, and recommend the best choice to each customer.

3. How can the “bankers’ Union” guarantee that no two people will have access to the same bank at the same time?

This is done by locking/unlocking the specified bank.

Source code analysis

Java7 source code analysis

Let's look at the Java7 implementation through its source code, starting with some important members:

    // default initial capacity 16
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    // default load factor 0.75
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    // the array of segments
    final Segment<K,V>[] segments;
    // default concurrency level 16
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    // Segment is a subclass of ReentrantLock
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        // the hash table of this segment
        transient volatile HashEntry<K,V>[] table;
        transient int count;
        transient int threshold;
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
    }

    // just HashMap's Entry wearing a different vest — don't tell me you don't recognize it!
    // a HashEntry holds the key, value, hash value and the next node
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }

    // minimum capacity of the HashEntry[] inside a segment
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    // used to locate the segment in the segments array
    final int segmentMask;
    final int segmentShift;

This may be a bit of a shock at first glance, but each piece will be covered below.

Next, start from the simplest usage:

ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();

The default constructor calls a constructor that takes three arguments

    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // step ① start
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        // step ① end
        // step ② start
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // step ② end
        // step ③ start
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
        // step ③ end
    }

So many temporary variables are defined, with so few comments, that at first glance you have no idea what they mean. But we can plug in the values we already know, work out what these variables end up as, and then analyze the result. Assume this is the first, default creation:

  • Step 1 concurrencyLevel = 16, sshift = 4, ssize = 16, segmentShift = 28, segmentMask = 15;
  • Step 2 c = 16/16 = 1, cap = 2;
  • Step 3 creates segments[0] (s0, backed by a HashEntry[] of length cap = 2) and the Segment array ss of length ssize = 16, then writes s0 into position 0 of ss. So the default segments length is 16, which happens to equal the concurrencyLevel we passed in — but is it always equal? Look at the examples below
Example 1: ssize = 1, concurrencyLevel = 10
  1 < 10, so ssize <<= 1 → 2
  2 < 10, so ssize <<= 1 → 4
  4 < 10, so ssize <<= 1 → 8
  8 < 10, so ssize <<= 1 → 16
  16 < 10 no longer holds, the loop ends, ssize = 16

Example 2: ssize = 1, concurrencyLevel = 8
  1 < 8, so ssize <<= 1 → 2
  2 < 8, so ssize <<= 1 → 4
  4 < 8, so ssize <<= 1 → 8
  8 < 8 no longer holds, the loop ends, ssize = 8

So concurrencyLevel is not necessarily the final length of the segments array; the relation is:

length = 2^n, the smallest power of two satisfying 2^n >= concurrencyLevel
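A small sketch of that rounding rule, pulled out of the constructor above:

    // round concurrencyLevel up to the next power of two, exactly as the constructor does
    static int segmentCount(int concurrencyLevel) {
        int ssize = 1;
        while (ssize < concurrencyLevel)
            ssize <<= 1;
        return ssize;
    }
    // segmentCount(10) == 16, segmentCount(8) == 8, segmentCount(16) == 16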

Create Segment array with length 16 and initialize position 0 of Segment array

Now let’s look at the put method

    public V put(K key, V value) {
        Segment<K,V> s;
        // step ① note that value cannot be null!!
        if (value == null)
            throw new NullPointerException();
        // compute the hash value from the key; the key cannot be null either,
        // otherwise hash(key) throws a null pointer exception
        int hash = hash(key);
        // step ② segmentShift and segmentMask come in handy here:
        // locate the position in the segments array from the hash value
        int j = (hash >>> segmentShift) & segmentMask;
        // step ③ check whether the Segment at that position is null;
        // if so, create and initialize it in ensureSegment, then delegate the put to it
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

Step ① checks that value is not null, otherwise a NullPointerException is thrown (the key cannot be null either, because hash(key) would throw). Step ② first computes the hash from the key, and then the segmentShift and segmentMask computed earlier come into play: together they determine which Segment the key-value pair should go to (if you are curious about how the shift-and-mask works, see the small example below). Suppose the pair maps to segment 5 and step ③ finds that segments[5] is still null; ensureSegment() is then called.
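For the default configuration (segmentShift = 28, segmentMask = 15) this simply takes the top 4 bits of the hash; a tiny illustration with a made-up hash value:

    int segmentShift = 28;
    int segmentMask  = 15;            // 0b1111

    int hash = 0x5A3C91F7;            // a made-up 32-bit hash value
    int j = (hash >>> segmentShift) & segmentMask;  // top 4 bits -> 0x5
    System.out.println(j);            // prints 5, i.e. the entry belongs to segments[5]

With the segment index worked out, take a look at the ensureSegment() method: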

    private Segment<K,V> ensureSegment(int k) {
        // get the segments array
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            // use segment 0 as prototype
            Segment<K,V> proto = ss[0];
            // capacity of segment 0's table
            int cap = proto.table.length;
            // load factor of segment 0 (0.75 by default)
            float lf = proto.loadFactor;
            // threshold of segment 0
            int threshold = (int)(cap * lf);
            // create the HashEntry[] for the new segment
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            // check again: another thread may have created the segment in the meantime
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

ensureSegment() uses segments[0] as a prototype, so any newly created Segment gets the same capacity and load factor as segments[0] (which is exactly why segments[0] was initialized in the constructor). The UNSAFE volatile reads plus the CAS write guarantee that even if several threads try to create the same segment at the same time, only one of them wins and everyone ends up using that one.
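The "read, create speculatively, recheck, CAS" pattern used by ensureSegment() can be sketched with an AtomicReferenceArray standing in for the UNSAFE array operations (an illustration of the idea only, not the JDK code; the class and method names are invented):

    import java.util.concurrent.atomic.AtomicReferenceArray;
    import java.util.function.Supplier;

    class SegmentArraySketch<S> {
        private final AtomicReferenceArray<S> segments;

        SegmentArraySketch(int size) {
            segments = new AtomicReferenceArray<>(size);
        }

        S ensure(int k, Supplier<S> factory) {
            S seg = segments.get(k);            // volatile read of segments[k]
            if (seg == null) {
                S created = factory.get();      // build a candidate segment speculatively
                // only one thread's CAS can win; losers simply use the winner's segment
                if (segments.compareAndSet(k, null, created))
                    seg = created;
                else
                    seg = segments.get(k);
            }
            return seg;
        }
    }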

Now that the "stage" (the Segment) is in place, the performance can begin: look at the Segment's put method

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // step ① try to acquire the lock; if that fails, fall back to scanAndLockForPut
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            // step ① end
            V oldValue;
            try {
                // step ② start
                // get the HashEntry[] of this Segment
                HashEntry<K,V>[] tab = table;
                // compute the index of the key in the array
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    // if the bucket is not empty, walk the chain
                    if (e != null) {
                        K k;
                        // case ① the key was stored before: replace the original value
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        // case ② a node may already have been prepared by another thread
                        // (in scanAndLockForPut)
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        // if the number of key-value pairs exceeds the threshold, expand
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            // otherwise place the node at the computed index directly
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
                // step ② end
            } finally {
                // step ③ unlock
                unlock();
            }
            // modification successful, return the original value
            return oldValue;
        }

The put method above is essentially the same idea as put in the Java7 HashMap, except that lock/unlock steps are added so that only one thread at a time can modify the Segment. Step by step through the process:

  • Step ① tries to acquire the lock with tryLock(); if that succeeds, node is null; if it fails, scanAndLockForPut() is executed (see the lock-pattern sketch after this list)
  • Step ② follows the same idea as putting into a HashMap; if anything is unclear, refer to the introduction in the previous article (case ② is explained below)
  • Step ③ releases the lock with unlock() in the finally block
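Since Segment extends ReentrantLock, the lock discipline in put is the standard ReentrantLock pattern; a minimal sketch of that pattern (the body is just a placeholder):

    import java.util.concurrent.locks.ReentrantLock;

    class SegmentLikeLock extends ReentrantLock {
        void doPut() {
            // try to grab the lock without blocking; fall back to blocking if that fails
            if (!tryLock()) {
                lock();   // the real code spins in scanAndLockForPut() before blocking here
            }
            try {
                // ... modify the segment's table while holding the lock ...
            } finally {
                unlock(); // always release in finally, exactly as Segment's put does
            }
        }
    }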

Suppose Thread1 comes in first and nobody has been there before: it acquires the lock successfully, computes that its key-value pair should go to position 0 of HashEntry[], finds position 0 empty, creates a HashEntry and places it there with setEntryAt(). However, before Thread1 can release the lock, the system time slice switches to Thread2.

Thread2's key also maps to segments[5]; it tries to acquire the lock, fails, and therefore executes scanAndLockForPut():

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            // find the head of the matching bucket in this Segment from the hash value
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            // keep trying to acquire the lock
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                // step ①
                if (retries < 0) {
                    // condition 1: the key is not in the bucket yet
                    if (e == null) {
                        if (node == null) // speculatively create node
                            // create a HashEntry in advance, set retries to 0
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    // condition 2: the first node is the key we are looking for
                    else if (key.equals(e.key))
                        retries = 0;
                    // otherwise move to the next node (retries stays -1)
                    else
                        e = e.next;
                }
                // step ②
                // still no lock after MAX_SCAN_RETRIES attempts: block on lock()
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                // step ③
                // if the entry for this hash changed while retrying, start over
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

Thread2 cannot get the lock, but it does not sit idle: it uses the waiting time to walk the chain and, if the key is not there yet, pre-creates the node for its key-value pair, so that the moment it does get the lock it can insert immediately. Step ③ covers the case where the bucket Thread2 scanned changes while it is waiting — for example, Thread1 inserts a new head node — in which case the scan starts over from the new head.

For example, if Thread2 is putting ("yasso", "98") and that key maps to position 1 of HashEntry[], then by the time the lock is finally acquired, scanAndLockForPut() has already prepared the node destined for position 1.

One more situation: if a put would make the number of key-value pairs in Segment[5] exceed the threshold, the Segment's HashEntry[] has to be expanded. Look at the rehash() method:

        private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change.
             * Statistically, at the default threshold, only about
             * one-sixth of them need cloning when a table doubles.
             * The nodes they replace will be garbage collectable as
             * soon as they are no longer referenced by any reader
             * thread that may be in the midst of concurrently
             * traversing table. Entry accesses use plain array
             * indexing because they are followed by volatile table write.
             */
            // reference to the old array
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            // the new array is twice as long as the old one
            int newCapacity = oldCapacity << 1;
            // recompute the threshold
            threshold = (int)(newCapacity * loadFactor);
            // create the new table
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            // walk every bucket of the old table
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    // position in the new table
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        // find the tail run of nodes that all map to the same new index
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // reuse that run directly in the new table
                        newTable[lastIdx] = lastRun;
                        // clone the remaining nodes before lastRun
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            // add the new node (head insertion)
            int nodeIndex = node.hash & sizeMask;
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

As we learned from HashMap expansion in the previous chapter, after expansion the new position of a key-value pair is either the same as the original position or equal to the original position plus the length of the old array. A picture helps explain why the code above is written this way:

Premise: the current HashEntry[] length is 8 and the threshold is 8 * 0.75 = 6, so putting the 7th key-value pair triggers expansion. Galen's and Yasuo's positions stay the same before and after expansion, while Glamour's and Carter's positions move to the original index plus the old array length, so the code above proceeds as follows:
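A quick arithmetic check of why the entries land where they do, using made-up hash values and the premise above (old capacity 8, new capacity 16):

    int oldCap = 8, newCap = 16;

    int hashGalen = 0b00101;   //  5:  5 & 7 = 5  and  5 & 15 = 5   -> index unchanged
    int hashOther = 0b01101;   // 13: 13 & 7 = 5  and 13 & 15 = 13  -> old index + oldCap (5 + 8)

    System.out.println((hashGalen & (oldCap - 1)) + " -> " + (hashGalen & (newCap - 1))); // 5 -> 5
    System.out.println((hashOther & (oldCap - 1)) + " -> " + (hashOther & (newCap - 1))); // 5 -> 13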






The code above first identifies the tail run of nodes on each chain that all map to the same slot after expansion and transfers that run as-is, then clones and transfers the remaining nodes. This is done for reuse: as the comment states, under the default threshold only about one sixth of the nodes need cloning when the table doubles. Note that up to this point, Java7 uses head insertion both for expansion migration and for new nodes. The flow is shown below:

By contrast, the get method has no lock/unlock operations, so the code is relatively simple and will not be analyzed in detail here.
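For completeness, the shape of the lock-free read path is roughly as follows (a simplified sketch of the idea, not the actual JDK source — the real get uses UNSAFE volatile reads to locate the segment and the bucket):

    // Simplified: visibility comes from the volatile value/next fields, no lock is taken.
    class GetSketch<K, V> {
        static class HashEntry<K, V> {
            final int hash; final K key;
            volatile V value; volatile HashEntry<K, V> next;
            HashEntry(int hash, K key, V value, HashEntry<K, V> next) {
                this.hash = hash; this.key = key; this.value = value; this.next = next;
            }
        }

        volatile HashEntry<K, V>[] table;   // stands in for the located segment's table

        V get(Object key, int hash) {
            HashEntry<K, V>[] tab = table;  // volatile read of the table
            if (tab != null) {
                // walk the chain of the bucket; writes by other threads stay visible
                for (HashEntry<K, V> e = tab[(tab.length - 1) & hash]; e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == hash && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }
    }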

A little bit about Java8

Java8 has some major differences from Java7, such as eliminating the Segments array and allowing concurrent expansion.

Let’s look at the initialization of ConcurrentHashMap

public ConcurrentHashMap() { }

Unlike Java7, this is an empty method, so how does it initialize? Let’s look at the put method

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // neither key nor value may be null!!
    if (key == null || value == null) throw new NullPointerException();
    // compute the hash value
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // note ① if the table is null, initialize it
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // the bucket at the computed index is empty:
        // create a new node and set it with CAS
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // the head node is a forwarding node: the table is being resized, help transfer
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // the bucket is not empty
        else {
            V oldVal = null;
            // note ② lock only the head node of this bucket
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // the bucket holds a linked list
                    if (fh >= 0) {
                        binCount = 1;
                        // walk the whole chain
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // the key already exists: replace the original value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // a new node is appended with tail insertion
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // the bucket holds a red-black tree
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // put into the red-black tree
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                // note ③ if the chain length reaches 8, convert it to a red-black tree
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // update the element count (may trigger expansion)
    addCount(1L, binCount);
    return null;
}

The code is a bit long and may cause discomfort at first glance, mainly because of the red-black tree checks and operations and the thread-safety handling that have been added. If the key or value is null, a NullPointerException is thrown — a notable difference from HashMap.
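A quick snippet showing that documented difference (HashMap tolerates null, ConcurrentHashMap does not):

    Map<String, String> hashMap = new HashMap<>();
    hashMap.put(null, "ok");           // HashMap allows a null key (and null values)

    Map<String, String> chm = new ConcurrentHashMap<>();
    chm.put("key", "value");           // fine
    chm.put(null, "boom");             // throws NullPointerException
    chm.put("key", null);              // also throws NullPointerException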

Note 1.

Call initTable to initialize the array

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl < 0: another thread is initializing, so yield
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS sizeCtl to -1 to claim the initialization
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // by default create an array of size 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // sc = n - n/4 = 0.75n, the next resize threshold
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

initTable() takes no lock, so how does it stay safe when several threads race to create the table? The key is sizeCtl, which defaults to 0: the thread that initializes the array first CASes sizeCtl to -1, and because the field is volatile the change is immediately visible to other threads. As the code above shows, any later thread that sees sizeCtl < 0 simply yields and waits for the table to appear.
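The same "claim the work by CAS-ing a shared flag" idea can be sketched with an AtomicInteger standing in for sizeCtl (a simplified illustration, not the JDK code):

    import java.util.concurrent.atomic.AtomicInteger;

    class LazyTableInit {
        private final AtomicInteger sizeCtl = new AtomicInteger(0); // stand-in for the volatile sizeCtl
        private volatile Object[] table;

        Object[] initTable() {
            while (table == null) {
                int sc = sizeCtl.get();
                if (sc < 0) {
                    Thread.yield();                    // someone else is initializing, back off
                } else if (sizeCtl.compareAndSet(sc, -1)) {
                    try {
                        if (table == null)
                            table = new Object[16];    // only the winning thread creates the table
                    } finally {
                        sizeCtl.set(12);               // restore to the next threshold (0.75 * 16)
                    }
                }
            }
            return table;
        }
    }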

Note 2.

Java8 does away with Segments and instead locks the head node of the individual bucket (synchronized (f)). When the node at the target position is not null, the handling is similar to the Java8 HashMap: new nodes are appended with tail insertion.

Note 3.

Java8's ConcurrentHashMap supports concurrent expansion: previously a single thread transferred the key-value pairs from the old array to the new one, whereas now multiple threads can help. The corresponding concurrency-control logic is of course more complex; the transfer is done by the transfer() method, which in Java8 is very long — read the source if you are interested...

Use a diagram to represent what the Java8 ConcurrentHashMap looks like

Conclusion

Through source-code analysis we compared the differences between HashMap and ConcurrentHashMap, and between the Java7 and Java8 ConcurrentHashMap designs. Of course many pits remain unfilled, for example the many CAS-based UNSAFE methods being called, which reduce performance overhead but are rarely used directly and so are not covered here, and the concrete principle and implementation of the red-black tree; those will be filled in slowly later...