From: http://www.importnew.com/26049.html
Preface
I’ve written about HashMap before, and mentioned that when a put pushes the number of elements past the capacity threshold (determined by the load factor), the HashMap triggers an expansion, called a rehash, which rehashes the contents of the original array into the new, larger array. In a multi-threaded environment, if other threads are putting elements at the same time and those elements hash to the same bucket, concurrent rehashing can link the bucket’s list into a closed loop, and a later get then spins in an infinite loop. HashMap is therefore not thread-safe.
Now look at another key-value store, Hashtable, which is thread-safe. It uses the synchronized keyword to lock the entire table for every operation that could involve multiple threads, which means all threads compete for a single lock. That is safe in a multi-threaded environment, but inefficient.
Yet threads operating on different buckets never actually conflict: their keys have different hashes, so no rehash-related safety issue arises between them, and there is no need for them to compete for the same lock. This observation leads to lock striping: reduce the granularity of the lock and use multiple locks, each controlling one small piece of the table. That is the core idea of ConcurrentHashMap in JDK 1.7, and the main subject of this article; a minimal sketch of the idea follows.
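To make lock striping concrete, here is a minimal sketch, not the JDK implementation: a hypothetical StripedMap that guards each slice of the data with its own ReentrantLock, so threads whose keys land on different stripes never contend.

```java
import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

// Minimal illustration of lock striping (hypothetical class, not the JDK code):
// the data is split across several small tables, each guarded by its own lock,
// so keys that hash to different stripes never contend.
public class StripedMap<V> {
    private static final int STRIPES = 16; // must be a power of two
    private final ReentrantLock[] locks = new ReentrantLock[STRIPES];
    private final HashMap<String, V>[] tables;

    @SuppressWarnings("unchecked")
    public StripedMap() {
        tables = new HashMap[STRIPES];
        for (int i = 0; i < STRIPES; i++) {
            locks[i] = new ReentrantLock();
            tables[i] = new HashMap<>();
        }
    }

    private int stripe(String key) {
        return key.hashCode() & (STRIPES - 1); // choose the stripe by hash
    }

    public void put(String key, V value) {
        int s = stripe(key);
        locks[s].lock(); // lock one stripe, not the whole map
        try {
            tables[s].put(key, value);
        } finally {
            locks[s].unlock();
        }
    }

    public V get(String key) {
        int s = stripe(key);
        locks[s].lock();
        try {
            return tables[s].get(key);
        } finally {
            locks[s].unlock();
        }
    }
}
```

Two puts on different stripes proceed in parallel; only keys that hash to the same stripe serialize, which is exactly the property JDK 1.7’s Segment array provides.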
ConcurrentHashMap
The JDK 1.7 implementation
In JDK 1.7, ConcurrentHashMap consists of a Segment array with an array of HashEntry buckets under each segment.
The Segment array splits one large table into several smaller tables that are locked independently. Each Segment element stores a HashEntry array plus linked lists, the same storage structure as a HashMap.
Initialization
ConcurrentHashMap’s initialization computes the Segment array size ssize with bit operations, as shown below:
```java
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}
```
As shown above, because ssize is built with a shift (ssize <<= 1), the Segment array size is always 2^N, whatever the value of concurrencyLevel. concurrencyLevel itself is capped at 1 << 16, so the Segment array size is at most 65536. If concurrencyLevel is not specified, it defaults to 16, and so does ssize.
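As a quick sanity check of the rounding, the standalone snippet below (the demo class itself is hypothetical) runs the same loop for a concurrencyLevel of 17:

```java
public class SegmentSizeDemo {
    public static void main(String[] args) {
        int concurrencyLevel = 17; // any value that is not a power of two
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // ssize is rounded up to the next power of two (32), and sshift is 5
        System.out.println("ssize = " + ssize + ", sshift = " + sshift);
    }
}
```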
The size cap of the HashEntry array under each Segment element is computed with bit operations in the same way, as shown below:
```java
int cap = 1;
while (cap < c)
    cap <<= 1;
```
As shown above, the HashEntry array size is likewise rounded up to a power of 2 (cap <<= 1); in the JDK source, the minimum capacity of a Segment’s HashEntry array is 2.
The put operation
For a ConcurrentHashMap insertion, two hash operations are needed to locate where the data will be stored. Note also that Segment extends ReentrantLock, so every segment doubles as a lock:
```java
static class Segment<K,V> extends ReentrantLock implements Serializable {
```
When put runs, the first key hash locates the Segment; if that Segment has not been initialized yet, a CAS operation is used to create and install it. A second hash then finds the slot in the corresponding HashEntry array. When inserting at that location, the segment uses the tryLock() method inherited from ReentrantLock to try to acquire the lock. If the lock is acquired, the entry is inserted directly; if another thread already holds it, the current thread spins on tryLock() for a bounded number of retries, after which it falls back to a blocking lock() and is woken when the lock becomes available.
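That acquisition strategy can be sketched as the following simplified outline of the spin-then-block pattern; the class name and the retry bound are illustrative, not the actual JDK 1.7 source:

```java
import java.util.concurrent.locks.ReentrantLock;

// Simplified outline of the spin-then-block pattern used when a put
// cannot immediately acquire the segment lock (hypothetical sketch).
class SpinThenBlock extends ReentrantLock {
    static final int MAX_SCAN_RETRIES = 64; // illustrative retry bound

    void lockForPut() {
        if (!tryLock()) {              // fast path failed: another thread holds the lock
            int retries = 0;
            while (!tryLock()) {       // spin, hoping the lock is released soon
                if (++retries > MAX_SCAN_RETRIES) {
                    lock();            // give up spinning and block until woken
                    return;
                }
            }
        }
        // lock held here; the caller inserts the entry and then unlocks
    }
}
```

Spinning first avoids the cost of parking the thread when the lock is held only briefly, which is the common case for a per-segment lock.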
Get operation
The JDK 1.7 get operation is similar to HashMap’s, except that it hashes twice: the first hash locates the Segment, the second locates the HashEntry slot. It then traverses the linked list under that slot comparing keys, and returns null if nothing matches.
The size operation
Calculating the element count of a ConcurrentHashMap is an interesting problem. Because it is a concurrent structure, other threads may be inserting while you compute the size, so the computed size can differ from the actual size by the time it is returned. JDK 1.7 deals with this using two schemes, visible in the code below.
```java
try {
    for (;;) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0)
                    overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}
```
- In the first scheme, it tries to compute the size of the ConcurrentHashMap without locking, up to three attempts (RETRIES_BEFORE_LOCK), comparing the summed modCounts of two consecutive passes; if they agree, no element was added in between and the result is considered accurate.
- If the unlocked attempts never agree, the second scheme locks every Segment, computes the size of the ConcurrentHashMap, and then unlocks. The sketch after this list shows the same retry-then-lock pattern in isolation.
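The pattern generalizes beyond ConcurrentHashMap. Below is a self-contained sketch of the retry-then-lock idea, with an AtomicLong standing in for the summed per-segment modCounts; the names and the retry bound are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Self-contained sketch of the retry-then-lock pattern (hypothetical names):
// modCount plays the role of the summed per-segment modCounts.
class OptimisticSum {
    final AtomicLong modCount = new AtomicLong(); // writers increment this on every change
    final long[] counts = new long[16];           // per-stripe element counts

    long sum() {
        for (int retries = 0; retries < 3; retries++) { // a few unlocked attempts first
            long before = modCount.get();
            long sum = 0;
            for (long c : counts) sum += c;
            if (modCount.get() == before) // nothing changed while summing: result is consistent
                return sum;
        }
        // Fall back to a locked pass (writers would have to take the same lock)
        synchronized (this) {
            long sum = 0;
            for (long c : counts) sum += c;
            return sum;
        }
    }
}
```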
The JDK 1.8 implementation
The JDK 1.8 implementation abandons the Segment concept and instead uses a Node array + linked list + red-black tree structure. Concurrency is controlled with synchronized and CAS, making it look like an optimized, thread-safe HashMap. You can still see the Segment data structure in JDK 1.8, but its attributes have been simplified and it is kept only for serialization compatibility with older versions.
Before diving into the put and get implementations of JDK 1.8, we need to know the constants and data structures that underpin the ConcurrentHashMap implementation. Let’s look at the basic properties first:
```java
// Maximum capacity of the node array: 2^30 = 1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// Default initial capacity, must be a power of 2
private static final int DEFAULT_CAPACITY = 16;
// Maximum possible array size, used by the toArray()-related methods
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// Concurrency level; unused, kept for compatibility with previous versions
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// Load factor
private static final float LOAD_FACTOR = 0.75f;
// List-to-red-black-tree threshold: a list longer than 8 is treeified
static final int TREEIFY_THRESHOLD = 8;
// Red-black-tree-to-list threshold: a tree with <= 6 nodes is untreeified
static final int UNTREEIFY_THRESHOLD = 6;
// Minimum table capacity before a list may be converted to a tree
static final int MIN_TREEIFY_CAPACITY = 64;
// Minimum number of bins each resizing thread processes per step
private static final int MIN_TRANSFER_STRIDE = 16;
// Number of bits used for the generation stamp in sizeCtl
private static int RESIZE_STAMP_BITS = 16;
// 2^15 - 1, the maximum number of threads that can help resize
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32 - 16 = 16, the bit shift for the resize stamp recorded in sizeCtl
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// Hash value of a ForwardingNode
static final int MOVED = -1;
// Hash value of a TreeBin (the root of a red-black tree)
static final int TREEBIN = -2;
// Hash value of a ReservationNode
static final int RESERVED = -3;
// Number of available processors
static final int NCPU = Runtime.getRuntime().availableProcessors();
// The array of nodes that backs the map
transient volatile Node<K,V>[] table;
/*
 * Control flag for initializing and resizing the table. Different values mean different things:
 * when negative: -1 means the table is being initialized; -(1 + n) means n threads are resizing;
 * when 0: the table has not been initialized yet;
 * when positive: the initial capacity, or the element count that triggers the next resize.
 */
private transient volatile int sizeCtl;
```
The basic properties define some of the boundaries of ConcurrentHashMap and some of the controls that govern the operation. Let’s look at some of the internal structural components that are the core of the ConcurrentHashMap data structure.
Node
Node is the basic unit of ConcurrentHashMap storage. It implements Map.Entry, like the entry class in HashMap, and is used to hold the data. The source is as follows:
```java
static class Node<K,V> implements Map.Entry<K,V> {
    // The linked-list data structure
    final int hash;
    final K key;
    // val and next both change during a resize, so they are volatile
    // to preserve visibility and forbid reordering
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()        { return key; }
    public final V getValue()      { return val; }
    public final int hashCode()    { return key.hashCode() ^ val.hashCode(); }
    public final String toString() { return key + "=" + val; }

    // Updating the value is not allowed
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    // Supports Map.get(); overridden by subclasses
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}
```
The Node data structure is simple: it is a linked-list node that only allows its data to be searched, not modified through setValue.
TreeNode
TreeNode inherits from Node, but the data structure is changed to a binary tree: it is the storage structure of the red-black tree, used to hold the tree’s data. When the number of nodes in a bucket’s linked list exceeds 8, the list is converted into a red-black tree structure.
```java
static final class TreeNode<K,V> extends Node<K,V> {
    // Tree structure attributes
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;           // marks a red node of the red-black tree

    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }

    Node<K,V> find(int h, Object k) {
        return findTreeNode(h, k, null);
    }

    // Find the TreeNode for the given key, starting from this node
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
        if (k != null) {
            TreeNode<K,V> p = this;
            do {
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if (pl == null)
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}
```
TreeBin
TreeBin can be understood as the container that holds the tree structure, where the tree structure is made of TreeNodes; in other words, TreeBin is the wrapper that encapsulates the TreeNodes. It also provides the conversion conditions and the lock control for the red-black tree. Part of its source is shown below:
```java
static final class TreeBin<K,V> extends Node<K,V> {
    // Pointers to the root node and the head of the TreeNode list
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // Read/write lock states
    static final int WRITER = 1; // set while holding the write lock
    static final int WAITER = 2; // set while waiting for the write lock
    static final int READER = 4; // increment value for read locks

    /**
     * Initialize the red-black tree
     */
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        this.first = b;
        TreeNode<K,V> r = null;
        for (TreeNode<K,V> x = b, next; x != null; x = next) {
            next = (TreeNode<K,V>)x.next;
            x.left = x.right = null;
            if (r == null) {
                x.parent = null;
                x.red = false;
                r = x;
            }
            else {
                K k = x.key;
                int h = x.hash;
                Class<?> kc = null;
                for (TreeNode<K,V> p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                        x.parent = xp;
                        if (dir <= 0)
                            xp.left = x;
                        else
                            xp.right = x;
                        r = balanceInsertion(r, x);
                        break;
                    }
                }
            }
        }
        this.root = r;
        assert checkInvariants(root);
    }
    ...
}
```
Having introduced ConcurrentHashMap’s main properties and internal data structures, let’s trace a simple example through the debugger to see the concrete details of each operation.
```java
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;

public class TestConcurrentHashMap {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); // Initialize ConcurrentHashMap
        // Add personal information
        map.put("id", "1");
        map.put("name", "andy");
        map.put("sex", "Male");
        // Get the name
        String name = map.get("name");
        Assert.assertEquals(name, "andy");
        // Calculate the size
        int size = map.size();
        Assert.assertEquals(size, 3);
    }
}
```
We first initialize it with new ConcurrentHashMap()
```java
public ConcurrentHashMap() {
}
```
As you can see, the no-argument constructor of ConcurrentHashMap is an empty implementation: it does nothing. This is what distinguishes it from other collection classes: initialization is not done in the constructor but deferred to the put operation, as we will see later. ConcurrentHashMap also provides constructors that specify an initial capacity or load factor, just as HashMap does; a short illustration follows.
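The alternative constructors mentioned above exist in the public API; this small demo (the class name is illustrative) shows them side by side:

```java
import java.util.concurrent.ConcurrentHashMap;

public class CtorDemo {
    public static void main(String[] args) {
        // No-argument constructor: nothing is allocated until the first put
        ConcurrentHashMap<String, String> lazy = new ConcurrentHashMap<>();
        // Capacity and load-factor constructors, analogous to HashMap's
        ConcurrentHashMap<String, String> sized = new ConcurrentHashMap<>(64);
        ConcurrentHashMap<String, String> tuned = new ConcurrentHashMap<>(64, 0.75f, 16);
        lazy.put("k", "v"); // the first put triggers initTable()
        System.out.println(lazy.get("k") + " " + sized.size() + " " + tuned.size());
    }
}
```

Presizing with the capacity constructor avoids repeated resizing when the eventual size is known in advance.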
The put operation
In the example above we added the personal information by calling the put method, so let’s take a look at it.
```java
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); // Spread the hash to reduce collisions and distribute entries evenly
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { // Loop over the table until the insertion succeeds
        Node<K,V> f; int n, i, fh;
        // If the table is null, call initTable() to initialize it lazily
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // If slot i is empty, insert without a lock
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED) // If a resize is in progress, help it finish first
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // Otherwise there is a hash conflict: lock the head node of the list or red-black tree
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // The node is a linked list
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // For an existing key, put overwrites the original value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // Insert at the end of the list
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // Red-black tree structure
                        Node<K,V> p;
                        binCount = 2;
                        // Insert into the red-black tree, rotating as needed
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { // If the list is longer than 8, convert it to a red-black tree
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // Update the count and check whether a resize is needed
    return null;
}
```
The flow of put is clear: it loops over the current table unconditionally until the insertion succeeds. It can be summarized in six steps:
- If the table is not initialized, call the initTable() method to initialize it
- If there is no hash conflict at the slot, insert directly with CAS
- If a resize is in progress, help with the resize first
- If there is a hash conflict, lock the head node for thread safety; there are two cases: a linked list, which is walked to the end before inserting, and a red-black tree, which is inserted into following red-black tree rules
- If the length of the list afterwards exceeds the threshold of 8, convert the list to a red-black tree structure, then break out of the loop
- Once the addition succeeds, call the addCount() method to update the size and check whether a resize is needed
Now let’s go through the details of each step in the source. The first step initializes the table on demand; look at the initTable() method:
```java
/**
 * Initializes table, using the size recorded in sizeCtl.
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) { // Only an empty table can be initialized
        if ((sc = sizeCtl) < 0) // sizeCtl < 0 means another thread is already initializing or resizing; yield
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // CAS sizeCtl to -1 to mark the initializing state
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // Initialize the array
                    table = tab = nt;
                    sc = n - (n >>> 2); // Record the threshold (0.75n) for the next resize
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
```
In step 2, when there is no hash conflict, the element is inserted directly with a CAS call on Unsafe. In step 3, if the container is being resized, the helpTransfer() method is called to help with the resize, so let’s follow up on helpTransfer():
```java
/**
 * Helps copy elements from the old table to the new table.
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // Helping is only possible once the new table nextTab exists
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab); // Join the transfer
                break;
            }
        }
        return nextTab;
    }
    return table;
}
```
In fact, the point of helpTransfer() is to recruit additional worker threads into the resize instead of leaving one thread to do all the copying while the others idly wait for it to finish; the more threads participate, the sooner the resize completes. The coordination happens through the resize stamp packed into sizeCtl, which the demo below unpacks.
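To my reading of the JDK 8 source, the stamp is computed as below; the demo class itself is illustrative, and n is an example table length:

```java
public class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Same formula as the JDK 8 source: a stamp that is unique per table length
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int n = 16; // current table length (example value)
        int rs = resizeStamp(n);
        // The first resizer CASes sizeCtl to (rs << RESIZE_STAMP_SHIFT) + 2, a negative number;
        // each helper adds 1, and each thread that finishes its share subtracts 1.
        int sizeCtlWhileResizing = (rs << RESIZE_STAMP_SHIFT) + 2;
        System.out.println("stamp = " + rs + ", sizeCtl while resizing = " + sizeCtlWhileResizing);
    }
}
```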
Since we have touched on resizing, let’s also look at the resize method transfer():
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // If the range each core would process is smaller than 16, force it to 16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // Build a nextTable with twice the capacity
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // Marker node installed in processed slots (fwd.hash == MOVED == -1, fwd.nextTable == nextTab)
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance == true means the current slot has been handled and we can move to the next one
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // Use --i to iterate backwards over the slots of the old table
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // Claim the next stride of slots by CASing transferIndex
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // Replication of all slots is complete
            if (finishing) {
                nextTable = null;
                table = nextTab; // Point table at nextTable
                sizeCtl = (n << 1) - (n >>> 1); // New threshold: 1.5 times the old capacity (0.75 of the new)
                return; // Exit the loop
            }
            // CAS sizeCtl down by one, signalling that this resize thread has finished its share
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // If the current slot is null, just install the ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // f.hash == MOVED means a ForwardingNode: the slot has already been processed.
        // This is the core of the concurrent-resize control
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // Lock the head node of the slot
            synchronized (f) {
                // Copy the nodes of this slot
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // fh >= 0 means a linked-list node
                    if (fh >= 0) {
                        // Split the list into two: a low list (same index) and a high list (index + n)
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // Install the low list at index i of nextTable
                        setTabAt(nextTab, i, ln);
                        // Install the high list at index i + n of nextTable
                        setTabAt(nextTab, i + n, hn);
                        // Install a ForwardingNode at index i of the old table to mark the slot processed
                        setTabAt(tab, i, fwd);
                        // advance = true lets the --i loop move on to the next slot
                        advance = true;
                    }
                    // A TreeBin is processed as a red-black tree, with the same splitting logic
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // If a split tree has 6 or fewer nodes after the resize, convert it back to a list
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
```
The resize process is a bit complicated; it mainly has to support multiple threads expanding the table concurrently. The ForwardingNode is what makes that possible: slots that have been processed, as well as empty slots, are set to a ForwardingNode, and when a concurrently working thread encounters one it knows the slot is already handled and moves on. In this way multiple threads divide the table between them and cooperate on the same resize; the small calculation below shows how the table is partitioned.
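Here is a standalone calculation that mirrors the stride logic of transfer(); the values of n and ncpu are example inputs, and the demo class itself is illustrative:

```java
public class TransferStrideDemo {
    static final int MIN_TRANSFER_STRIDE = 16;

    public static void main(String[] args) {
        int n = 128;   // old table length (example value)
        int ncpu = 4;  // number of cores (example value)
        // Same sizing rule as transfer(): one eighth of the table per core, but at least 16
        int stride = (ncpu > 1) ? (n >>> 3) / ncpu : n;
        if (stride < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;
        // Threads claim descending ranges [transferIndex - stride, transferIndex) via CAS
        for (int transferIndex = n; transferIndex > 0; ) {
            int bound = (transferIndex > stride) ? transferIndex - stride : 0;
            System.out.println("range [" + bound + ", " + transferIndex + ")");
            transferIndex = bound;
        }
    }
}
```

With these inputs the table splits into eight ranges of 16 slots, so up to eight threads can copy simultaneously without ever touching the same slot.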
In the fourth step we insert the node into the list or the red-black tree, and in the fifth step, once the list is long enough, the treeifyBin() method is called to turn the list into a red-black tree:
```java
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        // If the table is still smaller than 64, double its size instead:
        // growing the table reduces hash collisions, so the tree is not needed yet
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        // Wrap each Node in a TreeNode
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // Build the red-black tree from the TreeNodes via a TreeBin
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
```
Step 6 means the data was added successfully, so addCount() is called to add one to the size of the ConcurrentHashMap and to check whether the table needs to grow. Let’s look at addCount():
```java
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // Try to update baseCount directly; counterCells records count changes under contention
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // If the CAS fails because several threads are updating concurrently,
        // fall back to fullAddCount() to record the change in a counter cell
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // check >= 0 means we should see whether a resize is needed
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // A resize is already running: join it
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // Otherwise the current thread initiates the resize; nextTable is still null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
```
The analysis of put is now complete. As you can see, concurrency is handled optimistically: CAS first, and a lock on a single bucket head only when there is a conflict. The process steps are clear, but the details are complex, because multi-threaded scenarios themselves are complex.
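A small stress test makes the guarantee tangible: multiple threads put disjoint keys without any external locking, and no update is lost. The demo class below is illustrative:

```java
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentPutDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        int threads = 4, perThread = 10_000;
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int offset = t * perThread;
            workers[t] = new Thread(() -> {
                // Concurrent puts: CAS for empty bins, a per-bin lock otherwise
                for (int i = 0; i < perThread; i++)
                    map.put(offset + i, i);
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        System.out.println(map.size()); // 40000: no lost updates
    }
}
```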
Get operation
Let’s go back to the opening example: after adding the personal information, we read it back with String name = map.get("name"). Here is ConcurrentHashMap’s get():
```java
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); // Spread the hash, just as in put
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) { // Read the first node of the bucket
        if ((eh = e.hash) == h) { // If the first node matches, return it
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // A negative hash means a special node (for example, a resize is in progress):
        // delegate to the node's find(), which for a ForwardingNode searches nextTable
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) { // Otherwise walk down the list
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
```
The get operation of ConcurrentHashMap is simple and clear, and can be described in three steps
- Compute the hash value, locate the table index, and return if the first node matches
- If a node is being expanded, the find method that marks the node being expanded is called to look for the node, and a match is returned
- If none of the above is true, then we iterate through the nodes and return a match, otherwise we return null
The size operation
The size is obtained with int size = map.size(). Now let’s look at the size() method:
```java
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a; // Per-cell count changes
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
```
In JDK 1.8 the bookkeeping for the size is already done in addCount() and during resizing, so size() merely sums it up, whereas in JDK 1.7 the size is computed only when size() is called. Strictly speaking, an exact size of a concurrent collection has limited meaning: the size changes in real time, so any result is accurate only for the instant at which it was computed, and it may already be stale by the time the caller uses it.
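Accordingly, JDK 8 also added mappingCount(), which returns the estimated count as a long and whose documentation recommends it over size() for maps that may exceed Integer.MAX_VALUE; a short illustration (the demo class is hypothetical):

```java
import java.util.concurrent.ConcurrentHashMap;

public class SizeDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("id", "1");
        map.put("name", "andy");
        // size() clamps the long sum into an int
        System.out.println(map.size());         // 2
        // mappingCount() returns the estimate as a long, with no clamping
        System.out.println(map.mappingCount()); // 2
    }
}
```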
Summary and reflection
ConcurrentHashMap’s data structure is similar to HashMap’s; the difference is the synchronization added to control concurrency, which evolved from ReentrantLock + Segment + HashEntry in JDK 1.7 to synchronized + CAS + Node + red-black tree in JDK 1.8.
- The JDK 1.8 implementation reduces the lock granularity. In JDK 1.7 the unit of locking is a Segment, which contains multiple HashEntry buckets; in JDK 1.8 it is a single bucket (its head Node).
- The JDK 1.8 data structure has become simpler and easier to operate on. Using synchronized removes the need for the Segment concept, although the finer granularity makes the internals more involved.
- JDK 1.8 uses red-black trees to optimize long lists: traversing a long linked list is slow, while red-black tree lookup is fast, so lists beyond a threshold are converted, and the two structures complement each other.
- JDK 1.8 uses the synchronized built-in lock instead of ReentrantLock, for several likely reasons:
- With the reduced granularity, synchronized is no worse than ReentrantLock at such a fine level. At coarser granularity, ReentrantLock can control boundaries flexibly through Condition, which is more flexible; at this fine granularity, that advantage disappears.
- The JVM development team never gave up on synchronized, and there is more room to optimize synchronized inside the JVM; using the built-in keyword is also more natural than using an API.
- Under large-scale data operations, the API-based ReentrantLock consumes more memory and adds JVM memory pressure; not a bottleneck, but a consideration.
References
- http://blog.csdn.net/u010412719/article/details/52145145
- http://www.jianshu.com/p/e694f1e868ec
- https://my.oschina.net/liuxiaomian/blog/880088
- https://bentang.me/tech/2016/12/01/jdk8-concurrenthashmap-1/
- http://cmsblogs.com/?p=2283