Wechat public account: the landlord’s small black road is far away, the future is better learning is boundless, we come on together!
In this article I will begin introducing ReentrantReadWriteLock to source code analysis.
I will briefly introduce the meaning of variables, and then analyze the process of acquiring and releasing read and write locks.
The relevant variables
ReadLock
WriteLock
Thread firstReader
int firstReaderHoldCount
HoldCounter cachedHoldCounter
ThreadLocalHolderCounter readHolds
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
Copy the code
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue(a) {
return new HoldCounter();
}
} Copy the code
Read and write state control
In Sync, which controls the lock logic, you can define whether to use fair or unfair locks. The default is unfair.
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
Copy the code
ReadLock uses shared mode and WriteLock uses exclusive mode.
The same AQS instance can use shared mode and exclusive mode at the same time, WriteLock and ReadLock two locks maintain the same synchronization queue, the synchronization queue only has an int type state variable to represent the current synchronization state, so how to separate the two read and write state, and achieve the purpose of thread control?
You essentially split state into two parts, with the high 16 bits representing read state and the low 16 bits representing write state. So the maximum number that can be expressed is 2^ 16-1 = 65535 times.
So how do you calculate that?
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// Count read locks
// Move 16 bits to the right, keeping the high 16 bits
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// it is the values c and 0x0000FFFF that do the and operation static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } Copy the code
Acquiring and releasing write locks is relatively simple, but we’ll start with this one.
Write lock acquisition and release
Write lock acquisition
When a write lock is added to a lock:
public void lock(a) {
sync.acquire(1);
}
// If the write lock fails to be acquired, it is placed in the synchronous wait queue
public final void acquire(int arg) {
if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } Copy the code
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// Get the current number of write locks. If it is not 0, it indicates that a thread has acquired the write lock
int w = exclusiveCount(c);
if(c ! =0) { // In both cases, the write lock is denied // 1) w == 0 indicates that no thread is using a write lock, and c! = 0 indicates that a read lock exists // 2) If there is a write lock but it is not the current thread if (w == 0|| current ! = getExclusiveOwnerThread()) return false; // If the number of write locks exceeds the maximum value 2^ 16-1, an exception is thrown if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS is not required here, only write lock reentrant is known here setState(c + acquires); return true; } // There was no thread to acquire the read/write lock, // CAS fails to modify state, and is added to the synchronization queue if (writerShouldBlock() || ! compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } Copy the code
In fair lock:
// If the current thread has queued threads before it, the write lock cannot be acquired
final boolean writerShouldBlock(a) {
return hasQueuedPredecessors();
}
Copy the code
In an unfair lock:
// You can preempt the thread without checking whether there is a waiting thread
final boolean writerShouldBlock(a) {
return false;
}
Copy the code
Write lock release
public void unlock(a) {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { Node h = head; if(h ! =null&& h.waitStatus ! =0) unparkSuccessor(h); return true; } return false; } Copy the code
protected final boolean isHeldExclusively(a) {
return getExclusiveOwnerThread() == Thread.currentThread();
}
protected final boolean tryRelease(int releases) {
// If the current thread is not the thread holding the write lock if(! isHeldExclusively()) throw new IllegalMonitorStateException(); // Subtract the write lock value int nextc = getState() - releases; // Number of write locks is 0 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); // Returns true when all write locks have been released, waking up subsequent nodes return free; } Copy the code
Read lock acquisition and release
Read locks are shared locks
public void lock(a) {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// Return a value less than 0 to indicate that no read lock has been acquired, if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } Copy the code
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// If there is a write lock and the holder of the write lock is not the current thread
// Reject attempts to acquire read locks
// It is possible to own a read lock when the current thread holds a write lock, which will be explained later if(exclusiveCount(c) ! =0 && getExclusiveOwnerThread() ! = current) return -1; // Get the current number of read locks int r = sharedCount(c); // The readerShuoldBlock logic is explained later. // When the method can be entered and the CAS is successful if(! readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // If the current number of read locks is 0, this is the first read if (r == 0) { // Will set the current thread as the first read thread firstReader = current; // The number of first read threads firstReaderHoldCount = 1; // Support reentrant, when the first read thread enters again } else if (firstReader == current) { firstReaderHoldCount++; } else { // Cache the last thread to acquire the read lock HoldCounter rh = cachedHoldCounter; // If the cache is not the current thread, the value of the current thread is retrieved through ThreadLocal and modified if (rh == null|| rh.tid ! = getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } Copy the code
ReaderShouldBlock is implemented differently for fair and unfair locks
In fair locks: return true if there is a waiting queue, then the lock cannot be acquired directly
final boolean readerShouldBlock(a) {
return hasQueuedPredecessors();
}
Copy the code
In unfair locks: If the first successor to the head in the blocking queue is a write lock, the current thread cannot attempt to acquire the lock
final boolean readerShouldBlock(a) {
return apparentlyFirstQueuedIsExclusive();
}
Return true if the driver of the head node is in exclusive mode
final boolean apparentlyFirstQueuedIsExclusive(a) { Node h, s; return(h = head) ! =null && (s = h.next) ! =null && ! s.isShared() &&s.thread ! =null; } Copy the code
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// When there is a write lock and the holder is not the current thread
if(exclusiveCount(c) ! =0) { if(getExclusiveOwnerThread() ! = current) return -1; // If there is no write lock, or the write lock holder is the current thread // There is a queue waiting for a fair lock // An unfair lock. The first successor to the wait queue is a write lock } else if (readerShouldBlock()) { // If the thread is present for the first time, it is not processed if (firstReader == current) { // assert firstReaderHoldCount > 0; // If it is another reading thread, get the number of reads of the current thread // If it is 0, it is removed from ThreadLOcalMap // And refuses to attempt to acquire the read lock } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null|| rh.tid ! = getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // Read lock out of limit, throw exception if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS updates the status if (compareAndSetState(c, c + SHARED_UNIT)) { // The logic here is the same as before, so I won't go into detail if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null|| rh.tid ! = getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } Copy the code
The doAcquireShared method is called when fetching fails.
The main logic is to add the node to the synchronous wait queue and then enter the for loop. If the forerunner of the current node is the head node, then call tryAcquireShared to try to obtain the read lock.
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } Copy the code
Read lock release
public void unlock(a) {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } Copy the code
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// If the thread to be released is the current thread
if (firstReader == current) {
Reset to null if the current thread will have no read lock
if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // Get the number of read locks held by the current thread based on ThreadLocal HoldCounter rh = cachedHoldCounter; if (rh == null|| rh.tid ! = getThreadId(current)) rh = readHolds.get(); int count = rh.count; // If the current thread has no reentrant count, remove it from ThreadLocalMap if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // CAS resets state to 0 to wake up subsequent nodes for (;;) { int c = getState(); // The lock status will be read int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // There are no read locks or write locks return nextc == 0; } } Copy the code
When the above method returns true, the doReleaseShared method is called.
private void doReleaseShared(a) {
for (;;) {
Node h = head;
if(h ! =null&& h ! = tail) { int ws = h.waitStatus;
if (ws == Node.SIGNAL) { if(! compareAndSetWaitStatus(h, Node.SIGNAL,0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && ! compareAndSetWaitStatus(h,0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } Copy the code
Wake up the subsequent nodes of the current node. If the subsequent nodes are null, or the status of the subsequent nodes is greater than 0, it indicates that the node is canceled. Therefore, look forward from the tail node to find the earliest node that is not canceled, and then wake up
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for(Node t = tail; t ! =null&& t ! = node; t = t.prev) if (t.waitStatus <= 0) s = t; } if(s ! =null) LockSupport.unpark(s.thread); } Copy the code
ReentrantReadWriteLock Is a lock degradation mechanism for concurrent programming in Java.
This process of acquiring read locks and then releasing write locks is called lock degradation.
So why does a thread acquire a write lock after modifying data, instead of releasing the write lock?
If the current thread releases the write lock directly, then if another thread acquires the write lock and changes the data, the current thread will not be aware of the data change. The purpose of obtaining the read lock first is to ensure that no other thread will modify the data.