In Java, locks are a key mechanism for synchronizing threads. Java offers synchronized, which is implemented at the JVM level, and the java.util.concurrent tools: implementations of Lock and ReadWriteLock such as ReentrantLock, as well as Semaphore, CountDownLatch, CyclicBarrier, and the lock used by Worker in ThreadPoolExecutor. Apart from synchronized, all of these are built, directly or indirectly, on the AQS (AbstractQueuedSynchronizer) implementation described in this article, so understanding AQS is the key to understanding Java locks.
AQS data structure
The AbstractQueuedSynchronizer class and its important member variables
As the code below shows, the class hierarchy of AQS is simple: it extends AbstractOwnableSynchronizer and implements the Serializable interface.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
AbstractOwnableSynchronizer is also simple. Its main job is to record which thread holds the lock exclusively: when an owner thread is set, the lock is held in exclusive mode, and shared locks do not set it.
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    protected AbstractOwnableSynchronizer() { }

    // The thread that currently holds the lock in exclusive mode
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
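To see the exclusive owner thread in action, here is a minimal sketch. OwnerAwareLock and OwnerDemo are hypothetical names introduced for illustration; the sketch relies only on the protected ReentrantLock.getOwner() method, which reports the owner thread while the lock is held.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper, not part of the JDK: exposes the protected
// ReentrantLock.getOwner() so the owner thread can be observed.
class OwnerAwareLock extends ReentrantLock {
    Thread owner() {
        return getOwner();
    }
}

class OwnerDemo {
    public static void main(String[] args) {
        OwnerAwareLock lock = new OwnerAwareLock();
        System.out.println(lock.owner());                            // null
        lock.lock();
        try {
            // While the lock is held, the owner is the locking thread.
            System.out.println(lock.owner() == Thread.currentThread()); // true
        } finally {
            lock.unlock();
        }
        System.out.println(lock.owner());                            // null
    }
}
```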
The following are three important member variables in AQS: head is the head node in the wait queue, tail is the tail node in the wait queue, and state is the synchronization state value.
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
The waitStatus of a node in the wait queue takes one of the following values:
1. CANCELLED (1): the node has been cancelled because of a timeout or an interrupt.
2. SIGNAL (-1): the successor of this node is blocked (or soon will be), so when the current node releases the lock or is cancelled, it must unpark its successor.
3. CONDITION (-2): the node is in a condition wait queue.
4. PROPAGATE (-3): a shared-mode release should be propagated to other nodes; for example, doReleaseShared uses it so that a release in shared mode keeps propagating to subsequent nodes.
5. 0: none of the above.
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    /** Link to the previous Node of the current Node */
    volatile Node prev;

    /** Link to the next Node of the current Node */
    volatile Node next;

    volatile Thread thread;

    /**
     * The next Node in the condition queue, or the special value SHARED.
     * Condition queues are accessed only while holding the lock in
     * exclusive mode, so a simple singly linked list is enough to hold
     * the nodes waiting on a condition. Because conditions can only be
     * exclusive, this field can also be set to SHARED to indicate that
     * the node is waiting in shared mode.
     */
    Node nextWaiter;

    /** Returns true if the node is waiting in shared mode */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /** Returns the previous node in the wait queue */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
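The wait queue itself is private, but ReentrantLock exposes a few monitoring methods that reflect it. The sketch below assumes only the public hasQueuedThread and getQueueLength methods; QueueInspectionDemo is a hypothetical name. It blocks two threads on a held lock and reports how many wait nodes are observed.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueInspectionDemo {
    // Returns the queue length observed while two threads wait for the lock.
    static int queuedWaiters() {
        ReentrantLock lock = new ReentrantLock();
        Runnable task = () -> {
            lock.lock();       // blocks while the caller holds the lock
            lock.unlock();
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        int observed = -1;
        lock.lock();
        try {
            a.start();
            b.start();
            // Spin until both threads have been enqueued as wait nodes.
            while (!(lock.hasQueuedThread(a) && lock.hasQueuedThread(b))) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();
        }
        join(a);
        join(b);
        return observed;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedWaiters()); // 2
    }
}
```

getQueueLength is documented as an estimate in general; here it is stable because neither waiter can leave the queue while the caller holds the lock.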
Locking process
Let's look at how ReentrantLock acquires a lock. Its no-argument constructor initializes a NonfairSync, so a ReentrantLock is a nonfair lock by default.
public ReentrantLock() {
sync = new NonfairSync();
}
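As a quick check of that default, the following sketch uses the public isFair method and the ReentrantLock(boolean fair) constructor. FairnessDemo is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock byDefault = new ReentrantLock();   // backed by NonfairSync
        ReentrantLock fair = new ReentrantLock(true);    // explicitly fair
        System.out.println(byDefault.isFair()); // false
        System.out.println(fair.isFair());      // true
    }
}
```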
ReentrantLock creates a nonfair lock by default. When lock is called, it first uses CAS to try to grab the lock directly (state = 0 means the lock is free, and 1 means a thread already holds it). If that fails, it falls back to the AQS acquire method, passing 1 as the argument, to acquire the lock through the normal path.
/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        // Try to grab the lock directly by CASing state from 0 to 1
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
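acquire works in two steps:
1. tryAcquire attempts to acquire the lock.
2. If that fails, an exclusive-mode Node is added to the wait queue, and the thread keeps trying to acquire the lock from within the queue.
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // Join the wait queue, then acquire from within the queue
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // The thread was interrupted while waiting: re-assert the interrupt
        selfInterrupt();
}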
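acquire is a template method: AQS supplies the queueing, and a subclass only has to supply tryAcquire and tryRelease. The following is a minimal sketch of a non-reentrant mutex in the spirit of the example in the AQS Javadoc. SimpleMutex and countWithTwoThreads are hypothetical names, and this is not how ReentrantLock itself is written.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = free, state 1 = held.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // acquire() will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;  // release() will wake the first waiter, if any
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }
    void unlock() { sync.release(1); }

    // Demo: two threads increment a shared counter under the mutex.
    static int countWithTwoThreads(int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(10_000)); // 20000
    }
}
```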
A nonfair lock first tries to grab the lock, regardless of whether other threads are already waiting in the queue:
- If state is 0, the lock is free: CAS state from 0 to acquires (1 for ReentrantLock) and set the exclusive owner thread to the current thread.
- If the current thread already owns the lock, acquires is added to state and the result is written back. state therefore records how many times the same thread has re-entered the lock.
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // state == 0: the lock is free, so try to take it with CAS
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The current thread already holds the lock: re-entrant acquire
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // Only the owner gets here, so a plain write is enough
        setState(nextc);
        return true;
    }
    return false;
}
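The re-entrant branch can be observed from outside through the public getHoldCount method, which reports the number of holds the current thread has on the lock. This is a small sketch; ReentrancyDemo is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(lock.getHoldCount()); // 0
        lock.lock();
        System.out.println(lock.getHoldCount()); // 1
        lock.lock();                             // same thread re-enters
        System.out.println(lock.getHoldCount()); // 2
        // Every lock() needs a matching unlock().
        lock.unlock();
        lock.unlock();
        System.out.println(lock.getHoldCount()); // 0
    }
}
```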
After tryAcquire fails, addWaiter(Node.EXCLUSIVE) creates a Node in exclusive mode and adds it to the wait queue.
private Node addWaiter(Node mode) {
    // Create a new node for the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // If the tail is not null, the queue has already been initialized
    if (pred != null) {
        // Point the new node's prev at the current tail
        node.prev = pred;
        // CAS the tail from the old tail to the new node
        if (compareAndSetTail(pred, node)) {
            // Point the old tail's next at the new node
            pred.next = node;
            return node;
        }
    }
    // The queue is uninitialized, or the CAS failed because another
    // thread joined the queue first; enq retries until it succeeds
    enq(node);
    return node;
}
When a node joins the end of the queue, the prev pointer of the new node is first pointed at the old tail node.
enq retries in an infinite for loop, using CAS each time, which guarantees that the new Node is eventually added to the wait queue.
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // Install a dummy head node, which holds no thread
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // First point the new node's prev at the current tail
            node.prev = t;
            // CAS the tail to the new node
            if (compareAndSetTail(t, node)) {
                // Then point the old tail's next at the new tail
                t.next = node;
                return t;
            }
        }
    }
}
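The enqueue protocol can be modelled outside the JDK. The sketch below is a simplified stand-in, not the AQS code: it swaps the Unsafe-based CAS helpers for AtomicReference, and QNode, TailEnqueueSketch, sizeFromTail and enqueueConcurrently are hypothetical names. It keeps the same three steps (set prev, CAS the tail, set next) and checks that every node is reachable by walking prev links from the tail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the enq loop; AtomicReference replaces Unsafe CAS.
class TailEnqueueSketch {
    static final class QNode {
        volatile QNode prev;
        volatile QNode next;
    }

    private final AtomicReference<QNode> head = new AtomicReference<>();
    private final AtomicReference<QNode> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, like enq.
    QNode enq(QNode node) {
        for (;;) {
            QNode t = tail.get();
            if (t == null) {
                // Lazily install the dummy head; losers of the CAS retry.
                QNode dummy = new QNode();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                      // step 1: link backwards
                if (tail.compareAndSet(t, node)) {  // step 2: publish as tail
                    t.next = node;                  // step 3: link forwards
                    return t;
                }
            }
        }
    }

    // Counts real nodes by walking prev links from the tail to the dummy head.
    int sizeFromTail() {
        int n = 0;
        QNode h = head.get();
        for (QNode p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }

    static int enqueueConcurrently(int threads, int perThread) {
        TailEnqueueSketch queue = new TailEnqueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new QNode());
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.sizeFromTail();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(4, 1000)); // 4000
    }
}
```

Because prev is written before the tail CAS succeeds, a node that is visible as the tail always has a valid prev link, while its predecessor's next may briefly still be null.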
After the new tail Node has been enqueued, acquireQueued is executed with that node and arg (the same value that was passed to tryAcquire). This method handles the newly enqueued Node: if its predecessor is the head it tries to acquire the lock again, and otherwise it parks the thread until it is woken up to retry.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // Only the node right behind the head may try to acquire
            if (p == head && tryAcquire(arg)) {
                // Make this node the new head; setHead clears its
                // thread and prev but leaves waitStatus unchanged
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Check the predecessor's waitStatus; this returns true
            // once it is SIGNAL
            if (shouldParkAfterFailedAcquire(p, node) &&
                // The predecessor is SIGNAL, so park via LockSupport.park
                // until a releasing thread calls LockSupport.unpark,
                // then check whether the thread was interrupted
                parkAndCheckInterrupt())
                // Remember that an interrupt arrived while waiting
                interrupted = true;
        }
    } finally {
        // The loop exited without acquiring the lock: cancel the node
        if (failed)
            cancelAcquire(node);
    }
}
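One visible consequence of this loop is that lock() does not give up when the waiting thread is interrupted; the interrupt is only recorded and re-asserted once the lock has been acquired. The sketch below illustrates this with public ReentrantLock methods only. InterruptWhileQueuedDemo is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptWhileQueuedDemo {
    // Returns the waiter's interrupt flag as seen right after lock() returns.
    static boolean interruptSurvivesLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterLock = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock();                  // keeps waiting even if interrupted
            try {
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            // Spin until the waiter has been enqueued.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();           // recorded; the waiter stays queued
        } finally {
            lock.unlock();                // now the waiter can acquire
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterLock.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptSurvivesLock()); // true
    }
}
```

Code that needs to abandon the wait on interrupt would use lockInterruptibly() instead of lock().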
shouldParkAfterFailedAcquire decides whether the thread should park (block) and wait for the lock after a failed acquire attempt.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // waitStatus of the predecessor node
    int ws = pred.waitStatus;
    // If the predecessor is SIGNAL, it will wake this node when it
    // releases the lock, so the thread can block and wait
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // A waitStatus greater than 0 means CANCELLED (1)
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // Walk backwards past the cancelled predecessors
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // Link the surviving predecessor to this node
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
If acquireQueued exits while failed is still true, that is, without having acquired the lock, cancelAcquire is called to cancel the node.
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // The predecessor is not the head, its waitStatus is SIGNAL
        // (or could be set to SIGNAL), and it still has a thread
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            // A successor exists and has not been cancelled:
            // splice it directly behind the predecessor
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 1. The predecessor is the head node, or
            // 2. its waitStatus is not SIGNAL and could not be set to
            //    SIGNAL (it is cancelled or the CAS failed), or
            // 3. its thread is null, so it is no longer a live waiter.
            // In these cases wake the successor so it can fix its links
            unparkSuccessor(node);
        }

        // Point next at the node itself
        node.next = node; // help GC
    }
}
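With plain lock() this cancellation path is rarely reached, since the loop only exits early if something throws. A waiter that gives up is easier to observe with the timed tryLock. The sketch below assumes only the public tryLock(long, TimeUnit) and hasQueuedThread methods and makes no claim about which internal method performs the cleanup on a given JDK version; TimedAcquireDemo is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class TimedAcquireDemo {
    // Returns true if the timed attempt failed and the waiter left the queue.
    static boolean timedAttemptGivesUp() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(true);
        Thread waiter = new Thread(() -> {
            try {
                boolean ok = lock.tryLock(50, TimeUnit.MILLISECONDS);
                acquired.set(ok);
                if (ok) {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        lock.lock();                 // the caller holds the lock throughout
        try {
            waiter.start();
            waiter.join();           // returns once the waiter has timed out
            return !acquired.get() && !lock.hasQueuedThread(waiter);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(timedAttemptGivesUp()); // true
    }
}
```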
unparkSuccessor wakes up the thread waiting for the lock. Normally that is the thread of the next node. If next is null or cancelled, it traverses backwards from the tail to find the node closest to the current one whose waitStatus is less than or equal to 0 (that is, not cancelled), and then calls LockSupport.unpark to wake that thread. The traversal goes from back to front because enqueueing takes two steps and the prev pointer is set first, so only the prev links are guaranteed to reach every node.
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
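The blocking and waking here rests on LockSupport's permit model: unpark makes a permit available, and park returns once a permit is available (it may also return spuriously, which is why callers re-check their condition in a loop). A minimal sketch of that hand-off follows; ParkHandOffDemo and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkHandOffDemo {
    // unpark before park: the permit is already there, so park does not block.
    static boolean parkAfterEarlyUnpark() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();
        return true;
    }

    // A waiter parks until another thread sets the flag and unparks it.
    static boolean handOff() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean sawReady = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!ready.get()) {   // re-check: park may return spuriously
                LockSupport.park();
            }
            sawReady.set(true);
        });
        waiter.start();
        ready.set(true);             // publish the condition first
        LockSupport.unpark(waiter);  // then wake the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawReady.get();
    }

    public static void main(String[] args) {
        System.out.println(parkAfterEarlyUnpark()); // true
        System.out.println(handOff());              // true
    }
}
```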
Release the lock
Now let's look at how the lock is released. This again goes through a method inherited from AQS, release.
public final boolean release(int arg) {
    // Try to release the lock
    if (tryRelease(arg)) {
        // Get the dummy head node
        Node h = head;
        // If the head is not null and its waitStatus is not 0,
        // there are threads waiting for the lock
        if (h != null && h.waitStatus != 0)
            // Wake up the first thread waiting for the lock
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease subtracts releases from state. If the result c is 0, the lock is completely free: the exclusive owner thread is set to null and true is returned. In every case state is updated to c.
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // Only the owner thread may release the lock
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
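Both branches of tryRelease are visible through the public API: unlocking without owning the lock throws IllegalMonitorStateException, and a lock held twice stays locked after the first unlock. This is a small sketch; ReleaseDemo and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseDemo {
    // Returns true if unlocking without holding the lock is rejected.
    static boolean unlockByNonOwnerFails() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns isLocked() after the first and after the second unlock.
    static boolean[] lockedAfterEachUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                         // hold count is now 2
        lock.unlock();                       // state 2 -> 1, still owned
        boolean afterFirst = lock.isLocked();
        lock.unlock();                       // state 1 -> 0, lock is free
        boolean afterSecond = lock.isLocked();
        return new boolean[] {afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        System.out.println(unlockByNonOwnerFails());      // true
        boolean[] r = lockedAfterEachUnlock();
        System.out.println(r[0] + " " + r[1]);            // true false
    }
}
```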
Summary
This article analyzed in detail how ReentrantLock acquires and releases an exclusive lock. The process revolves around the state variable and the CLH-style doubly linked wait queue. Threads are blocked with LockSupport.park and woken up with LockSupport.unpark.