Introduction to StampedLock
ReentrantReadWriteLock supports read locks and write locks, whereas StampedLock supports three modes: write locks, pessimistic read locks, and optimistic reads (lock-free). The write lock and pessimistic read lock have the same semantics as in ReentrantReadWriteLock: multiple threads may hold the pessimistic read lock at the same time, but only one thread may hold the write lock, and the write lock and pessimistic read lock are mutually exclusive.
Acquiring a StampedLock write lock or pessimistic read lock returns a stamp. To unlock, you pass that stamp back in.
The following is the official example:
    import java.util.concurrent.locks.StampedLock;

    public class Point {
        private final StampedLock sl = new StampedLock();
        private double x, y;

        // Write lock: the returned stamp must be passed to unlockWrite()
        void move(double deltaX, double deltaY) {
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        // Optimistic read: read without locking, then validate the stamp
        double distanceFromOrigin() {
            long stamp = sl.tryOptimisticRead();
            double currentX = x, currentY = y;
            if (!sl.validate(stamp)) {
                // A write intervened: fall back to the pessimistic read lock
                stamp = sl.readLock();
                try {
                    currentX = x;
                    currentY = y;
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            // If the values must stay mutually consistent, take the lock as above; if not, skip it.
            // Caches generally do not need mutual exclusion and can accept transient inconsistencies.
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        // StampedLock supports lock downgrade (via tryConvertToReadLock())
        // and lock upgrade (via tryConvertToWriteLock())
        void moveIfAtOrigin(double newX, double newY) {
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) {
                    long ws = sl.tryConvertToWriteLock(stamp);
                    if (ws != 0L) {
                        stamp = ws;
                        x = newX;
                        y = newY;
                        break;
                    } else {
                        sl.unlockRead(stamp);
                        stamp = sl.writeLock();
                    }
                }
            } finally {
                sl.unlock(stamp);
            }
        }
    }
Primary data structure
Although StampedLock does not use AQS, it does use CLH queues in its data structure.
WNode is the node of the CLH queue, and its source code is as follows
    static final class WNode {
        // Predecessor node
        volatile WNode prev;
        // Successor node
        volatile WNode next;
        // Stack of reader nodes that wait together on this node
        volatile WNode cowait;
        // The waiting thread
        volatile Thread thread;
        // Node status: 0, WAITING, or CANCELLED
        volatile int status;
        // Read or write mode: RMODE or WMODE
        final int mode;
        WNode(int m, WNode p) { mode = m; prev = p; }
    }
The important attributes of StampedLock are as follows
    // Head of the CLH queue
    private transient volatile WNode whead;
    // Tail of the CLH queue
    private transient volatile WNode wtail;
    // Lock state: version number, write bit, and reader count
    private transient volatile long state;
    // Extra reader count, used once the reader bits in state are saturated
    private transient int readerOverflow;
    // Number of bits used for the reader count
    private static final int LG_READERS = 7;
    // Amount added to state for each reader
    private static final long RUNIT = 1L;
    // Write lock bit. Decimal: 128, binary: 1000 0000
    private static final long WBIT = 1L << LG_READERS;
    // Mask for the reader count. Decimal: 127, binary: 0111 1111
    private static final long RBITS = WBIT - 1L;
    // Largest reader count stored in state itself: 126
    private static final long RFULL = RBITS - 1L;
    // Mask for all lock bits (readers and write). Decimal: 255, binary: 1111 1111
    private static final long ABITS = RBITS | WBIT;
    // Mask for version and write bit. Decimal: -128, binary: 1...1 1000 0000
    private static final long SBITS = ~RBITS;
    // Initial value of state. Decimal: 256, binary: 1 0000 0000
    private static final long ORIGIN = WBIT << 1;

    // state starts at 256
    public StampedLock() {
        state = ORIGIN;
    }
If you have read my earlier analysis of AQS, the CLH queue will look familiar. As in AQS, a single state variable records whether the StampedLock is held for reading or writing. This state holds three pieces of information.
The state field is a 64-bit long split into three parts. The low 7 bits count read locks, which can be shared by multiple threads: each thread that successfully acquires a read lock adds 1 to them. Bit 8 is the write lock bit, which only one thread can hold at a time. The remaining 56 bits are the stamp bits (version number), which record write lock activity: acquiring the write lock sets bit 8, and releasing it adds WBIT again, which clears bit 8 and carries 1 into the version.
Once the number of read locks exceeds 126, the excess is counted in readerOverflow.
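To make the bit layout concrete, here is a minimal sketch that splits a state value into its three parts. The class StampBits and its method names are hypothetical helpers written for this article; only the constants are copied from the listing above. Real stamps are an implementation detail, so treat this as an illustration of the layout rather than something to rely on in production code.

```java
final class StampBits {
    // Constants copied from the StampedLock listing above
    static final int LG_READERS = 7;
    static final long WBIT = 1L << LG_READERS;   // 128
    static final long RBITS = WBIT - 1L;         // 127

    /** Reader count stored in the low 7 bits. */
    static long readers(long s) { return s & RBITS; }

    /** True when bit 8 (the write bit) is set. */
    static boolean writeLocked(long s) { return (s & WBIT) != 0L; }

    /** Version number stored above the write bit. */
    static long version(long s) { return s >>> (LG_READERS + 1); }

    static String describe(long s) {
        return "readers=" + readers(s) + " write=" + writeLocked(s) + " version=" + version(s);
    }

    public static void main(String[] args) {
        System.out.println(describe(256L)); // ORIGIN: readers=0 write=false version=1
        System.out.println(describe(259L)); // ORIGIN + 3 readers: readers=3 write=false version=1
        System.out.println(describe(384L)); // ORIGIN + WBIT: readers=0 write=true version=1
    }
}
```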
What does the CLH queue look like when concurrency occurs?
For example, suppose thread W1 holds the write lock and has not released it. Four threads then try to acquire the read lock (R0 -> R1 -> R2 -> R3), another thread W2 tries to acquire the write lock, and finally three threads R4, R5, and R6 try to acquire the read lock. The main queue then holds the head (sentinel) node followed by R0, W2, and R4; R1 to R3 hang on R0's cowait stack, and R5 and R6 hang on R4's.
Because the read lock can be held by several threads at once, when several readers in a row fail to acquire it, only the first of them is linked into the main list; the readers that follow are pushed onto its cowait stack. The cowait stack can be thought of as a sub-chain.
Cowait can be understood as "cooperative wait": the readers grouped on one node acquire the lock together, as a unit.
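The queue shape for this example can be reproduced with a small single-threaded model. This is only a sketch of the data structure: CowaitQueueSketch, Node, enqueue, and describe are hypothetical names, and the real implementation links nodes with CAS and parks threads, which is left out here.

```java
final class CowaitQueueSketch {
    static final int RMODE = 0, WMODE = 1;

    static final class Node {
        final String name;
        final int mode;
        Node next;    // next node in the main queue
        Node cowait;  // top of the stack of readers grouped on this node
        Node(String name, int mode) { this.name = name; this.mode = mode; }
    }

    private final Node head = new Node("head", WMODE); // sentinel, like whead
    private Node tail = head;

    void enqueue(String name, int mode) {
        Node n = new Node(name, mode);
        if (mode == RMODE && tail != head && tail.mode == RMODE) {
            // The tail is a waiting reader: push onto its cowait stack
            n.cowait = tail.cowait;
            tail.cowait = n;
        } else {
            // Otherwise append to the main queue
            tail.next = n;
            tail = n;
        }
    }

    String describe() {
        StringBuilder sb = new StringBuilder("head");
        for (Node n = head.next; n != null; n = n.next) {
            sb.append(" -> ").append(n.name);
            if (n.cowait != null) {
                sb.append('[');
                for (Node c = n.cowait; c != null; c = c.cowait) {
                    sb.append(c.name);
                    if (c.cowait != null) sb.append(',');
                }
                sb.append(']');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        CowaitQueueSketch q = new CowaitQueueSketch();
        for (String r : new String[] {"R0", "R1", "R2", "R3"}) q.enqueue(r, RMODE);
        q.enqueue("W2", WMODE);
        for (String r : new String[] {"R4", "R5", "R6"}) q.enqueue(r, RMODE);
        System.out.println(q.describe()); // head -> R0[R3,R2,R1] -> W2 -> R4[R6,R5]
    }
}
```

Because cowait is a stack, the most recently arrived reader sits at the top, which is why R3 is listed before R1.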
Acquiring an optimistic read lock
    public long tryOptimisticRead() {
        long s;
        // If the write bit is clear, return the stamp bits; otherwise return 0
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }
The optimistic read logic is simple: check whether the write lock is held (that is, whether bit 8 of state is 1). If it is, return 0; otherwise return the stamp bits.
The initial value of state is 256 (1 0000 0000):
      0 1000 0000 (WBIT)
    & 1 0000 0000 (state)
    = 0 0000 0000 (the write lock is not held)

      1 0000 0000 (state)
    & 1 1000 0000 (SBITS, low 9 bits shown)
    = 1 0000 0000 (the stamp returned: 256)
Validating an optimistic read
    public boolean validate(long stamp) {
        // Load fence inserted via Unsafe: reads made before this call
        // cannot be reordered after the read of state below
        U.loadFence();
        return (stamp & SBITS) == (state & SBITS);
    }
Returns true: no write lock was acquired in the meantime (read locks are ignored).
Returns false: a write lock was acquired in the meantime.
The value of SBITS is -128, which in binary is 1...1 1000 0000 (all ones except the low 7 bits). validate() masks both the stamp and the current state with it and compares the results:
      x xxxx xxxx (stamp)
    & 1 1000 0000 (SBITS)
    compared with
      y yyyy yyyy (state)
    & 1 1000 0000 (SBITS)
The last 7 bits of SBITS are all 0, that is, we do not care about the read lock, we only care about the stamp bit and write bit.
When we get optimistic reads, if there is already a write lock, then stamp value 0 is returned, which must be false for validation.
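The behavior described above can be observed through the public API alone. The sketch below uses only documented StampedLock methods; the class OptimisticReadDemo and its method names are made up for illustration.

```java
import java.util.concurrent.locks.StampedLock;

final class OptimisticReadDemo {
    /** A read lock taken and released in between does not invalidate the stamp. */
    static boolean survivesReadLock() {
        StampedLock sl = new StampedLock();
        long stamp = sl.tryOptimisticRead();
        long r = sl.readLock();
        sl.unlockRead(r);
        return sl.validate(stamp);
    }

    /** A write lock taken in between invalidates the stamp, even after it is released. */
    static boolean survivesWriteLock() {
        StampedLock sl = new StampedLock();
        long stamp = sl.tryOptimisticRead();
        long w = sl.writeLock();
        sl.unlockWrite(w);
        return sl.validate(stamp);
    }

    /** While the write lock is held, tryOptimisticRead() returns 0. */
    static long stampWhileWriteLocked() {
        StampedLock sl = new StampedLock();
        long w = sl.writeLock();
        try {
            return sl.tryOptimisticRead();
        } finally {
            sl.unlockWrite(w);
        }
    }

    public static void main(String[] args) {
        System.out.println(survivesReadLock());               // true
        System.out.println(survivesWriteLock());              // false
        System.out.println(stampWhileWriteLocked());          // 0
        System.out.println(new StampedLock().validate(0L));   // false
    }
}
```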
Acquiring a pessimistic read lock
    public long readLock() {
        long s = state, next;
        // whead == wtail means the queue is empty: no thread is currently queuing
        return ((whead == wtail && (s & ABITS) < RFULL &&
                 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                // The first argument to acquireRead is false: thread interrupts are not handled
                next : acquireRead(false, 0L));
    }
When the read lock is acquired successfully, state is incremented by 1. If the fast path fails (the queue is not empty, the CAS fails, or 126 or more read locks are held), readLock() falls through to acquireRead(), which consists of two large for loops that I won't post here.
Its main logic is as follows:
- If no threads are queued, so that this thread would be the first to enqueue, there is a good chance of getting the lock soon, so it first spins trying to acquire it (spin count 64). On success, state is incremented by 1; if the number of read locks has already reached 126, readerOverflow records the excess (see tryIncReaderOverflow below). If the lock is still not acquired, go to the next step.
    private long tryIncReaderOverflow(long s) {
        // The reader bits hold exactly 126 (RFULL)
        if ((s & ABITS) == RFULL) {
            // Set the reader bits to 127 (RBITS), which briefly guards readerOverflow.
            // With version 1, state is 383 (1 0111 1111) after this CAS.
            if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
                // Count the extra reader in readerOverflow
                ++readerOverflow;
                // Restore the reader bits to 126
                state = s;
                return s;
            }
        }
        // Another thread is updating readerOverflow: occasionally yield, then let the caller retry
        else if ((LockSupport.nextSecondarySeed() &
                  OVERFLOW_YIELD_RATE) == 0)
            Thread.yield();
        return 0L;
    }
- Initialize the queue and link the new node into it. As in AQS, the head node is a sentinel that stands for the (virtual) thread currently holding the lock, so its thread field is null and its mode is WMODE. This produces the relationship whead --> node <-- wtail.
- If another thread then fails to acquire the read lock, it is added to the cowait stack.
- A reader that is not the first in its group is pushed onto the cowait stack and then tries to acquire the lock through CAS. If that fails, it blocks until it is woken up, or until it is cancelled by a timeout or an interrupt.
- The first reader in the queue is given preferential treatment because no thread is queued in front of it, so it keeps spinning at this level with a larger spin budget: 1024 iterations at first, then 2048 on the next round if the lock has still not been acquired. If the lock is still held after that, the thread blocks until it is woken up and acquires the lock, or until it is cancelled by a timeout or an interrupt.
The first four steps belong to the first big loop, and the fifth belongs to the second.
When the thread that acquires the lock is woken up, it also wakes the threads on the cowait stack hanging off its node.
As the analysis shows, StampedLock uses extensive spinning to avoid blocking threads where it can. As long as threads finish their work quickly and release the lock promptly, almost no blocking occurs.
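The readerOverflow path is easy to exercise from outside: read locks have no owner, so a single thread can acquire more than 126 of them as long as no writer is present. The following sketch (ReaderOverflowDemo and run are hypothetical names) takes 200 read locks and checks the count with the public getReadLockCount() method.

```java
import java.util.concurrent.locks.StampedLock;

final class ReaderOverflowDemo {
    /** Acquires n read locks and returns {count while held, count after release}. */
    static int[] run(int n) {
        StampedLock sl = new StampedLock();
        long[] stamps = new long[n];
        for (int i = 0; i < n; i++) {
            stamps[i] = sl.readLock(); // does not block here: no writer is present
        }
        int held = sl.getReadLockCount();
        for (int i = 0; i < n; i++) {
            sl.unlockRead(stamps[i]);
        }
        return new int[] { held, sl.getReadLockCount() };
    }

    public static void main(String[] args) {
        int[] r = run(200);
        System.out.println(r[0] + " held, " + r[1] + " after release");
    }
}
```

With 200 readers, the first 126 fit in the low bits of state and the remaining 74 are tracked in readerOverflow, yet getReadLockCount() reports the combined total.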
Release read lock
The code for releasing the read lock is fairly simple. The main steps are:
- Check whether the stamp is valid; if not, throw IllegalMonitorStateException.
- If the number of read locks is less than 126, subtract 1 from state through CAS. If this releases the last read lock, wake up the next node in the queue.
- If the read count has overflowed, subtract 1 from readerOverflow.
    public void unlockRead(long stamp) {
        long s, m; WNode h;
        // Spin until the release succeeds
        for (;;) {
            // Check whether the stamp is valid
            if (((s = state) & SBITS) != (stamp & SBITS) ||
                (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                throw new IllegalMonitorStateException();
            // The number of read locks is less than 126
            if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    // The last read lock was released: wake up the next node
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    break;
                }
            }
            // The reader bits are saturated: decrement the overflow count
            else if (tryDecReaderOverflow(s) != 0L)
                break;
        }
    }
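The stamp check at the top of unlockRead() is what rejects stale or mismatched stamps. A minimal sketch of two such cases, using only the public API (StaleStampDemo and its methods are hypothetical names):

```java
import java.util.concurrent.locks.StampedLock;

final class StaleStampDemo {
    /** Releasing the same read stamp twice: the second call finds no read lock held. */
    static boolean doubleUnlockThrows() {
        StampedLock sl = new StampedLock();
        long stamp = sl.readLock();
        sl.unlockRead(stamp);
        try {
            sl.unlockRead(stamp);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Passing a write stamp to unlockRead() is rejected as well. */
    static boolean writeStampRejected() {
        StampedLock sl = new StampedLock();
        long w = sl.writeLock();
        try {
            sl.unlockRead(w);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } finally {
            sl.unlockWrite(w); // the write lock is still held, so release it properly
        }
    }

    public static void main(String[] args) {
        System.out.println(doubleUnlockThrows());  // true
        System.out.println(writeStampRejected());  // true
    }
}
```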
Acquiring the write lock
    public long writeLock() {
        long s, next;
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : acquireWrite(false, 0L));
    }
If the low 8 bits of state are all 0 (neither a read lock nor the write lock is held) and the CAS on state succeeds, the write lock has been acquired; otherwise the call enters the acquireWrite method.
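The fast-path condition can be checked with the non-blocking try methods, which return 0 when the lock is not immediately available. This is a small sketch with hypothetical names (ExclusionDemo, run); it only calls documented StampedLock methods.

```java
import java.util.concurrent.locks.StampedLock;

final class ExclusionDemo {
    /** Returns {write attempt under a read lock, write attempt when free,
     *  read attempt under the write lock, second write attempt}. */
    static long[] run() {
        StampedLock sl = new StampedLock();
        long r = sl.readLock();
        long writeWhileRead = sl.tryWriteLock();   // 0: a read lock is held
        sl.unlockRead(r);

        long w = sl.tryWriteLock();                // non-zero: the lock is free
        long readWhileWrite = sl.tryReadLock();    // 0: the write lock is held
        long secondWrite = sl.tryWriteLock();      // 0: the write lock is exclusive
        sl.unlockWrite(w);

        return new long[] { writeWhileRead, w, readWhileWrite, secondWrite };
    }

    public static void main(String[] args) {
        long[] res = run();
        System.out.println(res[0] == 0L); // true
        System.out.println(res[1] != 0L); // true
        System.out.println(res[2] == 0L); // true
        System.out.println(res[3] == 0L); // true
    }
}
```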
The acquireWrite logic is also split into two parts, that is, two for loops.
The first for loop has the following logic:
    // Excerpt from acquireWrite(); node and p are declared just above the loop
    // as: WNode node = null, p;
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // No read or write lock is held: try to take the write lock
        if ((m = (s = state) & ABITS) == 0L) {
            // CAS adds WBIT (128) to state
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        // First pass: spin (SPINS = 64) only if the lock is write-locked
        // and the queue is empty; otherwise do not spin
        else if (spins < 0)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // Randomly reduce the number of spins
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        // Spinning did not acquire the write lock, so enqueue.
        // p points to wtail; if it is null, initialize the CLH queue (whead, wtail)
        else if ((p = wtail) == null) {
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        // Create the node for the current thread
        else if (node == null)
            node = new WNode(WMODE, p);
        // The tail changed in the meantime: point prev at the new tail
        else if (node.prev != p)
            node.prev = p;
        // Append the node: whead --> node <-- wtail
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            p.next = node;
            // The node is linked into the list, so the first loop ends
            break;
        }
    }
The second for loop is also long, so I won't post it. Its core logic is as follows:
- If the current node is first in line (its predecessor is the head), the thread spins 1024 times trying to acquire the write lock. If that fails, it spins 2048 times and tries again. If both attempts fail, it proceeds to the next step.
- Block the current thread until it is woken up.
Release the write lock
    public void unlockWrite(long stamp) {
        WNode h;
        // Verify the stamp: it must equal state and have the write bit set
        if (state != stamp || (stamp & WBIT) == 0L)
            throw new IllegalMonitorStateException();
        // Release the write lock: adding WBIT clears bit 8 of state and
        // carries 1 into the stamp bits (version number).
        // If the result wraps around to 0, reset state to ORIGIN.
        state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
        // If the head node is not null and its status is not zero, wake up the next node
        if ((h = whead) != null && h.status != 0)
            release(h);
    }

    private void release(WNode h) {
        if (h != null) {
            WNode q; Thread w;
            // Reset the head's status from WAITING to 0
            U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
            // If the head's successor is null or has been cancelled
            if ((q = h.next) == null || q.status == CANCELLED) {
                // Walk backward from the tail to find the usable node closest to the head
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            // Wake up the thread waiting on node q
            if (q != null && (w = q.thread) != null)
                U.unpark(w);
        }
    }
The process for releasing the write lock is summarized below:
- Set bit 8 of state to 0 (release the write lock) and add 1 to the stamp bits (version number).
- Wake up the next node.
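The state update in unlockWrite() is plain arithmetic, so it can be checked in isolation. The sketch below copies the constants and the expression from the listings above into a hypothetical helper class, UnlockWriteArithmetic; it does not touch a real lock.

```java
final class UnlockWriteArithmetic {
    // Constants copied from the StampedLock listing above
    static final long WBIT = 1L << 7;       // 128
    static final long ORIGIN = WBIT << 1;   // 256

    /** The state stored by unlockWrite() for a given write stamp. */
    static long nextState(long stamp) {
        return (stamp += WBIT) == 0L ? ORIGIN : stamp;
    }

    /** Version number: the bits above the write bit. */
    static long version(long s) { return s >>> 8; }

    public static void main(String[] args) {
        // First write lock on a fresh lock: state is ORIGIN + WBIT = 384
        long afterFirst = nextState(384L);
        System.out.println(afterFirst);          // 512: write bit cleared
        System.out.println(version(afterFirst)); // 2: version went from 1 to 2
        // All version bits set plus the write bit: the sum wraps to 0, so state resets
        System.out.println(nextState(-128L));    // 256
    }
}
```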
Lock downgrade (tryConvertToReadLock)
The tryConvertToReadLock method can be used to downgrade a lock. It is not limited to write stamps, though: it also accepts an optimistic stamp or a stamp that already holds a read lock.
    public long tryConvertToReadLock(long stamp) {
        // a: lock bits of the caller's stamp; m: lock bits of the current state
        long a = stamp & ABITS, m, s, next; WNode h;
        // Loop while the write bit and version in state still match the stamp
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            // No lock is currently held
            if ((m = s & ABITS) == 0L) {
                // The stamp claims to hold a lock, so it is not valid here
                if (a != 0L)
                    break;
                // The number of read locks is less than 126
                else if (m < RFULL) {
                    // Increment state by 1 (acquire a read lock)
                    if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
                        return next;
                }
                // The reader bits are saturated: record the reader in the extra field
                else if ((next = tryIncReaderOverflow(s)) != 0L)
                    return next;
            }
            // The write lock is currently held
            else if (m == WBIT) {
                if (a != m)
                    break;
                // Add 129: read count + 1, write bit cleared, version + 1
                state = next = s + (WBIT + RUNIT);
                // The write lock is released: wake up the next node
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return next;
            }
            // The stamp already holds a read lock: return it unchanged
            else if (a != 0L && a < WBIT)
                return stamp;
            else
                break;
        }
        // Conversion failed
        return 0L;
    }
Its main logic is as follows. If no lock is currently held (the stamp is an optimistic one), acquire a read lock and return the new stamp. If the stamp holds the write lock, release the write lock and update state (read count + 1, write bit set to 0, version + 1), then return the new stamp. If the stamp already holds a read lock, return it unchanged. If none of these conditions is met, the conversion fails and 0 is returned.
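A short sketch of the first two cases, seen from the caller's side. DowngradeDemo and its methods are hypothetical names; isWriteLocked() and isReadLocked() are public StampedLock methods.

```java
import java.util.concurrent.locks.StampedLock;

final class DowngradeDemo {
    /** Write lock -> read lock. Returns {converted, write held after, read held after}. */
    static boolean[] fromWriteLock() {
        StampedLock sl = new StampedLock();
        long w = sl.writeLock();
        long r = sl.tryConvertToReadLock(w);
        boolean[] result = { r != 0L, sl.isWriteLocked(), sl.isReadLocked() };
        // Release whichever lock is actually held
        if (r != 0L) sl.unlockRead(r); else sl.unlockWrite(w);
        return result;
    }

    /** Optimistic stamp -> read lock, possible when no write happened in between. */
    static boolean fromOptimisticStamp() {
        StampedLock sl = new StampedLock();
        long o = sl.tryOptimisticRead();
        long r = sl.tryConvertToReadLock(o);
        boolean ok = r != 0L && sl.isReadLocked();
        if (r != 0L) sl.unlockRead(r);
        return ok;
    }

    public static void main(String[] args) {
        boolean[] res = fromWriteLock();
        System.out.println(res[0] + " " + res[1] + " " + res[2]); // true false true
        System.out.println(fromOptimisticStamp());                // true
    }
}
```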
Lock upgrade (tryConvertToWriteLock)
    public long tryConvertToWriteLock(long stamp) {
        long a = stamp & ABITS, m, s, next;
        // Loop while the write bit and version in state still match the stamp
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            // No lock is currently held
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                // Set the write bit to 1 (acquire the write lock)
                if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                    return next;
            }
            // The write lock is already held: return the stamp unchanged
            else if (m == WBIT) {
                if (a != m)
                    break;
                return stamp;
            }
            // Exactly one read lock is held and the stamp is a read stamp:
            // swap the read lock for the write lock
            else if (m == RUNIT && a != 0L) {
                if (U.compareAndSwapLong(this, STATE, s,
                                         next = s - RUNIT + WBIT))
                    return next;
            }
            else
                break;
        }
        // Conversion failed
        return 0L;
    }
The logic for upgrading a lock mirrors that for downgrading, so I won't walk through it here.
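One detail worth seeing in action is the m == RUNIT branch: a read lock can be upgraded only when it is the sole read lock. The sketch below (UpgradeDemo and run are hypothetical names) shows the conversion failing with two readers and succeeding once the other reader has left.

```java
import java.util.concurrent.locks.StampedLock;

final class UpgradeDemo {
    /** Returns {stamp with two readers, stamp as sole reader}. */
    static long[] run() {
        StampedLock sl = new StampedLock();
        long r1 = sl.readLock();
        long r2 = sl.readLock();
        long withTwoReaders = sl.tryConvertToWriteLock(r1); // 0: another reader is present
        sl.unlockRead(r2);
        long asSoleReader = sl.tryConvertToWriteLock(r1);   // non-zero: upgraded
        if (asSoleReader != 0L) {
            sl.unlockWrite(asSoleReader);
        } else {
            sl.unlockRead(r1);
        }
        return new long[] { withTwoReaders, asSoleReader };
    }

    public static void main(String[] args) {
        long[] res = run();
        System.out.println(res[0] == 0L); // true
        System.out.println(res[1] != 0L); // true
    }
}
```

This is why the official moveIfAtOrigin example falls back to unlockRead() followed by writeLock() when the conversion returns 0.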
Summary and caveats
- Instead of using AQS, StampedLock relies on its own synchronization state (a 64-bit long state) and a variant of the CLH queue.
- StampedLock uses the low 7 bits of state for the number of read locks (readers beyond 126 are recorded in the readerOverflow field), the eighth bit for the write lock, and the high 56 bits for the version of the lock. Acquiring the write lock sets the eighth bit, and releasing it carries 1 into the version, so the stamp changes every time the write lock is acquired or released.
- In StampedLock, read locks do not block one another. Read locks and write locks block each other, and write locks also block each other.
- When several reader threads queue up one after another, only the first is linked into the CLH queue; the ones that follow hang on the first reader's cowait stack.
- After StampedLock wakes up the first reader thread, that thread wakes all the readers on its cowait stack (in the acquireRead() method).
- StampedLock does not support fair locking, and it does not support Condition.
- StampedLock supports lock downgrade and lock upgrade.
- Optimistic reads in StampedLock are lock-free.
- StampedLock relies heavily on spinning plus CAS, so it suits tasks that hold the lock only briefly. For tasks that hold the lock for a long time, spinning wastes CPU and the waiting thread still ends up blocking.
- When a thread is blocked in StampedLock's readLock() or writeLock(), calling interrupt() on it can cause CPU usage to spike. Do not interrupt threads blocked in these methods. If you need interruption support, always use the interruptible variants readLockInterruptibly() and writeLockInterruptibly().
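As a minimal sketch of the interruptible variant, the example below interrupts the current thread before it asks for the write lock, so writeLockInterruptibly() throws InterruptedException instead of acquiring the lock. InterruptDemo and its method name are hypothetical; a real program would normally interrupt a waiting thread from another thread.

```java
import java.util.concurrent.locks.StampedLock;

final class InterruptDemo {
    /** Returns true if the interruptible acquire reports the pending interrupt. */
    static boolean throwsWhenInterrupted() {
        StampedLock sl = new StampedLock();
        Thread.currentThread().interrupt(); // simulate an interrupt arriving first
        try {
            long stamp = sl.writeLockInterruptibly();
            sl.unlockWrite(stamp);          // only reached if no exception was thrown
            return false;
        } catch (InterruptedException e) {
            return true;                    // the caller can now clean up and exit
        } finally {
            Thread.interrupted();           // make sure the flag is cleared for later code
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsWhenInterrupted()); // true
    }
}
```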