Preface
My last article, Locks and Memory Models for Concurrent Programming, introduced synchronized and the various kinds of locks (biased, spin, lightweight, and heavyweight), the three main features of the Java memory model, and the volatile keyword with its functions and principles. It also introduced CAS (compare-and-swap), an idea used in many places in JUC. Today's focus is AQS, which I consider the hard part of JUC, and understanding it means reading the source.
The interview process
-
Interviewer: I remember there is something with a name similar to CAS, called AQS. Can you tell me about it?
-
Me: Yes.
1. AQS (AbstractQueuedSynchronizer), the queued synchronizer, is the basic framework for building locks and other synchronization components (e.g., ReentrantLock, ReentrantReadWriteLock, Semaphore). Doug Lea, the author of the JUC package, expected it to become the basis for most synchronization needs, and as he expected, AQS is the core foundational component of JUC.
2. AQS takes care of many details of implementing a synchronizer, such as acquiring the synchronization state and maintaining the FIFO synchronization queue. Building synchronizers on AQS has many benefits: it greatly reduces the implementation effort, and the synchronizer does not have to handle contention at multiple points. In a synchronizer built on AQS, a thread can block at only one point, which reduces context-switch overhead and improves throughput. AQS was also designed with scalability in mind, so every synchronizer in JUC built on it gains these advantages.
3. AQS is used mainly through inheritance: a subclass extends the synchronizer and overrides its protected methods (tryAcquire, tryRelease, and so on) to manage the synchronization state. AQS uses an int member variable, state, to represent the synchronization state. For a lock, state > 0 means the lock has been acquired and state = 0 means the lock is free. AQS provides three methods for operating on state and guarantees that these operations are safe: getState(), setState(int newState), and compareAndSetState(int expect, int update).
4. AQS also queues the threads that are waiting for the resource in a built-in FIFO synchronization queue.
(1) If the current thread fails to acquire the synchronization state (the lock), AQS builds a Node from the current thread and its wait status, appends it to the synchronization queue, and blocks the current thread.
(2) When the synchronization state (the lock) is released, AQS wakes up the thread in the first waiting node so that it tries to acquire the synchronization state again.
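The inheritance-based usage described above can be sketched with a small non-reentrant mutex. This is only an illustration in the spirit of the example in the AbstractQueuedSynchronizer Javadoc, not code from the JDK locks: the class name SimpleMutex and its helper methods are assumptions made for this sketch, and state 0 is taken to mean "free" and 1 to mean "held".

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS the state from 0 (free) to 1 (held); only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // a plain write is enough: only the holder releases
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }      // queues and blocks on failure
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public void unlock()      { sync.release(1); }      // wakes the next queued thread
    public boolean isLocked() { return sync.locked(); }
}
```

The subclass only decides what "acquired" and "released" mean for state; queuing, blocking, and wake-up all come from the acquire and release template methods of AQS.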
-
Interviewer: Can you tell me what useful methods AQS provides?
-
Me: AQS mainly provides the following methods (API):
1. getState(): returns the current value of the synchronization state.
2. setState(int newState): sets the synchronization state.
3. compareAndSetState(int expect, int update): sets the state with CAS, which guarantees that the update is atomic.
4. tryAcquire(int arg): acquires the synchronization state in exclusive mode. Once it succeeds, other threads can acquire the state only after this thread releases it.
5. tryRelease(int arg): releases the synchronization state in exclusive mode.
6. tryAcquireShared(int arg): acquires the synchronization state in shared mode. A return value greater than or equal to 0 means success; a negative value means failure.
7. tryReleaseShared(int arg): releases the synchronization state in shared mode.
8. isHeldExclusively(): indicates whether the synchronizer is held exclusively by the current thread.
9. acquire(int arg): acquires the synchronization state in exclusive mode. If the current thread succeeds, the method returns; otherwise the thread enters the synchronization queue and waits. This method calls the overridable tryAcquire(int arg).
10. acquireInterruptibly(int arg): same as acquire(int arg), but it responds to interrupts. If the current thread is interrupted while waiting in the synchronization queue, the method throws InterruptedException and returns.
11. tryAcquireNanos(int arg, long nanosTimeout): acquireInterruptibly with a timeout. It returns false if the current thread has not acquired the synchronization state within the timeout, and true if it has.
12. acquireShared(int arg): acquires the synchronization state in shared mode. If the current thread fails, it enters the synchronization queue and waits. Unlike exclusive mode, several threads can hold the synchronization state at the same time.
13. acquireSharedInterruptibly(int arg): shared acquisition that responds to interrupts.
14. tryAcquireSharedNanos(int arg, long nanosTimeout): shared acquisition with a timeout.
15. release(int arg): releases the synchronization state in exclusive mode. After releasing, it wakes up the thread in the first waiting node of the synchronization queue.
16. releaseShared(int arg): releases the synchronization state in shared mode.
-
Interviewer: You just mentioned that AQS maintains a FIFO queue internally. Can you talk about this queue?
-
Me: That queue is the CLH queue, a FIFO doubly linked queue that AQS relies on to manage the synchronization state.
1. If the current thread fails to acquire the synchronization state, AQS packs the thread, its wait status, and related information into a Node, appends the node to the CLH synchronization queue, and blocks the current thread.
2. When the synchronization state is released, AQS wakes up the first waiting node (as in a fair lock) so that it tries to acquire the synchronization state again.
-
Me: In the CLH synchronization queue, a Node represents a thread and holds the thread reference (thread), the wait status (waitStatus), the predecessor (prev), and the successor (next). Node is a static inner class of AbstractQueuedSynchronizer, defined as follows:
static final class Node {
    /** Marker: the node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker: the node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** The thread timed out or was interrupted and has cancelled its wait */
    static final int CANCELLED = 1;
    /** The successor's thread is (or will be) blocked and must be woken when this node releases or cancels */
    static final int SIGNAL = -1;
    /**
     * The thread of the node is waiting on a Condition. When another thread calls signal()
     * on the Condition, the node is moved from the wait queue to the synchronization queue.
     */
    static final int CONDITION = -2;
    /** The next shared acquisition should propagate unconditionally */
    static final int PROPAGATE = -3;

    /** Wait status */
    volatile int waitStatus;
    /** Predecessor node, set when the node is appended to the tail of the synchronization queue */
    volatile Node prev;
    /** Successor node */
    volatile Node next;
    /** The thread that is waiting to acquire the synchronization state */
    volatile Thread thread;
    /**
     * Next node in the condition wait queue. For a node waiting in shared mode this field
     * holds the SHARED constant, so the node mode (exclusive or shared) and the next
     * waiter in the wait queue share the same field.
     */
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
Description:
1. waitStatus: the wait status, which controls thread blocking and wake-up. There are five states: INITIAL (0), CANCELLED, SIGNAL, CONDITION, and PROPAGATE.
2. thread: the thread represented by the node.
3. nextWaiter: the mode in which the node acquires the synchronization state. tryAcquire(int arg) and tryAcquireShared(int arg) acquire in exclusive and shared mode respectively. When the acquisition fails, both paths call addWaiter(Node mode) to enqueue the thread, and nextWaiter records the mode: Node.SHARED for shared mode, Node.EXCLUSIVE for exclusive mode.
4. predecessor(): returns the previous node. The local copy Node p = prev guards against concurrent modification: prev could be changed by another thread between the null check and the return, so the method works on the local copy to stay thread-safe.
- Interviewer: How does the CLH queue work?
- Me: Anyone who has studied data structures will find enqueuing into the CLH queue easy, as shown in the figure. 1. prev of the new node points to the current tail node. 2. tail is moved to the new node with CAS. 3. next of the old tail node points to the new node.
private Node addWaiter(Node mode) {
    // Create a new node for the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Record the original tail node
    Node pred = tail;
    // Fast path: try to append the new node as the tail
    if (pred != null) {
        // Point the new node's prev at the original tail
        node.prev = pred;
        // CAS the tail to the new node
        if (compareAndSetTail(pred, node)) {
            // Success: the next of the original tail is the new node
            pred.next = node;
            return node;
        }
    }
    // Fast path failed: retry in a loop until the node is enqueued
    enq(node);
    return node;
}
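The three enqueue steps can be tried out in isolation with a simplified queue. The sketch below is not the JDK implementation: CasQueueSketch, its Node type, and the demo helper are names invented for this illustration, and an AtomicReference stands in for the tail field that AQS updates through compareAndSetTail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified queue that only demonstrates the CAS tail append.
class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final Node head = new Node(); // dummy head, like the AQS head node
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    Node enqueue() {
        Node node = new Node();
        for (;;) {
            Node pred = tail.get();
            node.prev = pred;                     // step 1: link back to the current tail
            if (tail.compareAndSet(pred, node)) { // step 2: CAS the tail to the new node
                pred.next = node;                 // step 3: link the old tail forward
                return node;
            }
            // Another thread won the race; retry with the new tail.
        }
    }

    // Walks backwards over prev, which is always set before a node becomes the tail.
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != head; p = p.prev) {
            n++;
        }
        return n;
    }

    // Enqueues from several threads at once and returns the final queue length.
    static int demo(int threads, int perThread) {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    q.enqueue();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.size();
    }
}
```

Because prev is written before the CAS publishes the node, the backward chain is complete at every moment, while next may briefly lag behind. This is one reason a backward traversal over prev is the reliable direction in such a queue.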
Dequeuing: the CLH synchronization queue is FIFO. When the thread of the head node releases the synchronization state, it wakes up its successor (node.next), and the successor sets itself as the head node once it has acquired the synchronization state. The process is simple: head points to that node, the old head's next link is cut, and the node's prev link is cut. Note that no CAS is needed here, because only one thread can acquire the synchronization state successfully. The setHead(Node node) method implements this dequeue logic:
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
- Interviewer: Can you tell me how AQS acquires and releases the synchronization state?
- Me: As I said earlier, AQS is designed around the template method pattern: subclasses extend it and override its protected methods to manage the synchronization state, while AQS provides many template methods that implement the synchronization. They fall into three categories: exclusive acquisition and release of the synchronization state, shared acquisition and release of the synchronization state, and queries on the threads waiting in the synchronization queue. Exclusive mode: only one thread holds the synchronization state at a time. Exclusive acquisition of the synchronization state:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
1. tryAcquire: tries to acquire the lock and returns true on success, false otherwise. This method is implemented by the custom synchronization component, which must make the acquisition thread-safe.
2. addWaiter: if tryAcquire returns false (the acquisition failed), this method appends the current thread to the tail of the CLH synchronization queue.
3. acquireQueued: the current thread spins in the queue until it acquires the lock, and the method reports whether the thread was interrupted while waiting.
4. selfInterrupt: re-asserts the interrupt on the current thread.
acquireQueued is a spin process. After the current thread's node enters the synchronization queue, it starts spinning; each node watches its own situation and leaves the spin once its condition is met and it has acquired the synchronization state. As the following code shows, the thread keeps trying to acquire the synchronization state, but only when its predecessor is the head node, which preserves the FIFO order of the queue. After the head node releases the synchronization state, it wakes up its successor, and the awakened successor checks whether its predecessor is the head node.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // Interrupt flag
        boolean interrupted = false;
        // The spin is simply an infinite loop
        for (;;) {
            // Predecessor of the current node
            final Node p = node.predecessor();
            // The predecessor is the head node and the acquisition succeeded
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Acquisition failed: decide whether to block (more on that later)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
There are also exclusive acquisition methods that respond to interrupts or time out, which are not described in detail here. Exclusive release of the synchronization state: after a thread has acquired the synchronization state and run its logic, it must release the state. AQS provides release(int arg) for this:
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
release first calls tryRelease(int arg), which the custom synchronization component implements, to release the synchronization state. If that succeeds, it calls unparkSuccessor(Node node) to wake up the successor node. Summary: AQS maintains a FIFO synchronization queue internally. When a thread fails to acquire the synchronization state, it joins the tail of the CLH queue and spins. While spinning, the thread checks whether its predecessor is the head node. If it is, the thread keeps trying to acquire the synchronization state and leaves the CLH queue once it succeeds. When the thread has finished its logic, it releases the synchronization state and wakes up its successor. Shared mode: the main difference from exclusive mode is that in exclusive mode only one thread can hold the synchronization state at a time, while in shared mode several threads can hold it at once. For example, several threads may read at the same time, but only one thread may write at a time. Shared acquisition goes through acquireShared(int arg):
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        // Acquisition failed: spin to acquire the synchronization state
        doAcquireShared(arg);
}
acquireShared first calls tryAcquireShared(int arg). If that fails, it calls doAcquireShared(int arg), which spins to acquire the synchronization state:
private void doAcquireShared(int arg) {
    // Enqueue the current thread in shared mode
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Predecessor node
            final Node p = node.predecessor();
            // Only try to acquire if the predecessor is the head node
            if (p == head) {
                // Try to acquire the synchronization state
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
addWaiter(Node.SHARED) appends the current thread to the tail of the CLH synchronization queue, and the loop then spins to acquire the synchronization state. node.predecessor() returns the predecessor of the current node. If the predecessor is the head node (p == head), tryAcquireShared(int arg) is called to try to acquire the synchronization state. On success (r >= 0) the loop exits, and before exiting, setHeadAndPropagate makes the current node the new head and may wake up the next waiting shared node. Releasing the synchronization state in shared mode:
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
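To see shared mode from the subclass side, here is a minimal one-shot latch, loosely modelled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc. The class name OneShotLatch and the demo helper are assumptions made for this sketch. tryAcquireShared returns a non-negative value once the latch is open, so every waiting thread passes, not just one.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: state 0 = closed, 1 = open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // >= 0 means success in shared mode, < 0 means the caller must queue.
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // lets releaseShared wake the queued threads
        }
    }

    private final Sync sync = new Sync();

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void open() {
        sync.releaseShared(1);
    }

    // Starts the given number of waiters, opens the latch, and counts how many got through.
    static int demo(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

A single open() call releases all waiters because each awakened shared node propagates the wake-up to the next one, which is the job of setHeadAndPropagate in the source above.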
- Interviewer: In the source you showed above I saw the method shouldParkAfterFailedAcquire, which seems to be used to block the thread. Can you explain how AQS blocks and wakes up threads?
- Me: As mentioned above, when a thread fails to acquire the synchronization state, it joins the CLH synchronization queue and keeps trying to acquire the state by spinning. During the spin it has to decide whether the current thread needs to block. A thread is not blocked immediately after a failed acquisition: its situation is checked first, by shouldParkAfterFailedAcquire(Node pred, Node node), which mainly uses the predecessor node to decide whether the current thread should be blocked. For example, this part of the source above:
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Wait status of the predecessor node
    int ws = pred.waitStatus;
    // SIGNAL: the predecessor will wake this node on release, so it is safe to block; return true
    if (ws == Node.SIGNAL)
        return true;
    // Status > 0 means CANCELLED: the predecessor timed out or was interrupted
    // and must be skipped in the synchronization queue
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }
    // Status is 0 or PROPAGATE: ask the predecessor to signal us
    else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
1. If the status of the current thread's predecessor is SIGNAL, the predecessor will wake this node when it releases, so the current thread can safely be blocked and the method returns true.
2. If the status of the predecessor is CANCELLED (ws > 0), the predecessor has timed out or been interrupted. Cancelled predecessors are removed from the CLH queue by walking back until a predecessor with status <= 0 is found, and the method returns false.
3. If the predecessor is neither SIGNAL nor CANCELLED, CAS sets the predecessor's status to SIGNAL and the method returns false.
If shouldParkAfterFailedAcquire(Node pred, Node node) returns true, parkAndCheckInterrupt() is called to block the current thread:
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
parkAndCheckInterrupt() suspends the current thread, which blocks its call stack, and after wake-up returns (and clears) the interrupted status of the current thread. Internally it blocks by calling the park() method of the LockSupport utility class.
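The blocking primitive itself is easy to try outside AQS. The following sketch uses only LockSupport.park() and LockSupport.unpark(Thread); the class ParkDemo and its helper method are hypothetical names for this illustration. It also shows the usual pattern of re-checking a condition in a loop, since park() is allowed to return without a matching unpark().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class for LockSupport.park / unpark.
class ParkDemo {
    // Returns true if the waiting thread resumed after the main thread unparked it.
    static boolean parkThenUnpark() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is checked in a loop,
            // just as acquireQueued re-checks the head node after every wake-up.
            while (!ready.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        ready.set(true);            // publish the condition first
        LockSupport.unpark(waiter); // then hand the thread its permit

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }
}
```

The order of park() and unpark() does not matter here: unpark() makes a permit available, so a later park() returns immediately instead of missing the wake-up.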
- Interviewer: Let's talk about an application of AQS in JUC: ReentrantLock. What do you know about ReentrantLock?
- Me: ReentrantLock is a reentrant lock, a synchronization mechanism that the owning thread can acquire again recursively without blocking itself. In use it is equivalent to synchronized, but it offers a more powerful and flexible mechanism, which can reduce the probability of deadlock.
1. A ReentrantLock is owned by the thread that most recently acquired it and has not yet released it. If the lock is not held by another thread, the thread calling lock() acquires it and returns. If the current thread already owns the lock, lock() returns immediately.
2. ReentrantLock offers a choice between fair and non-fair locking. The constructor accepts an optional fair argument (true for a fair lock; the default is a non-fair lock). The difference is that a fair lock is granted in request order. Fair locks are usually less efficient than non-fair locks: under access by many threads, a fair lock shows lower throughput.
The lock method:
public void lock() {
    sync.lock();
}
Sync is an inner class of ReentrantLock that extends AQS and has two subclasses: FairSync and NonfairSync. Most of the functionality of ReentrantLock is delegated to Sync. Sync declares the abstract lock() method for its subclasses to implement, and it provides the nonfairTryAcquire(int acquires) method itself. Let's look at lock() for the non-fair lock:
final void lock() {
    // Try to grab the lock directly
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // Failed: fall back to AQS acquire(int arg)
        acquire(1);
}
The method first tries to grab the lock quickly. If that succeeds, it records the current thread as the exclusive owner. If it fails, acquire(1) is called, which is defined in AQS as follows:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire(int arg) is called first. As explained in the AQS part, this method must be implemented by the synchronization component itself. The implementation in NonfairSync is shown below:
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // Current synchronization state
    int c = getState();
    // state == 0 means the lock is free
    if (c == 0) {
        // Use CAS to take the lock
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // Check whether the thread holding the lock is the current thread
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // The owner acquires the lock again: increase the synchronization state
        setState(nextc);
        return true;
    }
    return false;
}
The unlock method:
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    // Subtract releases from the state
    int c = getState() - releases;
    // Throw an exception if the releasing thread does not hold the lock
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // state == 0 means the lock is fully released and other threads can acquire it
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // Write back the synchronization state
    setState(c);
    return free;
}
Unlock () internally releases the lock using Sync’s release(int arg), which is defined in AQS. TryRelease also needs to be implemented by the synchronization component itself. If the first Node is succeeded and there are threads waiting to succeed, the unparksucceeded (Node Node) method is called to wake up the next thread.
-
Interviewer: What are the similarities and differences between ReentrantLock and synchronized?
-
Me: First, they provide the same basic locking functionality and the same memory semantics. The differences are as follows:
1. Compared with synchronized, ReentrantLock offers more functionality and is more extensible, for example timed lock waits, interruptible lock waits, and lock polling.
2. ReentrantLock also provides Condition, which makes waiting for and waking up threads more fine-grained and flexible, so ReentrantLock fits better where there are multiple condition variables and high lock contention.
3. ReentrantLock offers pollable lock requests: it tries to acquire the lock and continues if it succeeds, otherwise the caller can defer the work and retry later. With synchronized, a lock request either succeeds or blocks. ReentrantLock is therefore less prone to deadlock than synchronized.
4. ReentrantLock supports more flexible locking scopes, whereas with synchronized the lock can only be acquired and released within the same block structure.
5. ReentrantLock supports interrupt handling and can perform somewhat better than synchronized.
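Point 3 can be illustrated with the timed form of tryLock. In this sketch the main thread holds the lock while a second thread asks for it with a 100 ms timeout. The class name TryLockDemo, the helper method, and the timeout value are choices made for this example only.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a contender gives up instead of blocking forever.
class TryLockDemo {
    // Returns whether the second thread obtained the lock while the first still held it.
    static boolean contenderGotLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] got = new boolean[1];
        lock.lock();
        try {
            Thread contender = new Thread(() -> {
                try {
                    // Waits at most 100 ms, and can also be interrupted while waiting.
                    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                        try {
                            got[0] = true;
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            contender.start();
            try {
                contender.join(); // the lock is still held while the contender tries
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return got[0];
    }
}
```

The contender returns after its timeout and is free to do something else, which is the escape hatch that a thread blocked on synchronized does not have.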
-
Interviewer: Have you used ReentrantReadWriteLock? Can you tell me about it?
-
Me: Let's go straight to the source code. The first few lines of ReentrantReadWriteLock are simple, so take a look at the Sync class.
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    /* The state is split in two: high 16 bits for shared mode and low 16 bits for exclusive mode. */
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** Number of shared (read) holds represented in the state */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** Number of exclusive (write) holds represented in the state */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    /** Per-thread counter of read holds */
    static final class HoldCounter {
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());
    }

    /** ThreadLocal subclass that holds the HoldCounter of each thread */
    static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    /** Number of reentrant read locks held by the current thread */
    private transient ThreadLocalHoldCounter readHolds;

    /**
     * Caches the read hold count of the last thread to acquire the read lock, so that
     * this thread does not need to look the value up in the ThreadLocal map. The cache
     * helps performance when no other thread acquires the read lock between an
     * acquisition and the matching release.
     */
    private transient HoldCounter cachedHoldCounter;

    /** The first thread to acquire the read lock (and not yet release it), and its hold count */
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        // Initialize the readHolds ThreadLocal
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
    // remaining methods of Sync are not shown here
}
Like ReentrantLock, ReentrantReadWriteLock uses Sync as the body of its lock, and both the read lock and the write lock are implemented through Sync. ReentrantReadWriteLock therefore really has only one lock; the read lock and the write lock differ only in how they acquire it. The two locks are exposed as two classes, ReadLock and WriteLock. In ReentrantLock, the int state represents the number of times the lock has been acquired repeatedly by one thread. ReentrantReadWriteLock maintains a pair of locks but has only one variable to hold several states, so it splits the variable by bits: the high 16 bits hold the read state and the low 16 bits hold the write state. After the split, the read and write states are determined with bit operations. If the current synchronization state is S, then write state = S & 0x0000FFFF (the high 16 bits are cleared) and read state = S >>> 16 (unsigned right shift by 16 bits, filling with zeros).
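The bit split can be checked with a few lines of arithmetic. The constants and the two count methods below mirror the ones in the Sync source shown earlier; the class name StateBits and the pack helper are hypothetical and exist only for this sketch.

```java
// Hypothetical helper class that reproduces the state arithmetic of the read-write lock.
class StateBits {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = 1 << SHARED_SHIFT;       // adding it means "one more reader"
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 0x0000FFFF

    // Read state: the high 16 bits, moved down with an unsigned shift.
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // Write state: the low 16 bits.
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    // Hypothetical helper: builds a state value from a read count and a write count.
    static int pack(int reads, int writes) {
        return (reads << SHARED_SHIFT) | writes;
    }
}
```

For example, a state of 0x00030002 stands for three read holds and two write holds. Acquiring one more read lock adds SHARED_UNIT to the state, while a reentrant write acquisition adds 1.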
- Me: Let's look at the write lock acquisition. The write lock is an exclusive lock that supports reentrancy.
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // Current synchronization state
    int c = getState();
    // Write lock hold count
    int w = exclusiveCount(c);
    if (c != 0) {
        // c != 0 && w == 0 means a read lock is held;
        // otherwise fail if the current thread does not hold the write lock
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // Beyond the maximum count
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquisition of the write lock
        setState(c + acquires);
        return true;
    }
    // Whether the writer should block
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // Record the current thread as the owner of the lock
    setExclusiveOwnerThread(current);
    return true;
}
- Me: Let's look at the release of the write lock.
protected final boolean tryRelease(int releases) {
    // Throw an exception if the releasing thread is not the lock holder
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // State after the release
    int nextc = getState() - releases;
    // The write lock is fully released when the exclusive count reaches 0
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
- Me: Let's look at the read lock acquisition.
public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

protected final int tryAcquireShared(int unused) {
    // Current thread
    Thread current = Thread.currentThread();
    // Current synchronization state
    int c = getState();
    // If a write lock is held and the holder is not the current thread, return -1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // Read lock count
    int r = sharedCount(c);
    // readerShouldBlock(): whether the reader must wait (fairness policy).
    // Continue only if the count is below the maximum and the CAS on the state succeeds.
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // No thread held the read lock: the current thread is the first reader
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        }
        // The first reader re-enters: firstReaderHoldCount + 1
        else if (firstReader == current) {
            firstReaderHoldCount++;
        }
        else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
- Me: Let's look at the release of the read lock.
public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    // Current thread
    Thread current = Thread.currentThread();
    // The releasing thread is the first thread that acquired the read lock
    if (firstReader == current) {
        // Last hold: set firstReader to null; otherwise decrement the count
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    }
    // Get the HoldCounter and update the hold information of the current thread
    else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // CAS loop to update the synchronization state
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
- Me: Both the acquisition and the release of the read lock use a variable rh (a HoldCounter), which is important for the read lock. To understand HoldCounter, think of it for now not as a lock concept but as a counter. An operation on the shared lock is an operation on that counter: acquiring the shared lock adds 1 to the counter and releasing it subtracts 1. A HoldCounter is therefore the number of shared locks held by the current thread. This number must be bound to the thread, otherwise releasing a lock held by another thread would raise an exception.
/** The HoldCounter definition is simple: just a count and a thread id. */
static final class HoldCounter {
    int count = 0;
    final long tid = getThreadId(Thread.currentThread());
}

/** ThreadLocalHoldCounter binds a HoldCounter to each thread through ThreadLocal. */
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
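The per-thread counting can be observed through the public API of ReentrantReadWriteLock: getReadHoldCount() reports the holds of the calling thread, while getReadLockCount() reports the total held by all threads. The sketch below assumes the default non-fair lock; the class name ReadHoldDemo and its helper are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class: compares per-thread and total read lock counts.
class ReadHoldDemo {
    // Returns { other thread's hold count, total read locks, main thread's hold count,
    //           total read locks after main has released }.
    static int[] counts() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        int[] out = new int[4];

        // The main thread takes the read lock twice (reentrant read).
        rw.readLock().lock();
        rw.readLock().lock();

        Thread other = new Thread(() -> {
            rw.readLock().lock(); // shared mode: succeeds although main holds read locks
            try {
                out[0] = rw.getReadHoldCount(); // holds of this thread only
                out[1] = rw.getReadLockCount(); // holds of all threads together
            } finally {
                rw.readLock().unlock();
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        out[2] = rw.getReadHoldCount();
        rw.readLock().unlock();
        rw.readLock().unlock();
        out[3] = rw.getReadLockCount();
        return out;
    }
}
```

The total comes from the high 16 bits of state, whereas the per-thread value comes from the thread-bound counter, which is why the two numbers can differ.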
- Interviewer: Besides wait/notify on Object, what other ways are there to coordinate threads?
- Me: There is also Condition. Lock provides Condition, which makes waiting for and waking up threads more fine-grained and flexible. Condition is a generalized condition queue. It gives threads a more flexible wait/notify pattern: a thread suspends after calling await and is not woken up until the condition it is waiting for becomes true. Condition must be used together with a lock, because the shared state it depends on is accessed in a multithreaded environment. A Condition instance must be bound to a Lock, so Condition is typically implemented inside the Lock.
(Figure: comparison of the monitor methods of Condition and Object.)
-
Interviewer: What methods does Condition provide to block and wake up threads?
-
Me: Condition provides a series of methods to block and wake up threads:
1. await(): causes the current thread to wait until it is signalled or interrupted.
2. await(long time, TimeUnit unit): causes the current thread to wait until it is signalled, is interrupted, or the specified waiting time elapses.
3. awaitNanos(long nanosTimeout): causes the current thread to wait until it is signalled, is interrupted, or the specified waiting time elapses. The return value is the remaining time: if the thread is woken before nanosTimeout has elapsed, the return value is approximately nanosTimeout minus the time spent waiting; a return value <= 0 means the wait timed out.
4. awaitUninterruptibly(): causes the current thread to wait until it is signalled. (This method does not respond to interrupts.)
5. awaitUntil(Date deadline): causes the current thread to wait until it is signalled, is interrupted, or the specified deadline passes. Returns false if the deadline has elapsed by the time the method returns, true otherwise.
6. signal(): wakes up one waiting thread. That thread must reacquire the lock associated with the Condition before it can return from its wait method.
7. signalAll(): wakes up all waiting threads. Each thread must reacquire the lock associated with the Condition before it can return from its wait method.
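As an illustration of the return value of awaitNanos, here is a small sketch of a timed wait on a flag. The class `TimedAwaitDemo` and its members are hypothetical; the loop follows the usual idiom of re-checking the predicate and feeding the remaining time back into awaitNanos, so an early or spurious wake-up does not extend the total wait.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedAwaitDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag;   // guarded by lock

    /** Sets the flag and wakes every thread waiting for it. */
    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to the timeout for the flag; returns true if it was set, false on timeout. */
    boolean awaitFlag(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false;                   // remaining time used up
                }
                nanos = ready.awaitNanos(nanos);    // returns the remaining time
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedAwaitDemo demo = new TimedAwaitDemo();
        System.out.println(demo.awaitFlag(20, TimeUnit.MILLISECONDS));  // nobody signals
        new Thread(demo::setFlag).start();
        System.out.println(demo.awaitFlag(5, TimeUnit.SECONDS));        // flag gets set
    }
}
```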
-
Interviewer: How does Condition block and wake up threads? (the underlying principle)
-
Me: Let's look at the source code first. A Condition is obtained through the Lock's newCondition() method, which is declared in the Lock interface and returns a new Condition instance bound to that Lock instance. Condition is an interface; the implementation used by the JUC locks is ConditionObject, an inner class of AQS. In ReentrantLock:
public Condition newCondition() {
    return sync.newCondition();
}
In Sync:
final ConditionObject newCondition() {
    return new ConditionObject();
}
In AbstractQueuedSynchronizer:
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    private transient Node firstWaiter;   // first node of the condition queue
    private transient Node lastWaiter;    // last node of the condition queue
    public ConditionObject() {}
    // ... remaining methods omitted
}
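The same wiring can be reproduced in a custom synchronizer. The following is a minimal sketch of a non-reentrant mutex built on AQS, modelled on the example in the AbstractQueuedSynchronizer Javadoc; the class name `SimpleMutex` and its methods are made up for this illustration, and it is not how ReentrantLock itself is written. Its inner Sync class hands out Conditions exactly as shown above, by returning a new ConditionObject.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            // state 0 = free, 1 = held
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            // ConditionObject uses this to verify that the caller owns the lock.
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();   // inner class of AQS, bound to this Sync
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }
    public void unlock() { sync.release(1); }
    public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
    public Condition newCondition() { return sync.newCondition(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        try {
            System.out.println(mutex.isHeldByCurrentThread());  // prints "true"
        } finally {
            mutex.unlock();
        }
    }
}
```

Because ConditionObject is a non-static inner class, each Condition created this way is tied to the Sync instance that created it, which is what binds a Condition to its lock.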
- I continued: the AQS wait queue (the synchronization queue) and the Condition queue are two separate queues. 1. await() releases the lock held by the current thread, appends a new node for that thread to the tail of the Condition queue, and blocks the thread. 2. signal() moves the first node of the Condition queue to the tail of the AQS wait queue, where it waits to reacquire the lock. The following steps trace how thread nodes move between the AQS queue and the Condition queue. 1. Initial state: the AQS queue has 3 nodes, and the Condition queue has 1 node (or none).
2. Node 1 calls Condition.await(): (1) head moves back to the next node. (2) Node 1 releases the lock and is removed from the AQS wait queue. (3) Node 1 is added to the Condition wait queue. (4) lastWaiter is updated to node 1.
3. Node 2 calls Condition.signal(): (6) Node 4 is removed from the Condition queue. (7) Node 4 is added to the AQS wait queue. (8) The tail of the AQS wait queue is updated.
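The transfer between the two queues can be watched through ReentrantLock's monitoring methods. This is only a sketch under simple assumptions (one waiting thread; the class `TwoQueuesDemo` and method `observe` are invented names): `getWaitQueueLength(condition)` reports the Condition queue, while `hasQueuedThread(thread)` reports the AQS wait queue.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoQueuesDemo {
    /** Returns {condition waiters before signal, after signal, 1 if waiter is in the AQS queue}. */
    static int[] observe() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false};     // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    cond.awaitUninterruptibly();    // releases the lock, joins the Condition queue
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Poll (holding the lock for each check) until the waiter is on the Condition queue.
        lock.lock();
        while (!lock.hasWaiters(cond)) {
            lock.unlock();
            Thread.sleep(1);
            lock.lock();
        }
        try {
            int before = lock.getWaitQueueLength(cond);
            go[0] = true;
            cond.signal();          // moves the waiter's node to the AQS wait queue
            int after = lock.getWaitQueueLength(cond);
            int queued = lock.hasQueuedThread(waiter) ? 1 : 0;
            return new int[] {before, after, queued};
        } finally {
            lock.unlock();          // only now can the waiter reacquire the lock
            waiter.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);   // prints "1 0 1"
    }
}
```

The signalled thread does not run at the moment of signal(); it is merely moved to the AQS wait queue and returns from its wait only after the signalling thread unlocks.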
- Interviewer: Talk is cheap; it has to be shown in code. Can you use Condition to implement the producer-consumer pattern?
- Me: I picked up a pen and spent five minutes writing a demo:
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    private final LinkedList<String> buffer;   // container
    private final int maxSize;                 // capacity
    private final Lock lock;
    // Signalled after an element is added; consumers wait on it while the buffer is empty.
    private final Condition fullCondition;
    // Signalled after an element is removed; producers wait on it while the buffer is full.
    private final Condition notFullCondition;

    ConditionDemo(int maxSize) {
        this.maxSize = maxSize;
        buffer = new LinkedList<>();
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        notFullCondition = lock.newCondition();
    }

    /**
     * Producer.
     * @param produceStr the element to add
     * @throws InterruptedException if interrupted while waiting
     */
    public void set(String produceStr) throws InterruptedException {
        lock.lock();   // acquire the lock
        try {
            while (maxSize == buffer.size()) {
                notFullCondition.await();
            }
            buffer.add(produceStr);
            fullCondition.signal();
        } finally {
            lock.unlock();   // release the lock
        }
    }

    /**
     * Consumer.
     * @return the oldest element in the buffer
     * @throws InterruptedException if interrupted while waiting
     */
    public String get() throws InterruptedException {
        String consumeStr;
        lock.lock();
        try {
            while (buffer.size() == 0) {
                fullCondition.await();
            }
            consumeStr = buffer.pollFirst();
            notFullCondition.signal();
        } finally {
            lock.unlock();
        }
        return consumeStr;
    }
}
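For comparison, the JDK already packages this two-condition pattern: in OpenJDK, ArrayBlockingQueue guards its array with a single ReentrantLock and two Conditions (one for "not empty", one for "not full"). The sketch below is a separate illustration rather than part of the demo above; the class `BlockingQueueDemo` and the method `transfer` are hypothetical names. It runs one producer and one consumer through a small bounded queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    /** A producer puts 1..n through a bounded queue; returns the sum seen by the consumer. */
    static long transfer(int n, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);           // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long sum = 0;
        for (int i = 0; i < n; i++) {
            sum += queue.take();            // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100, 4));   // prints "5050"
    }
}
```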