AbstractQueuedSynchronizer (hereinafter AQS, or the AQS lock) is the underlying implementation of most locks and synchronizers in java.util.concurrent. It provides spinning, a FIFO queue of waiting threads, thread blocking, and related functions. Common Java concurrency tools such as Semaphore, CountDownLatch, and ReentrantLock are all built on AQS.
The main points of AQS are summarized as follows:
1. Use an atomic int variable to represent the synchronization state
AQS holds an int variable named state (declared volatile and updated with CAS), which is its core state and the only variable that represents synchronization. For example, in ReentrantLock, state≠0 indicates that the lock is held, and state=0 indicates that the lock is free. Subclasses of AQS give state its specific meaning and operate on it optimistically by overriding tryAcquire(), tryRelease(), tryAcquireShared(), tryReleaseShared(), and isHeldExclusively(). In addition, state may only be accessed through getState(), setState(), and compareAndSetState(), which guarantees visibility and atomicity.
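As a minimal sketch of how a subclass gives state a meaning, the following non-reentrant mutex treats state 0 as free and 1 as held. The class name SimpleMutex and its methods are hypothetical names chosen for illustration; only the AbstractQueuedSynchronizer calls come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex where state 0 = free, 1 = held.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Optimistic attempt: succeed only if state flips from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException("mutex is not held");
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write makes the release visible to other threads
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }          // blocks until tryAcquire succeeds
    boolean tryLock()  { return sync.tryAcquire(1); } // single non-blocking attempt
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("counter = " + counter[0]);
    }
}
```

Because the sketch is non-reentrant, a thread that calls lock() twice without unlocking would block itself; ReentrantLock avoids this by counting holds in state.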
2. Use a CLH queue to store waiting threads, with one node for each thread, divided into exclusive mode and shared mode.
A CLH queue (actually a variant of a CLH queue) is a queue data structure used to implement spinlocks, the body of which is a linked list, as shown in the figure below.
[Figure: CLH queue]
In the CLH queue, each thread's wait status (the waitStatus field) is recorded on the previous node. The possible values are:
1 (CANCELLED): the thread has given up waiting because of a timeout or an interrupt.
-1 (SIGNAL): the successor of this node is blocked or about to block, so it must be woken up when this node releases the lock or cancels.
-2 (CONDITION): the node is waiting on a condition. Nodes in this state cannot appear on the CLH queue; they can only appear on a Condition queue.
-3 (PROPAGATE): the next thread waiting in shared mode is woken up unconditionally, that is, the wake-up can continue to propagate down the queue.
0 (ready): the thread is not blocked and is ready to request the lock.
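Shared mode can be illustrated with a one-shot latch, in which a single release lets every waiting thread through. This is a sketch only: OneShotLatch and its method names are hypothetical, and it assumes state 0 means closed and 1 means open.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a latch that stays open forever once signalled.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success; a negative result means "enqueue and block".
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the latch
            return true; // true tells AQS to wake up the waiting threads
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled() { return sync.isSignalled(); }
    void signal()         { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }
        latch.signal(); // one shared release wakes every waiter
        for (Thread t : waiters) {
            t.join();
        }
        System.out.println("threads released: " + passed.get());
    }
}
```

Because tryAcquireShared keeps returning a positive value once the latch is open, each woken thread passes the wake-up on to the next shared waiter, which is the propagation behaviour described for PROPAGATE.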
3. Modify the CLH queue with a loop + CAS
Because AQS itself is used to implement locks, the CLH queue inside AQS cannot be protected by a lock, yet it must support concurrent modification. How is this done? Thread safety of CLH queue operations is achieved by combining a loop with CAS operations. Take enq() as an example:
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
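The loop + CAS pattern used by enq() can be tried outside AQS with AtomicReference. The sketch below is an assumption-laden imitation, not the JDK code: CasTailQueue, its Node class, size(), and appendConcurrently() are hypothetical names, and the queue only supports appending.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example class: lock-free tail append in the style of enq().
class CasTailQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, retrying until the CAS on tail wins.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized yet: exactly one thread installs the dummy head.
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t; // set prev first, so prev links are valid once the CAS succeeds
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
                // CAS failed: another thread appended first, so reread tail and retry.
            }
        }
    }

    // Counts real nodes by walking prev links from the tail (the dummy head has prev == null).
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null && p.prev != null; p = p.prev) {
            n++;
        }
        return n;
    }

    // Demo helper: several threads append concurrently; returns the final size.
    static int appendConcurrently(int threads, int perThread) {
        CasTailQueue queue = new CasTailQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("nodes appended: " + appendConcurrently(4, 1000));
    }
}
```

No append is lost even though no lock is taken: a thread whose CAS fails simply rereads the tail and tries again.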
4. How the lock operation is implemented
Much like CLH queue operations, the lock operation is implemented with a loop + CAS, but it also uses the thread-blocking method LockSupport.park(). Taking the uninterruptible lock operation as an example, its main logic is as follows. In an infinite loop, first call tryAcquire(), an optimistic lock operation (it is attempted only when the predecessor of the current node is the head of the queue). If it succeeds, the lock operation returns directly. If it fails, check the waitStatus of the predecessor of the current thread's node:
CANCELLED: the predecessor has cancelled its wait, so remove it (and any other cancelled predecessors) from the CLH queue and continue the loop.
SIGNAL: block the current thread.
Any other value: set the predecessor's waitStatus to SIGNAL with CAS and continue the loop.
When a blocked thread is woken up by another thread, the loop is executed again. Reference code:
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
Timed and interruptible versions of the lock operation also exist. Their overall logic is the same as above, except that the loop adds checks for the deadline and the interrupt flag, and the outcome is reflected in the return value or in a thrown exception.
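From the caller's side, the timed and interruptible variants can be observed through ReentrantLock, which is built on AQS. The sketch below uses the real tryLock(long, TimeUnit) and lockInterruptibly() methods; the class TimedAndInterruptibleDemo and its helper methods are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing how timeouts and interrupts surface to callers.
class TimedAndInterruptibleDemo {
    private static void joinUnchecked(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Timed version: the outcome is reflected in the boolean return value.
    static boolean timedAttemptFails() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(true);
        lock.lock(); // the calling thread holds the lock for the whole attempt
        try {
            Thread t = new Thread(() -> {
                try {
                    boolean ok = lock.tryLock(50, TimeUnit.MILLISECONDS);
                    acquired.set(ok);
                    if (ok) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            joinUnchecked(t);
        } finally {
            lock.unlock();
        }
        return !acquired.get();
    }

    // Interruptible version: the outcome is reflected in a thrown InterruptedException.
    static boolean interruptCancelsWait() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        lock.lock();
        try {
            Thread t = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock();
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);
                }
            });
            t.start();
            t.interrupt(); // takes effect whether or not the thread has parked yet
            joinUnchecked(t);
        } finally {
            lock.unlock();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("timed attempt gave up: " + timedAttemptFails());
        System.out.println("interrupt cancelled the wait: " + interruptCancelsWait());
    }
}
```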
To summarize, AQS acquires a lock in a loop: it attempts the CAS lock operation (returning on success, continuing on failure), then blocks until it is woken up (doing some CLH queue cleanup each time it wakes), then attempts the CAS lock operation again.
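The try, park, wake, retry cycle can be reproduced in a few lines with LockSupport. The sketch below follows the first-in-first-out mutex pattern shown in the LockSupport documentation rather than the AQS source; ParkingFifoLock and countWithLock() are hypothetical names, and a ConcurrentLinkedQueue stands in for the CLH queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical example class: a strictly FIFO lock built from CAS + park/unpark.
class ParkingFifoLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // Retry loop: only the thread at the head of the queue may attempt the CAS.
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this); // block until unparked (or a spurious wake-up)
            if (Thread.interrupted()) {
                wasInterrupted = true; // remember the interrupt, but keep waiting
            }
        }
        waiters.remove(); // we hold the lock now, so leave the queue
        if (wasInterrupted) {
            current.interrupt(); // restore the interrupt flag for the caller
        }
    }

    void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek()); // wake the first waiter; no-op if the queue is empty
    }

    // Demo helper: increments a plain counter under the lock from several threads.
    static int countWithLock(int threads, int perThread) {
        ParkingFifoLock lock = new ParkingFifoLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("counter = " + countWithLock(4, 2000));
    }
}
```

Unlike AQS, this sketch has no cancellation and no queue cleanup, and it never lets a newly arrived thread overtake the queue.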
5. Lock operation optimization
The lock operation in AQS, acquire(), includes a barging optimization. At the moment the lock is released, a newly arriving thread may request the lock while the first thread in the queue is being woken up to request it as well, and it is not determined which of the two will get the lock. This optimization improves the throughput of the AQS lock, but makes it an unfair lock (it does not strictly follow the FIFO principle).
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
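The barging window can be made visible with a small experiment. The sketch below is deliberately contrived: BargingDemo, Sync, freeWithoutWakeup(), and newcomerCanBarge() are hypothetical names, the demo frees the lock without waking the queued thread in order to freeze the moment "lock released, waiter not yet running", and it ignores spurious wake-ups. The fair variant relies on the real AQS method hasQueuedPredecessors().

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class contrasting a barging tryAcquire with a fair one.
class BargingDemo {
    static final class Sync extends AbstractQueuedSynchronizer {
        private final boolean fair;

        Sync(boolean fair) {
            this.fair = fair;
        }

        @Override
        protected boolean tryAcquire(int ignored) {
            // A fair lock refuses to overtake threads that are already queued.
            if (fair && hasQueuedPredecessors()) {
                return false;
            }
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int ignored) {
            setState(0);
            return true;
        }

        boolean tryLock() {
            return tryAcquire(1);
        }

        // Demo-only: frees the lock but skips the wake-up that release() would perform.
        void freeWithoutWakeup() {
            setState(0);
        }
    }

    // Returns true if a newcomer can take a free lock while another thread is still queued.
    static boolean newcomerCanBarge(boolean fair) {
        Sync sync = new Sync(fair);
        sync.acquire(1); // the calling thread holds the lock
        Thread waiter = new Thread(() -> {
            sync.acquire(1);
            sync.release(1);
        });
        waiter.start();
        // Wait until the waiter is enqueued and parked.
        while (!sync.hasQueuedThreads() || waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        sync.freeWithoutWakeup();          // lock is free, waiter is still asleep
        boolean barged = sync.tryLock();   // the calling thread now acts as the newcomer
        sync.release(1);                   // frees the lock (if taken) and wakes the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return barged;
    }

    public static void main(String[] args) {
        System.out.println("non-fair newcomer barged: " + newcomerCanBarge(false));
        System.out.println("fair newcomer barged: " + newcomerCanBarge(true));
    }
}
```

ReentrantLock exposes the same choice through its constructor: new ReentrantLock(true) requests the fair policy, while the default is non-fair.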
6. How the unlock operation is implemented
The unlock operation is relatively simple. First it calls the CAS unlock operation (tryRelease) implemented in the subclass. If that succeeds, it checks the head node of the queue to decide whether the next thread needs to be woken up.
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
Conclusion
The above summarizes the main implementation logic of AQS. AQS operates on the CLH queue with a loop + CAS to make concurrent access to the queue thread-safe. The blocking AQS lock and unlock operations rely on the non-blocking CAS lock (tryAcquire) and CAS unlock (tryRelease) operations implemented in subclasses. The lock operation also runs in a loop: tryAcquire is called repeatedly, with the thread blocking and being woken up in between, until tryAcquire succeeds or the thread cancels its wait.