The put() method is the centerpiece of any ConcurrentHashMap source analysis: it involves concurrent resizing, bucket addressing, and more. This article covers the JDK 1.8 ConcurrentHashMap.
1. put method source code analysis
// Insert a key-value pair into the concurrent map
public V put(K key, V value) {
    return putVal(key, value, false);
}
// Insert a key-value pair into the concurrent map
// key: the entry's key
// value: the entry's value
// onlyIfAbsent: whether an existing mapping is kept instead of replaced:
//   false: if the key is already present, its value is overwritten with the new one
//   true:  if the key is already present, the existing value is kept and nothing is written
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // Neither key nor value may be null
    if (key == null || value == null) throw new NullPointerException();
    // spread() XORs the high 16 bits of the hash code into the low 16 bits, so the
    // high bits also take part in bucket addressing and the hashes are spread more evenly
    int hash = spread(key.hashCode());
    // binCount describes the bucket that received the key-value pair:
    //   = 0: the bucket was empty, so the new node was placed directly as its head
    //   > 0: the bucket is a linked list; binCount counts the nodes traversed, i.e. the
    //        list length on an append, or the position of the matching node on a replace
    //   = 2: the bucket **may** be a red-black tree (TreeBin)
    int binCount = 0;
    // tab references the map's table
    // Spin until the insert succeeds
    for (Node<K,V>[] tab = table;;) {
        // f: the head node of the bucket
        // n: the length of the hash table array
        // i: the bucket index computed from the key's hash
        // fh: the hash value of the bucket's head node
        Node<K,V> f; int n, i, fh;
        // -----------------------------------------------------------------------------
        // CASE1: true means the map's table has not been initialized yet
        if (tab == null || (n = tab.length) == 0)
            // Initialize the map's table
            tab = initTable();
        // -----------------------------------------------------------------------------
        // CASE2: the table is initialized and the addressed bucket's head node f is null
        // i = (n - 1) & hash is the index of the key's bucket in the table array;
        // tabAt reads the head node f of bucket i
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // The bucket is empty, so the node wrapping the key-value pair can be placed directly.
            // casTabAt uses CAS to set slot i of the Node array: true on success, false otherwise.
            // On failure another thread won the race, so spin and retry.
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;
        }
        // -----------------------------------------------------------------------------
        // CASE3: the table is initialized, the head node f is not null, and its hash is MOVED (-1):
        // f is a ForwardingNode (FWD), which means the map is being resized
        else if ((fh = f.hash) == MOVED)
            // The current thread is obliged to help the map finish the data migration,
            // then retries the insert on the next loop iteration
            tab = helpTransfer(tab, f);
        // -----------------------------------------------------------------------------
        // CASE4: none of CASE1-3 applies, so the bucket is a linked list or a red-black tree proxy node (TreeBin)
        else {
            // Holds the previous value: if the inserted key already exists, its old value is
            // stored in oldVal and returned to the caller of put
            V oldVal = null;
            // Lock the bucket's head node with synchronized
            synchronized (f) {
                // -----------------------------------------------------------------------
                // CASE5: tabAt(tab, i) == f
                // Re-check that the bucket's head is still the node read earlier.
                // If another thread replaced the head before this thread acquired the lock,
                // f is the wrong lock object; the check fails and the outer loop retries.
                if (tabAt(tab, i) == f) { // true: f is still the head, so the lock is valid
                    // ------------------------------------------------------------------
                    // CASE6: fh >= 0
                    // true: the bucket is an ordinary linked list. Negative hashes are reserved:
                    //   static final int MOVED   = -1; // the node is a ForwardingNode (FWD)
                    //   static final int TREEBIN = -2; // the node is the root proxy of a tree (TreeBin)
                    if (fh >= 0) {
                        // 1. If no node in the list has the inserted key, the new node is appended
                        //    to the tail and binCount ends up as the list length before the append
                        // 2. If a node already has the inserted key, the operation is a replacement
                        //    and binCount is the 1-based position of that node (index binCount - 1)
                        binCount = 1;
                        // Traverse the bucket's list; e is the node visited in each iteration
                        for (Node<K,V> e = f;; ++binCount) {
                            // ek: the key of the node being visited
                            K ek;
                            // --------------------------------------------------------
                            // CASE7:
                            // condition 1: e.hash == hash
                            //   true: the visited node has the same hash as the inserted key
                            // condition 2: (ek = e.key) == key || (ek != null && key.equals(ek))
                            //   true: the visited node has the same key as the inserted key
                            if (e.hash == hash &&
                                ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                // Save the visited node's current value in oldVal
                                oldVal = e.val;
                                // onlyIfAbsent, as passed to putVal:
                                //   false: overwrite the existing value
                                //   true:  keep the existing value
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // --------------------------------------------------------
                            // CASE8: the visited node's key does not match, so:
                            // 1. advance e to the next node
                            // 2. if the next node is null, pred is the tail of the list and
                            //    the new node is appended after it
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // ------------------------------------------------------------------
                    // CASE9: f instanceof TreeBin
                    // true: the bucket is a red-black tree proxy node (TreeBin)
                    // (precondition: the bucket is not a linked list)
                    else if (f instanceof TreeBin) {
                        // p: the tree node returned by putTreeVal whose key conflicts with the inserted key
                        Node<K,V> p;
                        // Force binCount to 2: binCount <= 1 has other meanings in addCount
                        // (covered in the addCount section)
                        binCount = 2;
                        // putTreeVal inserts into the red-black tree: it returns null on a successful
                        // insert, otherwise the existing node with the same key
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            // Save the conflicting node's current value in oldVal
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // ------------------------------------------------------------------
            // CASE10: binCount != 0
            // true: the bucket was not empty; it is a linked list or a red-black tree
            if (binCount != 0) {
                // binCount >= 8 can only come from a linked list (a tree forces binCount to 2)
                if (binCount >= TREEIFY_THRESHOLD)
                    // The list has reached 8 nodes, so convert it to a red-black tree
                    // (treeifyBin resizes the table instead while it is shorter than MIN_TREEIFY_CAPACITY = 64)
                    treeifyBin(tab, i);
                // The inserted key conflicted with an existing entry: return the old value to the caller
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // Update the map's element count; addCount has two jobs:
    // 1. Count the number of elements in the table
    // 2. Check whether the resize threshold has been reached
    addCount(1L, binCount);
    return null;
}
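The addressing step and the onlyIfAbsent flag described above can be tried in isolation. The following is a minimal sketch, not JDK code: the class name PutValSketch and the helper bucketIndex are made up for illustration, while spread and HASH_BITS mirror the JDK 1.8 definitions. It shows two hash codes that differ only in their high bits landing in the same bucket without spread and in different buckets with it, and then the return values of put and putIfAbsent (which calls putVal with onlyIfAbsent = true).

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: PutValSketch and bucketIndex are hypothetical names.
final class PutValSketch {
    // Same value as HASH_BITS in JDK 1.8: clears the sign bit, because
    // negative hashes are reserved for MOVED and TREEBIN nodes.
    static final int HASH_BITS = 0x7fffffff;

    // Mirrors JDK 1.8 ConcurrentHashMap.spread: XOR the high 16 bits into the low 16 bits.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Bucket index for a table whose length n is a power of two.
    static int bucketIndex(int hashCode, int n) {
        return (n - 1) & spread(hashCode);
    }

    public static void main(String[] args) {
        int n = 16;
        int a = 0x10000; // a and b differ only in bits 16 and above
        int b = 0x20000;
        System.out.println(((n - 1) & a) + " " + ((n - 1) & b));           // prints "0 0"
        System.out.println(bucketIndex(a, n) + " " + bucketIndex(b, n));   // prints "1 2"

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        System.out.println(map.put("k", 1));          // null: no previous value
        System.out.println(map.put("k", 2));          // 1: old value returned, value replaced
        System.out.println(map.putIfAbsent("k", 3));  // 2: old value returned, value kept
        System.out.println(map.get("k"));             // 2
    }
}
```

Because the table length is always a power of two, (n - 1) & hash keeps only the low bits; spread is what lets the high bits influence the bucket choice.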
2. initTable method source code analysis
The bucket array is initialized lazily, the first time an element is inserted:
/**
 * Initializes the table.
 */
private final Node<K,V>[] initTable() {
    // tab: references map.table
    // sc: a temporary copy of sizeCtl
    // sizeCtl: defaults to 0 and controls the table's state during initialization and resizing:
    // sizeCtl < 0 describes the table's state:
    //   (1) = -1: a thread is initializing the table (acts as a lock, so no other thread may initialize)
    //   (2) = -(1 + nThreads): n threads are resizing the table together
    // sizeCtl >= 0 holds a size used for initialization or resizing:
    //   (3) = 0: the default; the table is created with DEFAULT_CAPACITY (16)
    //   (4) > 0: before initialization, the initial table capacity; afterwards, the threshold
    //            that triggers the next resize
    Node<K,V>[] tab; int sc;
    // Spin while map.table has not been initialized
    while ((tab = table) == null || tab.length == 0) {
        // -----------------------------------------------------------------------------
        // CASE1: (sc = sizeCtl) < 0
        // sizeCtl < 0 means one of:
        //   (1) -1: another thread is initializing the table
        //   (2) -(1 + nThreads): n threads are resizing the table together
        if ((sc = sizeCtl) < 0)
            // Another thread is creating the table, so the current thread lost the
            // initialization race; it yields the CPU and spins
            Thread.yield();
        // -----------------------------------------------------------------------------
        // CASE2: sizeCtl >= 0 and U.compareAndSwapInt(this, SIZECTL, sc, -1) returns true
        // U.compareAndSwapInt(this, SIZECTL, sc, -1) tries to CAS the map's sizeCtl from sc to -1:
        // true if sizeCtl was set to -1, false otherwise.
        // sizeCtl == -1 acts as a lock: the else-if block below is owned by the winning thread
        // and no other thread can enter it
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // ----------------------------------------------------------------------
                // CASE3: why check again?
                // Another thread may have finished initializing the table between the while
                // check and the CAS; initializing it again would discard that thread's data.
                // true: no other thread has initialized the table, so the current thread may do it.
                if ((tab = table) == null || tab.length == 0) {
                    // sc >= 0 here:
                    //   = 0: use the default capacity DEFAULT_CAPACITY (16)
                    //   > 0: use sc as the table's initial capacity
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // Create a new array nt
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // Assign the new array nt to table and tab
                    table = tab = nt;
                    // Set sc to 0.75 * n, the threshold for the next resize
                    sc = n - (n >>> 2);
                }
            } finally {
                // sc is written back to sizeCtl in two cases:
                // 1. The thread passed both CASE2 and CASE3: it created map.table, and sc is
                //    the threshold for the next resize.
                // 2. The thread passed CASE2 but not CASE3: it did not create map.table, but
                //    CASE2 set sizeCtl to -1, so sizeCtl must be restored to the value sc held
                //    before CASE2.
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
Summary:
(1) A CAS on sizeCtl acts as a lock, so only one thread initializes the bucket array;
(2) After initialization, sizeCtl stores the resize threshold;
(3) The resize threshold is 0.75 times the bucket array length: once the element count reaches it, the map resizes.
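The three points above can be sketched outside the JDK. This is a simplified illustration under stated assumptions, not the real implementation: LazyTableSketch is a hypothetical class, an AtomicInteger stands in for the Unsafe-based CAS on sizeCtl, and a plain Object[] stands in for the Node array.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: LazyTableSketch is a hypothetical class, not JDK code.
final class LazyTableSketch {
    static final int DEFAULT_CAPACITY = 16;

    // Plays the role of sizeCtl: the requested capacity (or 0) before initialization,
    // -1 while one thread initializes, and the resize threshold afterwards.
    private final AtomicInteger sizeCtl;
    private volatile Object[] table;

    LazyTableSketch(int initialCapacity) {
        this.sizeCtl = new AtomicInteger(initialCapacity);
    }

    Object[] initTable() {
        Object[] tab;
        int sc;
        while ((tab = table) == null) {
            if ((sc = sizeCtl.get()) < 0) {
                Thread.yield();                       // another thread holds the "lock": spin
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    if ((tab = table) == null) {      // re-check after winning the CAS
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);           // 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc);                  // publish the threshold, or restore the old value
                }
                break;
            }
        }
        return tab;
    }

    int threshold() {
        return sizeCtl.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyTableSketch map = new LazyTableSketch(0);
        Object[][] seen = new Object[8][];
        Thread[] threads = new Thread[seen.length];
        for (int t = 0; t < threads.length; t++) {
            final int id = t;
            threads[t] = new Thread(() -> seen[id] = map.initTable());
            threads[t].start();
        }
        for (Thread th : threads) th.join();
        boolean same = true;
        for (Object[] s : seen) same &= (s == seen[0]);
        // Every thread got the same array: prints "true 16 12"
        System.out.println(same + " " + seen[0].length + " " + map.threshold());
    }
}
```

The re-check inside the try block is what keeps a late CAS winner from replacing a table that another thread has already created.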
3. addCount method (the hard part)
Before reading the addCount source, it helps to be familiar with the LongAdder source code (see: LongAdder source code analysis).
After each insert, addCount increments the element count by 1 and checks whether the resize threshold has been reached. If it has, the thread starts a resize or helps one that is already in progress.
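The counting half of addCount follows the LongAdder idea: try one CAS on a shared base, and fall back to a per-thread cell only when that CAS loses a race. The sketch below illustrates the idea under simplifying assumptions: BaseAndCellsSketch is a hypothetical class, the cells are a fixed, pre-created array picked at random, whereas the JDK creates counterCells lazily, indexes them by the thread's probe value, and grows the array in fullAddCount.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: BaseAndCellsSketch is a hypothetical class, not JDK code.
final class BaseAndCellsSketch {
    private final AtomicLong base = new AtomicLong();
    // Simplification: a fixed array of pre-created cells.
    private final AtomicLong[] cells = new AtomicLong[4];

    BaseAndCellsSketch() {
        for (int i = 0; i < cells.length; i++) cells[i] = new AtomicLong();
    }

    void add(long x) {
        long b = base.get();
        // Fast path: a single CAS on base, which succeeds when there is no contention.
        if (!base.compareAndSet(b, b + x)) {
            // Slow path: the CAS lost a race, so add to a randomly chosen cell instead.
            int idx = ThreadLocalRandom.current().nextInt(cells.length);
            cells[idx].addAndGet(x);
        }
    }

    // Same idea as sumCount(): base plus every cell.
    long sum() {
        long s = base.get();
        for (AtomicLong c : cells) s += c.get();
        return s;
    }

    // Runs `threads` workers that each call add(1) `perThread` times, then returns sum().
    static long countConcurrently(int threads, int perThread) {
        BaseAndCellsSketch counter = new BaseAndCellsSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) counter.add(1);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 10_000)); // prints 40000
    }
}
```

While writers are still running, sum() is only a snapshot; it is exact here because every worker has been joined first.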
/**
 * Updates the map's element count. It has two jobs:
 * 1. Count the number of elements in the table
 * 2. Check whether the table has reached the resize threshold
 */
private final void addCount(long x, int check) {
    // as: corresponds to LongAdder.cells
    // b: corresponds to LongAdder.base
    // s: the number of elements currently in the map
    CounterCell[] as; long b, s;
    // ---------------- Count how many elements the table holds ----------------
    // CASE1:
    // condition 1: (as = counterCells) != null
    //   true:  the cells array is initialized; the thread should hash to a cell and add to it
    //   false: the thread should add directly to base (no contention so far)
    // condition 2: !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
    //   false: the CAS on base succeeded; the count is added to base and no cells are needed
    //   true:  the CAS on base failed because another thread raced on base; the thread
    //          should fall back to the cells array
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // The if block is entered in two cases:
        // 1. condition 1 is true: the cells array is initialized, so the thread hashes to a cell
        // 2. condition 2 is true: the CAS on base failed because of contention, so the
        //    thread should try the cells array
        // a: the cell the current thread hashes to
        CounterCell a;
        // v: the expected value when the current thread CASes the cell
        long v;
        // m: the length of the cells array minus 1 (used as the index mask)
        int m;
        // true: no contention seen; false: the CAS on a cell failed because of contention
        boolean uncontended = true;
        // ---------------------------------------------------------------------------
        // CASE2:
        // condition 1: as == null || (m = as.length - 1) < 0
        //   true: the thread got here because the CAS on base failed (condition 2 of CASE1)
        //         and cells is not initialized; call fullAddCount to create cells or retry.
        //         fullAddCount is similar to LongAdder.longAccumulate.
        // condition 2: (a = as[ThreadLocalRandom.getProbe() & m]) == null (precondition: cells is initialized)
        //   true: the slot the thread hashes to is empty; fullAddCount must create the
        //         cell and place it in that slot
        // condition 3: !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
        //   (precondition: the slot the thread hashes to is not empty)
        //   false: the CAS on the matched cell succeeded
        //   true:  the CAS on the matched cell failed; fullAddCount must retry or expand cells
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            // After fullAddCount, the thread skips the resize check and returns to the caller
            return;
        }
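        // check <= 1 (a binCount of 0 or 1): the resize check is only done when
        // uncontended, so return here
        if (check <= 1)
            return;
        // sumCount() = baseCount + cells[0].value + cells[1].value + ... + cells[n-1].value;
        // under concurrent updates this is an estimate, not an exact count
        s = sumCount();
    }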
    // ---------------- Check whether the resize threshold is reached; if so, resize ----------------
    // CASE3:
    // check >= 0: addCount was called by an insert such as put;
    // check < 0:  addCount was called by a removal such as remove
    if (check >= 0) {
        // tab: map.table
        // nt: map.nextTable
        // n: the length of the map.table array
        // sc: a temporary copy of sizeCtl
        Node<K,V>[] tab, nt; int n, sc;
        // sizeCtl < 0:
        //   1. -1: the table is being initialized (a thread is creating the table array)
        //   2. otherwise the table is being resized: the high 16 bits hold the resize stamp
        //      and the low 16 bits hold (1 + nThreads), the number of resizing threads plus 1
        // sizeCtl = 0: the table array will be created with DEFAULT_CAPACITY
        // sizeCtl > 0:
        //   1. if the table is not initialized, the initial size
        //   2. if the table is initialized, the threshold that triggers the next resize
        //
        // Spin while all three conditions hold:
        // condition 1: s >= (long)(sc = sizeCtl)
        //   true:  1. sizeCtl is negative: a resize is in progress
        //          2. sizeCtl is positive (the threshold) and s has reached it: a resize is needed
        //   false: the table has not reached the resize threshold
        // condition 2: (tab = table) != null
        //   always true here
        // condition 3: (n = tab.length) < MAXIMUM_CAPACITY
        //   true: the table is shorter than the maximum capacity, so it can be resized
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // Get the resize stamp that identifies this resize
            // e.g. for 16 -> 32 the stamp is 1000 0000 0001 1011
            // Every thread that helps resize the map from 16 to 32 computes the same
            // stamp, 1000 0000 0001 1011
            int rs = resizeStamp(n);
            // --------------------------------------------------------------------------
            // CASE4: sc < 0
            // true: the table is being resized, so the current thread should help
            if (sc < 0) {
                // --------------------------------------------------------------
                // If any of the following conditions is true, break out of the loop:
                // condition 1: (sc >>> RESIZE_STAMP_SHIFT) != rs
                //   true:  the stamp computed by this thread does not match the resize in progress
                //   false: the stamp matches the resize in progress
                // condition 2: sc == rs + 1
                //   This is a known JDK 1.8 bug (reported in the JDK Jira); the intended
                //   check is sc == (rs << 16) + 1
                //   true:  the resize has finished, so the thread need not take part
                //   false: the resize is still in progress and the thread can take part
                // condition 3: sc == rs + MAX_RESIZERS
                //   The same bug applies; the intended check is sc == (rs << 16) + MAX_RESIZERS
                //   true:  the number of resizing threads has reached the maximum (MAX_RESIZERS = 65535)
                //   false: the thread can take part
                // condition 4: (nt = nextTable) == null
                //   true:  the resize has finished
                //   false: the resize is in progress
                // condition 5: transferIndex <= 0
                //   true:  every bucket range has already been claimed for migration
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    // One of the conditions above is true: stop helping
                    break;
                // --------------------------------------------------------------
                // CASE5:
                // Precondition: the table is being resized and none of the break conditions
                // held, so the current thread may join the resize.
                // true: the thread joined the resize; the low 16 bits of sc grew by 1,
                //       recording one more working thread
                // false (the CAS failed):
                //   1. many threads are trying to modify sizeCtl and another one succeeded first,
                //      so sc no longer matches the value in memory (the next spin will
                //      most likely not reach this point)
                //   2. a thread inside transfer has also modified sizeCtl
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // Help migrate the data: enter transfer with the existing nextTable
                    transfer(tab, nt);
            }
            // --------------------------------------------------------------------------
            // CASE6: CAS sizeCtl from sc to (rs << RESIZE_STAMP_SHIFT) + 2
            // true: the current thread is the **first** thread to trigger the resize, and it
            //       does some preparatory work inside transfer
            // rs << RESIZE_STAMP_SHIFT shifts the resize stamp left by 16 bits, e.g.:
            //   1000 0000 0001 1011 << 16 = 1000 0000 0001 1011 0000 0000 0000 0000
            // (rs << RESIZE_STAMP_SHIFT) + 2 then gives:
            //   1000 0000 0001 1011 0000 0000 0000 0000 + 2
            //   = 1000 0000 0001 1011 0000 0000 0000 0010
            // sizeCtl high 16 bits: 1000 0000 0001 1011, the resize stamp
            // sizeCtl low 16 bits:  0000 0000 0000 0010, which is (1 + nThreads), the number
            //                       of threads taking part in the resize plus 1
            // The 2 in (rs << RESIZE_STAMP_SHIFT) + 2 is (1 + nThreads) with nThreads = 1
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // Start the migration: the triggering thread has no nextTable yet, so pass null
                transfer(tab, null);
            // Recalculate the number of elements in the table
            s = sumCount();
        }
    }
}
Summary:
(1) The element count is stored the way LongAdder stores its sum: it is striped across separate cells to reduce contention when threads update the size at the same time;
(2) The total element count is computed by adding the values of these cells to baseCount;
(3) Normally, sizeCtl stores the resize threshold, which is 0.75 times the capacity;
(4) During a resize, the high 16 bits of sizeCtl hold the resize stamp and the low 16 bits hold the number of resizing threads plus 1 (1 + nThreads);
(5) A thread that finds a resize in progress after adding an element joins in and helps with the migration.
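The sizeCtl encoding in points (3) to (5) can be checked with a few lines of arithmetic. In the sketch below, resizeStamp and the two constants mirror the JDK 1.8 definitions; the class ResizeStampSketch and the helpers initialResizeCtl, stampOf and resizingThreads are hypothetical names added for illustration.

```java
// Illustrative sketch only: ResizeStampSketch and its helper names are hypothetical.
final class ResizeStampSketch {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Mirrors JDK 1.8 ConcurrentHashMap.resizeStamp for a table of length n.
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // The sizeCtl value written by the first thread that triggers a resize.
    static int initialResizeCtl(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // High 16 bits: the resize stamp.
    static int stampOf(int sizeCtl) {
        return sizeCtl >>> RESIZE_STAMP_SHIFT;
    }

    // Low 16 bits hold (1 + nThreads), so subtract 1 to get the thread count.
    static int resizingThreads(int sizeCtl) {
        return (sizeCtl & 0xffff) - 1;
    }

    public static void main(String[] args) {
        int rs = resizeStamp(16);
        System.out.println(Integer.toBinaryString(rs));  // 1000000000011011
        int sc = initialResizeCtl(16);
        System.out.println(sc < 0);                      // true: the stamp sets the sign bit
        System.out.println(Integer.toBinaryString(sc));  // 10000000000110110000000000000010
        System.out.println(resizingThreads(sc));         // 1
        System.out.println(resizingThreads(sc + 1));     // 2: one helper joined via CAS sc -> sc + 1
    }
}
```

Since bit 15 of the stamp is always set, shifting it left by 16 sets the sign bit, which is why sizeCtl is negative for the whole duration of a resize.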
That covers put and the related methods it calls in the ConcurrentHashMap source. The transfer (data migration) method called above is complex enough that it is analyzed separately in the next article.