LongAdder is similar to AtomicLong, but it performs better when many threads update the value concurrently. It is best suited to statistics scenarios such as monitoring metrics and counters. For example, a word counter can be implemented with the following code:
// A ConcurrentHashMap holds the mapping from word to count: the key is the word, the value is a LongAdder
private Map<String, LongAdder> wordCounterMap = new ConcurrentHashMap<>();

// The word count operation
public void addCount(String word) {
    // Read first. For words seen before, this avoids the locking inside computeIfAbsent
    LongAdder counter = wordCounterMap.get(word);
    if (counter == null) {
        // No counter for this word yet: create it atomically with computeIfAbsent
        counter = wordCounterMap.computeIfAbsent(word, new Function<String, LongAdder>() {
            @Override
            public LongAdder apply(String s) {
                return new LongAdder();
            }
        });
    }
    // Atomically increment the counter for this word
    counter.add(1L);
}
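To see the word counter run end to end, here is a minimal, self-contained sketch. The class name WordCounterDemo and the helpers countOf and countConcurrently are hypothetical names chosen for illustration, and for brevity it calls computeIfAbsent directly instead of reading with get first.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class; all names here are illustrative.
class WordCounterDemo {
    private final Map<String, LongAdder> wordCounterMap = new ConcurrentHashMap<>();

    void addCount(String word) {
        // computeIfAbsent creates the counter atomically on first use
        wordCounterMap.computeIfAbsent(word, k -> new LongAdder()).increment();
    }

    long countOf(String word) {
        LongAdder counter = wordCounterMap.get(word);
        return counter == null ? 0L : counter.sum();
    }

    // Submits `tasks` tasks to a pool of `threads` workers; each task counts every word once.
    static WordCounterDemo countConcurrently(String[] words, int threads, int tasks) {
        WordCounterDemo demo = new WordCounterDemo();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                for (String w : words) {
                    demo.addCount(w);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo;
    }

    public static void main(String[] args) {
        String[] words = {"to", "be", "or", "not", "to", "be"};
        WordCounterDemo demo = countConcurrently(words, 4, 1000);
        System.out.println("to  = " + demo.countOf("to"));
        System.out.println("not = " + demo.countOf("not"));
    }
}
```

Because every task has finished before the counts are read, the totals are exact: each word's count is its number of occurrences in the array times the number of tasks.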
The LongAdder class extends Striped64. LongAdder declares no fields of its own; all state is stored in Striped64. LongAdder wraps that state with the add and sum methods: add changes the count and sum reads it.
Striped64 uses striping (sharding) to improve concurrency. It holds a long base field and an array of Cell objects, each with its own value field. When there is no contention, the base field is updated with CAS. When a CAS on base fails, the update goes to a slot in the cells array instead, and if threads keep colliding on the same cell, the array is expanded to spread them out.
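The striping idea can be illustrated with a much simpler counter. The sketch below is not the JDK implementation: StripedCounterSketch is a hypothetical class with a fixed number of stripes, no expansion, and the thread's hash code standing in for the probe value. It only shows the two paths, one CAS attempt on base and a fallback to a per-thread stripe.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified illustration of striping; not how Striped64 is written.
class StripedCounterSketch {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;   // fixed size, power of two
    private final int mask;

    StripedCounterSketch(int stripes) {
        if (stripes <= 0 || Integer.bitCount(stripes) != 1) {
            throw new IllegalArgumentException("stripes must be a power of two");
        }
        this.cells = new AtomicLongArray(stripes);
        this.mask = stripes - 1;
    }

    void add(long x) {
        long b = base.get();
        // Fast path: a single CAS attempt on base
        if (base.compareAndSet(b, b + x)) {
            return;
        }
        // Contended path: update the stripe selected by the current thread
        // (assumption: the thread's hash code plays the role of the probe)
        int idx = Thread.currentThread().hashCode() & mask;
        cells.addAndGet(idx, x);
    }

    long sum() {
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    // Starts `threads` threads that each add 1 `addsPerThread` times, then returns the sum.
    static long runDemo(int threads, int addsPerThread) {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < addsPerThread; i++) {
                    counter.add(1L);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(8, 100_000));
    }
}
```

Every add lands exactly once, either on base or on one stripe, so the sum is exact once all writers have finished. What the real class adds on top of this is lazy creation of cells, expansion, and rehashing of the probe.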
LongAdder's add and sum logic is as follows:
public class LongAdder extends Striped64 {
    public void add(long x) {
        Cell[] cs; long b, v; int m; Cell c;
        // If cells is non-null, a CAS on base has already been contended by multiple threads, so go straight to the cells.
        // If cells is null, try to CAS base. On success, return; on failure, enter the if block.
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            // If cells is empty, or the current thread's cell is missing, or the CAS on that cell fails,
            // call longAccumulate, which handles initialization, expansion, and rehashing to reduce contention.
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[getProbe() & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

    // sum is simple: add up base and the values of all non-null cells.
    public long sum() {
        Cell[] cs = cells;
        long sum = base;
        if (cs != null) {
            for (Cell c : cs)
                if (c != null)
                    sum += c.value;
        }
        return sum;
    }
}
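From the caller's side, add and sum behave as in the short sketch below. LongAdderApiDemo and its run method are hypothetical names; the LongAdder methods used (add, increment, decrement, sum, sumThenReset) are part of the public java.util.concurrent.atomic API.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class; only the LongAdder calls come from the JDK.
class LongAdderApiDemo {
    // Returns {sum after the adds, value returned by sumThenReset, sum after the reset}
    static long[] run() {
        LongAdder adder = new LongAdder();
        adder.add(5L);
        adder.increment();                     // same as add(1L)
        adder.decrement();                     // same as add(-1L)
        adder.add(10L);
        long afterAdds = adder.sum();          // 5 + 1 - 1 + 10
        long drained = adder.sumThenReset();   // returns the sum, then resets to zero
        long afterReset = adder.sum();
        return new long[] {afterAdds, drained, afterReset};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));   // prints [15, 15, 0]
    }
}
```

Note that sum reads base and the cells one after another without locking, so it is not an atomic snapshot: updates that happen while sum is running may or may not be included. Once the writers have finished, the result is exact.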
Class Striped64 defines a Cell class annotated with @Contended to avoid false sharing. The value field of a Cell holds that cell's part of the count; base plus all cell values add up to the final result. Cell also provides a cas method, implemented with VarHandle, which takes over the role Unsafe played in earlier JDK versions. Striped64 then declares the cells array, the base field, and the cellsBusy spin-lock field. The cells array stores the value of each stripe. The base field is used while no threads are competing, which avoids allocating cells and keeps memory usage low. cellsBusy acts as a simple spin lock: the thread that changes it from 0 to 1 via CAS holds the lock.
abstract class Striped64 extends Number {
    @jdk.internal.vm.annotation.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return VALUE.compareAndSet(this, cmp, val);
        }
        final void reset() {
            VALUE.setVolatile(this, 0L);
        }
        final void reset(long identity) {
            VALUE.setVolatile(this, identity);
        }
        final long getAndSet(long val) {
            return (long)VALUE.getAndSet(this, val);
        }

        // VarHandle mechanics
        private static final VarHandle VALUE;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                VALUE = l.findVarHandle(Cell.class, "value", long.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** Table of cells. When non-null, size is a power of 2. */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;

    /** Spinlock (locked via CAS) used when resizing and/or creating Cells. */
    transient volatile int cellsBusy;

    // ... methods such as longAccumulate omitted
}
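The way cellsBusy works as a spin lock can be tried out in isolation. The following is a minimal sketch under stated assumptions: CasSpinLockSketch, its busy field, and the guarded counter are hypothetical names modeled on the description above, not code from Striped64. It uses the same VarHandle lookup pattern as Cell and requires Java 9 or later.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical sketch of a CAS-based spin lock in the style of cellsBusy.
class CasSpinLockSketch {
    private volatile int busy;   // 0 = free, 1 = held
    private long guarded;        // plain field, only touched while holding the lock

    private static final VarHandle BUSY;
    static {
        try {
            BUSY = MethodHandles.lookup()
                    .findVarHandle(CasSpinLockSketch.class, "busy", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    boolean tryLock() {
        // Cheap volatile read first, then the CAS from 0 to 1
        return busy == 0 && BUSY.compareAndSet(this, 0, 1);
    }

    void unlock() {
        busy = 0;   // volatile write releases the lock
    }

    void incrementGuarded() {
        while (!tryLock()) {
            Thread.onSpinWait();   // hint to the runtime that this is a busy-wait loop
        }
        try {
            guarded++;
        } finally {
            unlock();
        }
    }

    // Starts `threads` threads that each increment `perThread` times; returns the final value.
    static long runDemo(int threads, int perThread) {
        CasSpinLockSketch lock = new CasSpinLockSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    lock.incrementGuarded();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return lock.guarded;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 50_000));
    }
}
```

Checking busy == 0 before the CAS mirrors the cellsBusy == 0 && casCellsBusy() pattern: the plain read is cheaper than a failed CAS when the lock is already held.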
The core method of Striped64 is longAccumulate. It tries to find the cell that corresponds to the current thread and, when necessary, handles initialization of the cells array, creation of Cell objects, expansion, probe adjustment, and conflict resolution. The annotated source is as follows:
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    // A probe of 0 means the current thread's probe has not been initialized yet.
    // Calling ThreadLocalRandom.current() triggers the initialization, which sets the probe value.
    if ((h = getProbe()) == 0) {
        // The initialization does roughly the following:
        // int p = probeGenerator.addAndGet(PROBE_INCREMENT);
        // int probe = (p == 0) ? 1 : p; // skip 0
        // U.putInt(t, PROBE, probe);
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    done: for (;;) {
        Cell[] cs; Cell c; int n; long v;
        // If cells is not empty, update through the cells first
        // TODO: can cells be non-null while cells.length == 0?
        if ((cs = cells) != null && (n = cs.length) > 0) {
            // Use probe & (n - 1) to find the slot for the current thread.
            // If the slot is empty, a Cell has to be created under the lock
            if ((c = cs[(n - 1) & h]) == null) {
                // Optimistic check that the lock is free
                if (cellsBusy == 0) {       // Try to attach new Cell
                    // Create the Cell optimistically. Why here rather than after acquiring the lock below?
                    // To do less work while holding the lock and so keep the critical section short
                    Cell r = new Cell(x);   // Optimistically create
                    // Acquire the spin lock
                    if (cellsBusy == 0 && casCellsBusy()) {
                        try {               // Recheck under lock
                            // After casCellsBusy() acquires the lock, check again, because another thread
                            // may have run the code below before this thread's CAS succeeded
                            Cell[] rs; int m, j;
                            // cells was non-null with length > 0 in the outer if.
                            // Can cells become null again after that check?
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                // Check whether the slot is still empty
                                rs[j = (m - 1) & h] == null) {
                                // Still empty: install the Cell created above, which already holds x
                                rs[j] = r;
                                // Exit the loop
                                break done;
                            }
                        } finally {
                            // Release the lock
                            cellsBusy = 0;
                        }
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // wasUncontended is false only when the CAS on the cell in add() has already failed.
            // In that case skip the CAS this round, reset the flag, and retry after rehashing the probe
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // Try a CAS on the current cell
            else if (c.cas(v = c.value,
                           (fn == null) ? v + x : fn.applyAsLong(v, x)))
                break;
            // The CAS failed. If the array length has reached the number of CPUs, or another thread
            // has already replaced the array, reset collide so that no expansion is attempted
            else if (n >= NCPU || cells != cs)
                collide = false;            // At max size or stale
            // First collision: set collide and rehash the probe.
            // The array is expanded only if the CAS fails again on the next iteration
            else if (!collide)
                collide = true;
            // The thread collided again after rehashing: expand the array
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // Double check in case another thread has already replaced the array
                    if (cells == cs)        // Expand table unless stale
                        // Double the size of the cells array. Unlike HashMap, no entries need to be
                        // rehashed: the existing Cell references are copied as is, because only the sum matters
                        cells = Arrays.copyOf(cs, n << 1);
                } finally {
                    // Release the lock
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            // Try to reduce contention by changing the current thread's probe value, much like drawing a new random number
            h = advanceProbe(h);
        }
        // cells is empty and has to be initialized. cellsBusy works as a spin lock:
        // the thread that changes it from 0 to 1 via CAS is responsible for initializing the cells array
        else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
            try {                           // Initialize table
                // Recheck cells == cs under the lock: another thread may have initialized cells
                // between the cells == cs test above and the successful casCellsBusy().
                // In that case cells != cs, and the next iteration takes the first branch above
                if (cells == cs) {
                    // The initial size of the array is 2
                    Cell[] rs = new Cell[2];
                    // Use the probe to pick the slot for the current thread and create the Cell there
                    rs[h & 1] = new Cell(x);
                    // Publish the new array in the cells field of Striped64
                    cells = rs;
                    break done;
                }
            } finally {
                // Release the lock
                cellsBusy = 0;
            }
        }
        // Reached when cells is still empty but the lock is held or cells has just changed,
        // that is, another thread is initializing the array.
        // Return if the CAS on base succeeds, otherwise keep retrying
        // Fall back on using base
        else if (casBase(v = base,
                         (fn == null) ? v + x : fn.applyAsLong(v, x)))
            break done;
    }
}
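Two small pieces of arithmetic carry much of this method: the slot is computed as (n - 1) & h, which works because the table length is always a power of two, and the probe is rehashed after a collision. The sketch below isolates both. ProbeIndexSketch, advance, and slot are hypothetical names; the three xorshift steps are, as far as I can tell from the JDK source, the ones advanceProbe applies before writing the new probe back to the thread, which this sketch leaves out.

```java
// Hypothetical sketch of the slot and probe arithmetic used by longAccumulate.
class ProbeIndexSketch {
    // Xorshift step (13, 17, 5); assumption: same shifts as Striped64.advanceProbe,
    // without storing the result in the current thread's probe field
    static int advance(int probe) {
        probe ^= probe << 13;
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        return probe;
    }

    // Slot for a probe in a table whose length n is a power of two
    static int slot(int probe, int n) {
        return (n - 1) & probe;
    }

    public static void main(String[] args) {
        int probe = 0x9E3779B9;   // arbitrary non-zero starting value for the demo
        for (int round = 0; round < 4; round++) {
            System.out.println("probe=" + Integer.toHexString(probe)
                    + " slot@2=" + slot(probe, 2)
                    + " slot@4=" + slot(probe, 4)
                    + " slot@8=" + slot(probe, 8));
            probe = advance(probe);   // what a thread does after a failed CAS
        }
    }
}
```

Because the mask keeps only the low bits, doubling the table either leaves a probe in its old slot or moves it to the old slot plus the old length. That is why expansion can copy the existing Cell references unchanged: a thread whose probe now maps to a new, empty slot simply creates a fresh Cell there.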
Choosing between AtomicLong and LongAdder
- In most cases AtomicLong is sufficient. It provides atomic, thread-safe updates such as incrementAndGet and compareAndSet.
- When writes far outnumber reads and write contention is likely, use LongAdder, for example in monitoring scenarios such as request count statistics.
- AtomicLong is the better fit when memory footprint matters, because LongAdder may allocate an array of padded cells under contention.
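The split between the two classes can be sketched in a few lines. CounterChoiceSketch and its method names are hypothetical; the point is only that an ID generator needs the exact value produced by each update, which AtomicLong returns, while a request counter is written often and read rarely, which is where LongAdder fits.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch; class and method names are illustrative.
class CounterChoiceSketch {
    // AtomicLong: every caller needs the exact value its own update produced
    private final AtomicLong nextId = new AtomicLong();

    // LongAdder: many writers, and the total is only read occasionally
    private final LongAdder requestCount = new LongAdder();

    long newRequestId() {
        return nextId.incrementAndGet();
    }

    void recordRequest() {
        requestCount.increment();
    }

    long totalRequests() {
        return requestCount.sum();
    }

    public static void main(String[] args) {
        CounterChoiceSketch sketch = new CounterChoiceSketch();
        for (int i = 0; i < 3; i++) {
            long id = sketch.newRequestId();
            sketch.recordRequest();
            System.out.println("handled request " + id);
        }
        System.out.println("total requests: " + sketch.totalRequests());
    }
}
```

LongAdder has no counterpart to incrementAndGet or compareAndSet, so logic that depends on the value returned by an update stays with AtomicLong.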