preface
Synchronized ReentrantLock ReentrantReadWriteLock Synchronized ReentrantLock ReentrantReadWriteLock
1. ReentrantReadWriteLock structure diagram
2. Called method diagram
3. Obtain the shared lock
- ReadLock lock method, source:
public void lock(a) {
//Sync inherits AQS. This method is implemented in AQS
sync.acquireShared(1);
}
Copy the code
- AcquireShared method in AQS, source code as follows
public final void acquireShared(int arg) {
// Try to obtain the lock. If the lock succeeds, return. If it fails, execute the doAcquireShared method to obtain the lock
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
Copy the code
- TryAcquireShared () defined in ReentrantReadWriteLock. Java’s Sync, source code is as follows:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// In read-write lock mode, the higher 16 bits store the number of times a shared lock (read lock) was acquired, and the lower 16 bits store the number of times a mutex lock (write lock) was acquired
int c = getState();
// Returns -1 if the exclusive lock (write lock) has been acquired and the thread that acquired the exclusive lock is not the current thread
// If an exclusive lock is acquired by the current thread, the current thread can also acquire the read lock, and the lock degrades
if(exclusiveCount(c) ! =0&& getExclusiveOwnerThread() ! = current)return -1;
// Get the share count for read lock
int r = sharedCount(c);
// If no blocking wait is required and the share count of "read lock" is less than MAX_COUNT;
// The CAS function updates the share count for "read lock" +1
if(! readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {// Get the read lock for the first time
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// If the current thread is the first thread to acquire the lock
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// HoldCounter counts the number of times the current thread has acquired the "read lock"
// The following lines set cachedHoldCounter to the current thread
HoldCounter rh = cachedHoldCounter;
if (rh == null|| rh.tid ! = getThreadId(current))// Whether cachedHoldCounter caches the current thread. If not, go to ThreadLocal
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// If the condition fails, the method is entered
return fullTryAcquireShared(current);
}
Copy the code
- FullTryAcquireShared () in ReentrantReadWriteLock. Java’s Sync, source code is as follows:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// If the lock is held by a write lock, and the thread that acquired the lock is not the current thread; Returns -1.
if(exclusiveCount(c) ! =0) {
if(getExclusiveOwnerThread() ! = current)return -1;
// Need to block wait
} else if (readerShouldBlock()) {
// Write lock is released and read lock is blocked
// Reentrant ()
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null|| rh.tid ! = current.getId()) { rh = readHolds.get();if (rh.count == 0) readHolds.remove(); }}// If the count of locks acquired by the current thread =0, return -1 to queue.
if (rh.count == 0)
return -1; }}if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// The number of times the thread acquires the "read lock" +1.
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null|| rh.tid ! = current.getId()) rh = readHolds.get();else if (rh.count == 0)
readHolds.set(rh);
// The update thread acquires the "read lock" share count
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1; }}}Copy the code
- DoAcquireShared () is defined in the AQS function, source code:
private void doAcquireShared(int arg) {
// addWaiter(node.shared)
// Create the node corresponding to "current thread" and add the thread to the CLH queue
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// Get a node from the current node
final Node p = node.predecessor();
// If p is the head node, try to obtain the shared lock.
if (p == head) {
// Here is the tryAcquireShared method analysed above.
// The doAcquireShared method is just a call to tryAcquireShared in the loop to try to get the shared lock
int r = tryAcquireShared(arg);
// If successful
if (r >= 0) {
// The head node moves back and propagates
// Propagation wakes up successive read nodes
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return; }}If the thread has been interrupted during the blocking wait, set interrupted to true.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}Copy the code
4. Release the shared lock
- ReadLock unLock ();
public void unlock(a) {
//Sync inherits AQS. This method is implemented in AQS
sync.releaseShared(1);
}
Copy the code
- ReleaseShared () ¶
public final boolean releaseShared(int arg) {
// This is the same as acquiring a lock:
// Use tryReleaseShared() to try to release the shared lock
// If doReleaseShared fails, the shared lock is released
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Copy the code
- TryReleaseShared () defined in ReentrantReadWriteLock. Java’s Sync, source code is as follows:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// If the current thread is the first thread to acquire the lock
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// If firstReader is equal to 1, the lock will not be held after this release, and firstReader will be null for subsequent threads
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// Check whether cachedHoldCounter caches the current thread. If not, go to ThreadLocal
HoldCounter rh = cachedHoldCounter;
if (rh == null|| rh.tid ! = current.getId()) rh = readHolds.get();int count = rh.count;
if (count <= 1) {
// This step removes ThreadLocal to prevent memory leaks. Because the read lock is no longer held
readHolds.remove();
if (count <= 0)
// Here, lock() once, unlock() several times
throw unmatchedUnlockException();
}
// The number of times the current thread acquires the "read lock" -1
--rh.count;
}
for (;;) {
// Get the lock status
int c = getState();
// Set the lock count to -1.
int nextc = c - SHARED_UNIT;
// Update the lock status through CAS.
if (compareAndSetState(c, nextc))
return nextc == 0; }}Copy the code
- DoReleaseShared () is defined in AQS.
private void doReleaseShared(a) {
for (;;) {
// Get the head node of the CLH queue
Node h = head;
// If the head node is not null and the head node is not the last node in the queue
if(h ! =null&& h ! = tail) {// Get the thread state corresponding to the head node
int ws = h.waitStatus;
// Check whether the head node is in SIGNAL state
// If so, the thread corresponding to the next node needs to be woken up.
if (ws == Node.SIGNAL) {
// Set thread state of the head node to 0. If it fails, the cycle continues.
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue;
// Wake up the thread corresponding to the next node of the head node.
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// If the head node changes, the loop continues. Otherwise, exit the loop.
if (h == head) // loop if head changed
break; }}Copy the code
5, the instance,
class CachedData {
Object data;
volatile boolean cacheValid;
// Read and write the lock instance
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData(a) {
// Get read lock
rwl.readLock().lock();
if(! cacheValid) {// If the cache is expired, or null
// Unlock the read lock and then acquire the write lock.
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
if(! cacheValid) {// Check again, because while waiting for the write lock, another writer thread may have executed before
data = ...
cacheValid = true;
}
// Acquire read locks (if a write lock is held, it is allowed to acquire read locks, called "lock degradation", and vice versa.)
rwl.readLock().lock();
} finally {
// Release the write lock, there is still a read lock
rwl.writeLock().unlock(); // Unlock write, still hold read}}try {
use(data);
} finally {
// Release the read lockrwl.readLock().unlock(); }}}Copy the code
conclusion
This article to this ReentrantReadWriteLock shared lock acquisition and release source code is finished analysis, the next will be Semaphore analysis.
If you found this post helpful, please give it a thumbs up and a follow.