Wechat public account: the landlord’s small black road is far away, the future is better learning is boundless, we come on together!

In this article I will begin introducing ReentrantReadWriteLock to source code analysis.

I will briefly introduce the meaning of variables, and then analyze the process of acquiring and releasing read and write locks.

The relevant variables

ReadLock



WriteLock



Thread firstReader



int firstReaderHoldCount
HoldCounter cachedHoldCounter



ThreadLocalHolderCounter readHolds

static final class HoldCounter {
       int count = 0;
       // Use id, not reference, to avoid garbage retention
       final long tid = getThreadId(Thread.currentThread());
}
Copy the code
static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue(a) {
                return new HoldCounter();
            }
} Copy the code

Read and write state control

In Sync, which controls the lock logic, you can define whether to use fair or unfair locks. The default is unfair.

public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
}
Copy the code

ReadLock uses shared mode and WriteLock uses exclusive mode.

The same AQS instance can use shared mode and exclusive mode at the same time, WriteLock and ReadLock two locks maintain the same synchronization queue, the synchronization queue only has an int type state variable to represent the current synchronization state, so how to separate the two read and write state, and achieve the purpose of thread control?

You essentially split state into two parts, with the high 16 bits representing read state and the low 16 bits representing write state. So the maximum number that can be expressed is 2^ 16-1 = 65535 times.

So how do you calculate that?

static final int SHARED_SHIFT   = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// Count read locks
// Move 16 bits to the right, keeping the high 16 bits
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// it is the values c and 0x0000FFFF that do the and operation static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } Copy the code

Acquiring and releasing write locks is relatively simple, but we’ll start with this one.

Write lock acquisition and release

Write lock acquisition

When a write lock is added to a lock:

public void lock(a) {
            sync.acquire(1);
}
// If the write lock fails to be acquired, it is placed in the synchronous wait queue
public final void acquire(int arg) {
 if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  selfInterrupt(); } Copy the code
protected final boolean tryAcquire(int acquires) {     
       Thread current = Thread.currentThread();
       int c = getState();
       // Get the current number of write locks. If it is not 0, it indicates that a thread has acquired the write lock
       int w = exclusiveCount(c);
 if(c ! =0) {  // In both cases, the write lock is denied  // 1) w == 0 indicates that no thread is using a write lock, and c! = 0 indicates that a read lock exists  // 2) If there is a write lock but it is not the current thread  if (w == 0|| current ! = getExclusiveOwnerThread()) return false;  // If the number of write locks exceeds the maximum value 2^ 16-1, an exception is thrown  if (w + exclusiveCount(acquires) > MAX_COUNT)  throw new Error("Maximum lock count exceeded");  // CAS is not required here, only write lock reentrant is known here  setState(c + acquires);  return true;  }  // There was no thread to acquire the read/write lock,  // CAS fails to modify state, and is added to the synchronization queue  if (writerShouldBlock() || ! compareAndSetState(c, c + acquires)) return false;  setExclusiveOwnerThread(current);  return true; } Copy the code

In fair lock:

// If the current thread has queued threads before it, the write lock cannot be acquired
final boolean writerShouldBlock(a) {
            return hasQueuedPredecessors();
}
Copy the code

In an unfair lock:

// You can preempt the thread without checking whether there is a waiting thread
final boolean writerShouldBlock(a) {
      return false; 
}
Copy the code

Write lock release

public void unlock(a) {
       sync.release(1);
}

public final boolean release(int arg) {
 if (tryRelease(arg)) {  Node h = head;  if(h ! =null&& h.waitStatus ! =0)  unparkSuccessor(h);  return true;  }  return false; } Copy the code
protected final boolean isHeldExclusively(a) {
     return getExclusiveOwnerThread() == Thread.currentThread();
}

protected final boolean tryRelease(int releases) {
 // If the current thread is not the thread holding the write lock  if(! isHeldExclusively()) throw new IllegalMonitorStateException();  // Subtract the write lock value  int nextc = getState() - releases;  // Number of write locks is 0  boolean free = exclusiveCount(nextc) == 0;  if (free)  setExclusiveOwnerThread(null);  setState(nextc);  // Returns true when all write locks have been released, waking up subsequent nodes  return free; } Copy the code

Read lock acquisition and release

Read locks are shared locks

public void lock(a) {
       sync.acquireShared(1);
}
 
public final void acquireShared(int arg) {
 // Return a value less than 0 to indicate that no read lock has been acquired,  if (tryAcquireShared(arg) < 0)  doAcquireShared(arg); } Copy the code
protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            // If there is a write lock and the holder of the write lock is not the current thread
            // Reject attempts to acquire read locks
 // It is possible to own a read lock when the current thread holds a write lock, which will be explained later  if(exclusiveCount(c) ! =0 && getExclusiveOwnerThread() ! = current) return -1;  // Get the current number of read locks  int r = sharedCount(c);  // The readerShuoldBlock logic is explained later.  // When the method can be entered and the CAS is successful  if(! readerShouldBlock() && r < MAX_COUNT &&  compareAndSetState(c, c + SHARED_UNIT)) {  // If the current number of read locks is 0, this is the first read  if (r == 0) {  // Will set the current thread as the first read thread  firstReader = current;  // The number of first read threads  firstReaderHoldCount = 1;  // Support reentrant, when the first read thread enters again  } else if (firstReader == current) {  firstReaderHoldCount++;  } else {  // Cache the last thread to acquire the read lock  HoldCounter rh = cachedHoldCounter;  // If the cache is not the current thread, the value of the current thread is retrieved through ThreadLocal and modified  if (rh == null|| rh.tid ! = getThreadId(current)) cachedHoldCounter = rh = readHolds.get();  else if (rh.count == 0)  readHolds.set(rh);  rh.count++;  }  return 1;  }  return fullTryAcquireShared(current); } Copy the code

ReaderShouldBlock is implemented differently for fair and unfair locks

In fair locks: return true if there is a waiting queue, then the lock cannot be acquired directly

final boolean readerShouldBlock(a) {
      return hasQueuedPredecessors();
}
Copy the code

In unfair locks: If the first successor to the head in the blocking queue is a write lock, the current thread cannot attempt to acquire the lock

final boolean readerShouldBlock(a) {
      return apparentlyFirstQueuedIsExclusive();
}

Return true if the driver of the head node is in exclusive mode
final boolean apparentlyFirstQueuedIsExclusive(a) {  Node h, s;  return(h = head) ! =null && (s = h.next) ! =null && ! s.isShared() &&s.thread ! =null; } Copy the code
final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                // When there is a write lock and the holder is not the current thread
 if(exclusiveCount(c) ! =0) {  if(getExclusiveOwnerThread() ! = current) return -1;  // If there is no write lock, or the write lock holder is the current thread  // There is a queue waiting for a fair lock  // An unfair lock. The first successor to the wait queue is a write lock  } else if (readerShouldBlock()) {  // If the thread is present for the first time, it is not processed  if (firstReader == current) {  // assert firstReaderHoldCount > 0;  // If it is another reading thread, get the number of reads of the current thread  // If it is 0, it is removed from ThreadLOcalMap  // And refuses to attempt to acquire the read lock  } else {  if (rh == null) {  rh = cachedHoldCounter;  if (rh == null|| rh.tid ! = getThreadId(current)) { rh = readHolds.get();  if (rh.count == 0)  readHolds.remove();  }  }  if (rh.count == 0)  return -1;  }  }  // Read lock out of limit, throw exception  if (sharedCount(c) == MAX_COUNT)  throw new Error("Maximum lock count exceeded");  // CAS updates the status  if (compareAndSetState(c, c + SHARED_UNIT)) {  // The logic here is the same as before, so I won't go into detail  if (sharedCount(c) == 0) {  firstReader = current;  firstReaderHoldCount = 1;  } else if (firstReader == current) {  firstReaderHoldCount++;  } else {  if (rh == null)  rh = cachedHoldCounter;  if (rh == null|| rh.tid ! = getThreadId(current)) rh = readHolds.get();  else if (rh.count == 0)  readHolds.set(rh);  rh.count++;  cachedHoldCounter = rh; // cache for release  }  return 1;  }  } } Copy the code

The doAcquireShared method is called when fetching fails.

The main logic is to add the node to the synchronous wait queue and then enter the for loop. If the forerunner of the current node is the head node, then call tryAcquireShared to try to obtain the read lock.

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
 for (;;) {  final Node p = node.predecessor();  if (p == head) {  int r = tryAcquireShared(arg);  if (r >= 0) {  setHeadAndPropagate(node, r);  p.next = null; // help GC  if (interrupted)  selfInterrupt();  failed = false;  return;  }  }  if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt())  interrupted = true;  }  } finally {  if (failed)  cancelAcquire(node);  }  } Copy the code

Read lock release

public void unlock(a) {
       sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
 if (tryReleaseShared(arg)) {  doReleaseShared();  return true;  }  return false; } Copy the code
protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            // If the thread to be released is the current thread
            if (firstReader == current) {
                Reset to null if the current thread will have no read lock
 if (firstReaderHoldCount == 1)  firstReader = null;  else  firstReaderHoldCount--;  } else {  // Get the number of read locks held by the current thread based on ThreadLocal  HoldCounter rh = cachedHoldCounter;  if (rh == null|| rh.tid ! = getThreadId(current)) rh = readHolds.get();  int count = rh.count;  // If the current thread has no reentrant count, remove it from ThreadLocalMap  if (count <= 1) {  readHolds.remove();  if (count <= 0)  throw unmatchedUnlockException();  }  --rh.count;  }  // CAS resets state to 0 to wake up subsequent nodes  for (;;) {  int c = getState();  // The lock status will be read  int nextc = c - SHARED_UNIT;  if (compareAndSetState(c, nextc))  // There are no read locks or write locks  return nextc == 0;  } } Copy the code

When the above method returns true, the doReleaseShared method is called.

private void doReleaseShared(a) {
        for (;;) {
            Node h = head;
            if(h ! =null&& h ! = tail) {                int ws = h.waitStatus;
 if (ws == Node.SIGNAL) {  if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))  continue; // loop to recheck cases  unparkSuccessor(h);  }  else if (ws == 0 && ! compareAndSetWaitStatus(h,0, Node.PROPAGATE))  continue; // loop on failed CAS  }  if (h == head) // loop if head changed  break;  } } Copy the code

Wake up the subsequent nodes of the current node. If the subsequent nodes are null, or the status of the subsequent nodes is greater than 0, it indicates that the node is canceled. Therefore, look forward from the tail node to find the earliest node that is not canceled, and then wake up

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

 Node s = node.next;  if (s == null || s.waitStatus > 0) {  s = null;  for(Node t = tail; t ! =null&& t ! = node; t = t.prev) if (t.waitStatus <= 0)  s = t;  }  if(s ! =null)  LockSupport.unpark(s.thread);  } Copy the code

ReentrantReadWriteLock Is a lock degradation mechanism for concurrent programming in Java.

This process of acquiring read locks and then releasing write locks is called lock degradation.

So why does a thread acquire a write lock after modifying data, instead of releasing the write lock?

If the current thread releases the write lock directly, then if another thread acquires the write lock and changes the data, the current thread will not be aware of the data change. The purpose of obtaining the read lock first is to ensure that no other thread will modify the data.