This article has participated in the third “topic writing” track of the Denver Creators Training Camp. For details, check out: Digg Project | Creators Training Camp third is ongoing, “write” to make a personal impact.

In this article, from the perspective of the source code depth analysis of the AQS exclusive lock mode under the acquisition lock and release lock logic, the following continue to analyze the realization principle and condition of the shared lock from the source code.

Acquiring a lock

// Get shared lock, ignore interrupt; Public final void acquireShared(int arg) {// If (tryAcquireShared(arg) < 0) // If (tryAcquireShared(arg) < 0) // If (tryAcquireShared(arg) < 0) // If (tryAcquireShared(arg) < 0) // If (tryAcquireShared(arg) < 0) // If (tryAcquireShared(arG) < 0) // If (tryAcquireShared(arG) < 0) // If (tryAcquireShared(arG) < 0) // }Copy the code

The tryAcquireShared method is left to the implementer to implement the logic of obtaining the lock.

If CAS fails to obtain the shared lock, the current node is enqueued and the current thread is blocked. Private void doAcquireShared(int arg) {// Insert the current Node into the queue final Node Node = addWaiter(node.shared); boolean failed = true; try { boolean interrupted = false; for (; ;) {// Get the precursor node of the current node, if the precursor node is the head node, try to get the lock; // If obtaining the node fails, check the node status. Block Node threads when the Node state is SIGNAL Final Node p = node.predecessor(); If (p == head) {//CAS getlock int r = tryAcquireShared(arg); PROPAGAE setheadPropagate (node, r) {if (r >= 0) {// If (r >= 0) { p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; }} // Check whether the current node should block, if yes, block processing, Until interrupted by the if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; }} finally {// If (failed) cancelAcquire(node); }}Copy the code

It is still the spin mechanism, before the thread suspends, it constantly tries to get the lock. The difference is that once the thread gets the shared lock, the setHeadAndPropagate method is called at the same time to wake up the successor node, so as to realize the shared mode. The following is the code logic for waking up the successor node:

Private void setHeadAndPropagate(Node Node, int propagate) {Node h = head; SetHead (node); / / if the propagate > 0 or head node is empty and head node status to < 0 if (the propagate > 0 | | h = = null | | h.w. aitStatus < 0 | | (h = head) = = null | | h.waitStatus < 0) { Node s = node.next; / / if subsequent head node to sharing mode, if you get head nodes (s = = null | | s.i sShared ()) doReleaseShared (); }}Copy the code

This method mainly does two important steps:

1. Set the current node to the new head node. That is, the front node of the current node (the old head node) has obtained the share lock and is removed from the queue. 2. Call the doReleaseShared method, which invokes the unparkprecursor method to wake up the succeeding nodes.

Release the lock

Public final Boolean releaseShared(int arg) {// Release the lock in cas mode, DoReleaseShared if (tryReleaseShared(arg)) {doReleaseShared(); return true; } return false; }Copy the code

Here is the lock release logic:

private void doReleaseShared() { for (;;) // If the method is called from the setHeadAndPropagate method, then the head is the new head Node Node h = head; if (h ! = null && h ! = tail) { int ws = h.waitStatus; If (ws == node.signal) {// Initialize Node state // CAS atomic operation is required here, Because both the setHeadAndPropagate and releaseShared methods work best with doReleaseShared, avoiding multiple unpark call if (! CompareAndSetWaitStatus (h, node.signal, 0)) // Continue the loop if initialization fails; // Loop to recheck the succeeded (h); } // If the successor node does not need to wake up for the moment, then the current head node state is updated to PROPAGATE, ensuring that the subsequent can be passed to the successor node else if (ws == 0 &&! compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS} // If the head node does not change during the wake process, exit the loop. If (h == head) // loop if head changed break; }}Copy the code

The lock release logic of a shared lock is slightly more complicated than that of an exclusive lock. The reason is that the shared lock needs to release all the nodes of the shared type in the queue, so it requires cyclic operation. As the node state will be modified in several places during the lock release process, CAS atomic operation is required to ensure concurrent security. The main feature of shared lock is that when a thread obtains the lock, it will wake up the nodes that can be shared with it in the waiting queue in turn. Of course, these nodes are also shared lock type.

Condition condition

The Condition interface defines the following methods:

Await (): The current thread enters the wait state until it is notified (siginal) or interrupts.

AwaitUninterruptibly (): The current thread enters the wait state until notified and is not interrupt-sensitive.

AwaitNanos (long timeout): The current thread enters the wait state until it is notified (siginal), interrupted, or timed out.

AwaitUnitil (Date deadTime): The current thread enters the wait state until it is notified (siginal), interrupted, or reaches a certain time.

Signal (): Wakes up a thread waiting on Condition that must acquire the lock associated with the Condition before returning from the wait method

SignalAll (): Wakes up all threads waiting on Condition. Threads that can return from the wait method must acquire the lock associated with Condition.

The implementation of condition: First, there is an internal condition queue, which stores all the threads waiting on the condition.

Await series of methods: get the thread currently holding the lock to release the lock, wake up a thread waiting for the lock on the CLH queue, create a node node for the current thread, insert it into the Condition queue (note not insert it into the CLH queue)

Signal: No thread is actually woken up. Instead, wait nodes on the Condition queue are inserted into the CLH queue, so a thread in the CLH queue is woken up when the thread holding the lock finishes releasing the lock.

Await related methods

// let the thread currently holding the lock block and wait, and release the lock. Public final void await() throws InterruptedException {// If the current thread is interrupted, If (Thread.interrupted()) throw new InterruptedException(); // Create a new Node for the current thread and insert this Node into the Condition queue. // Release the lock held by the current thread and wake up the synchronization queue header int savedState = fullyRelease(node); int interruptMode = 0; // If the current node fills the synchronization queue; // Block the current thread. When the current thread is woken up by signal, it adds the current node to the synchronization queue. // Wait to get the lock while (! isOnSyncQueue(node)) { LockSupport.park(this); / / check whether interruption into team waiting for locks the if ((interruptMode = checkInterruptWhileWaiting (node))! = 0) break; If (acquireQueued(node, savedState) && interruptMode! = THROW_IE) interruptMode = REINTERRUPT; Condition if (node.nextwaiter! = null) // clean up if cancelled unlinkCancelledWaiters(); // Whether to throw an exception or issue an interrupt request if (interruptMode! = 0) reportInterruptAfterWait(interruptMode); }Copy the code
// Create a new Node for the current thread and insert it into the Condition queue. Private Node addConditionWaiter() {Node t = lastWaiter; // If the status of the end node of the wait queue is not CONDITION, the operation is cleared. // Clear the queue if (t! = null && t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // Create a new Node(thread.currentThread (), node.condition); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }Copy the code
Private void unlinkCancelledWaiters() {// Run the wait queue from start to finish, remove the waiters that are not in the CONDITION state. Node trail = null; while (t ! = null) { Node next = t.nextWaiter; if (t.waitStatus ! = Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; }}Copy the code

The isOnSyncQueue and reportInterruptAfterWait methods are called to determine whether a node is released in the synchronization queue and whether an exception is thrown or a re-interrupt is performed based on the current mode

Signal related methods

Public final void signal() {// If the current thread is not an exclusive lock, throw an exception if (! isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; If (first! = null) doSignal(first); }Copy the code
Private void doSignal(Node first) {private void doSignal(Condition first) {private void doSignal(Condition first) { // If the new Condition head is null, the Condition queue is empty. If ((firstWaiter = first.nextwaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (! transferForSignal(first) && (first = firstWaiter) ! = null); }Copy the code
// Returns true to indicate that the node node is inserted into the synchronization queue, Final Boolean transferForSignal(node node) {// If the node status cannot be changed from CONDITION to 0, the node is in the synchronization queue. Return false if (! compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; P = enq(node); p = enq(node); int ws = p.waitStatus; // If the previous Node is cancelled, or it cannot be set to node.signal. After p / / that node will not initiate wakes up the next node node thread operation, / / so here direct call LockSupport. Unpark (node. Thread) method, awakens the node node place thread if (ws > 0 | |! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }Copy the code