Mind mapping

Star: learningSummary

What is AQS

AQS about concurrent programming, have to say (AbstractQueuedSynchronizer), it is one of the master son in Doug Lea. AQS is the abstract queue synchronizer, which is the basic framework used to build locks and synchronization components. Many well-known locks and synchronization components are built based on AQS. Examples include ReentrantLock, ReentrantReadWriteLock, CountDownLatch, and Semaphore.

In fact, AQS is an abstract class, we might as well take a look at the source:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    / / head node
    private transient volatile Node head;
    / / end nodes
    private transient volatile Node tail;
    // Share status
    private volatile int state;
    
  	// Inner class to build the linked list of nodes
	static final class Node {
        volatile Node prev;
        volatile Node next;
        volatileThread thread; }}/ / AbstractQueuedSynchronizer parent class
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    // The thread that occupies the lock
    private transient Thread exclusiveOwnerThread;
}
Copy the code

From the source can be seen that AQS is composed of the following parts:

1.1 State Shared variables

An important field in AQS, state, which indicates synchronization status, is volatile and is used to show the current lock status of critical resources. It is maintained by getState(), setState(), and compareAndSetState().

private volatile int state;

protected final int getState(a) {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
/ / CAS operation
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
Copy the code

A few key points about state:

  • Use volatile qualifiers to ensure visibility across multiple threads.
  • GetState (), setState(), and compareAndSetState() use final modifier to restrict subclasses from overwriting them.
  • CompareAndSetState () adopts CAS algorithm with optimistic locking idea to ensure atomic operation.

1.2 the CLH queue

Another important concept in AQS is CLH queue, which is a bidirectional linked list queue. The head and tail record the head and tail respectively inside the queue. The element type of the queue is Node.

A brief description of the function of this queue is that when a thread fails to obtain the synchronization state, AQS will encapsulate the thread and the waiting state and other information into Node and add it to the queue, and at the same time block the thread and wait for the subsequent wake up.

The elements of a queue are Node nodes. Here is the composition of Node nodes:

static final class Node {
    // Wait flags in shared mode
    static final Node SHARED = new Node();
    // Wait flags in exclusive mode
    static final Node EXCLUSIVE = null;
    // Indicates that the thread of the current node is cancelled due to timeout or interruption
    static final int CANCELLED =  1;
    // Indicates that threads from subsequent nodes of the current node need to run, i.e., through the unpark operation
    static final int SIGNAL    = -1;
    // Indicates that the current node is in the condition queue
    static final int CONDITION = -2;
    // In shared mode, which means that subsequent nodes propagate wakeup operations
    static final int PROPAGATE = -3;
    // State, including the above four state values, the initial value is 0, generally is the initial state of the node
    volatile int waitStatus;
    // A reference to the previous node
    volatile Node prev;
    // Reference to the next node
    volatile Node next;
    // The thread reference stored at the current node
    volatile Thread thread;
    // Subsequent nodes of the condition queue
    Node nextWaiter;
}
Copy the code

1.3 exclusiveOwnerThread

AQS through inheritance AbstractOwnableSynchronizer class, has the attribute. Represents the owner of a synchronizer in exclusive mode.

Ii. Realization principle of AQS

There are two modes of AQS, exclusive and shared.

2.1 exclusive

Only one thread holds a synchronous state at a time, meaning that other threads can only compete if the occupying thread is released, such as ReentrantLock. The following from the source code, comb exclusive implementation ideas.

First look at the acquire() method, which is how AQS get the synchronization state in exclusive mode.

public final void acquire(int arg) {
    if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

Let’s start with the general idea of this approach:

  • TryAcquire () attempts to acquire the resource directly and returns if successful.
  • If this fails, the addWaiter() method is called to insert the current thread wrapped as Node(in EXCLUSIVE state, marked as EXCLUSIVE mode) to the end of the CLH queue.
  • The acquireQueued() method then causes the thread to block in the waiting queue to acquire the resource until it has acquired it and returns true if it has been interrupted during the entire wait, false otherwise.
  • The thread has been interrupted while waiting and is not responding. SelfInterrupt () is performed only after the resource has been retrieved, reclaiming the interrupt.

Let’s expand and look at the tryAcquire() method, which attempts to acquire the resource and returns true on success, false on failure.

// Throw an exception directly, which is implemented by subclasses and reflects the idea of the template pattern
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
Copy the code

Why is it not implemented? This is actually the idea of the template pattern. This method is an attempt to obtain resources, but there are many ways to obtain resources, such as fair locks and unfair locks. (more on that later.) So here’s a method that doesn’t have a concrete implementation, and needs to be implemented by subclasses.

Next, look at the addWaiter() method, which wraps the current thread as a Node and adds it to the queue.

private Node addWaiter(Node mode) {
    // Wrap the current thread as a Node
    Node node = new Node(Thread.currentThread(), mode);
    // Get the end node
    Node pred = tail;
    // Check whether the tail is null, if not, then the queue has been initialized
    if(pred ! =null) {
        // Add the Node Node directly to the end of the queue
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            // Return Node containing the current thread
            returnnode; }}If the queue is not initialized, the enq() method is called
    enq(node);
    return node;
}
Copy the code

Then we look at the enq() method, which is a spin operation that adds the passed node to the end of the queue and initializes the queue if it is not initialized.

private Node enq(final Node node) {
    // The spin operation, i.e., an infinite loop, will return only if it succeeds in joining the queue
    for (;;) {
        // Assign the end to t
        Node t = tail;
        // If null, no initialization is performed
        if (t == null) { // Must initialize
            // Create an empty Node and set it to a header
            if (compareAndSetHead(new Node()))
                // Then assign the head node to the tail node
                tail = head;
        } else {
            // If the first loop is empty, a Node has been created, and the second loop will not be empty
            // If the tail node is not empty, point the precursor node of the passed node to the tail node
            node.prev = t;
            // Cas atomically sets the passed node as a tail
            if (compareAndSetTail(t, node)) {
                // Point the original tail-drive node to the passed node
                t.next = node;
                returnt; }}}}Copy the code

Then we’ll jump back to the top-level method and look at the acquireQueued() method.

// The node in the queue acquireQueued() to obtain the resource, ignoring the interrupt.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Spin operation, an infinite loop
        for (;;) {
            // Get the precursor node of the passed node and assign it to p
            final Node p = node.predecessor();
            // If p is the head node and node is the second node, try again to get the resource
            if (p == head && tryAcquire(arg)) {
                // If tryAcquire(arg) succeeds, set node to the head node
                setHead(node);
                // set the backend node of the original header node p to null, waiting for GC garbage collection
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // If p is not the head, or tryAcquire() fails to acquire the resource, determine whether it can be park, i.e. block the thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//&& if true is returned, the current thread is blocked and checked for interruption
                // Set the interrupt flag to true if the blocking process is interrupted.
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

Finally, there’s the selfInterrupt() method, a self-interrupt.

static void selfInterrupt(a) {
    Thread.currentThread().interrupt();
}
Copy the code

It doesn’t matter if you don’t remember the process, let’s draw a picture to sum it up. It’s actually very simple.

2.2 Shared

That is, a shared resource can be occupied by multiple threads until the shared resource is fully occupied. Examples include ReadWriteLock and CountdownLatch. Below we go to analyze its implementation principle from the source code.

Let’s start by looking at the acquireShared() method at the top.

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
Copy the code

TryAcquireShared (), tryAcquireShared() returns an int. If the value is greater than or equal to 0, the lock has been successfully obtained. If the value is not negative, the lock has failed. Execute the doAcquireShared method.

The tryAcquireShared() method is a template method overridden by subclasses, meaning that the implementation class defines how to get synchronized resources. AQS is just a framework.

Look at the doAcquireShared() method executed if the resource fails to be fetched.

private void doAcquireShared(int arg) {
    // Call the addWaiter() method to wrap the current thread as a Node, mark it shared, and insert it into the queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the precursor node of the current node
            final Node p = node.predecessor();
            // Whether the precursor node is a header
            if (p == head) {
                // Call tryAcquireShared() to get synchronized resources if the precursor node is a header
                int r = tryAcquireShared(arg);
                //r>=0 indicates that the synchronization resource is successfully obtained. If the synchronization resource is successfully obtained, a return is executed to exit the for loop
                if (r >= 0) {
                    // Set node to the head node
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return; }}// Check whether the thread can be park, as in exclusive logic, return true, park, block the thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

This logic is basically the same as exclusive logic, except that the joined Node is marked as SHARED and the method of obtaining synchronized resources is tryAcquireShared().

Third, AQS template mode

The application of the template pattern in AQS is one of the key elements. TryAcquireShared () and tryAcquire() mentioned above are important template methods. The common use of AQS is to use an inner class to inherit the AQS and then override the corresponding template methods.

AQS has put some commonly used, such as queue, queue, CAS operation and so on to build a framework, users only need to achieve the acquisition of resources, release of resources, because many locks, and synchronizer, in fact, is the way to obtain resources and release resources are quite different.

So let’s look at what template methods are.

3.1 tryAcquire ()

TryAcquire (), which exclusively obtains synchronous resources. The tryAcquire() method returns true if the synchronization resources have been obtained successfully, and false if the synchronization resources have failed.

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
Copy the code

3.2 tryRelease ()

The tryRelease() method exclusively uses the return value of tryRelease() to determine whether the thread has completed releasing the resource, and subclasses to determine whether the lock has been successfully released.

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
Copy the code

3.3 tryAcquireShared ()

The tryAcquireShared() method is used to obtain synchronized resources in shared mode. If the value is greater than or equal to 0, the resource is successfully obtained; if the value is smaller than 0, the resource is failed.

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
Copy the code

3.4 tryReleaseShared ()

Method tryReleaseShared(), a shared attempt to release the resource, returns true if it is allowed to wake up subsequent wait nodes, false otherwise.

protected boolean tryReleaseShared(int arg) {
	throw new UnsupportedOperationException();
}
Copy the code

3.5 isHeldExclusively ()

IsHeldExclusively () method to see if the thread is hogging the resource. You only need to implement it if you use condition.

protected boolean isHeldExclusively(a) {
    throw new UnsupportedOperationException();
}
Copy the code

Get to know ReentrantLock

ReentrantLock is a classic example of using AQS as a starting point for further research. ReentrantLock has many features. First, it is a pessimistic lock, second, there are two modes of fair and unfair locking, and finally, it is a ReentrantLock, that is, the ability to repeatedly lock shared resources.

AQS are usually implemented using inner classes, so it’s not hard to imagine two inner classes in the ReentrantLock class. Let’s look at a class diagram.

FairSync is an implementation of a fair lock, while NonfairSync is an implementation of an unfair lock. Judge by the Boolean value passed in by the constructor.

public ReentrantLock(boolean fair) {
    // True uses a fair lock; false uses an unfair lock
    sync = fair ? new FairSync() : new NonfairSync();
}
// Default is unfair lock
public ReentrantLock(a) {
    sync = new NonfairSync();
}
Copy the code

Fair lock is to follow the FIFO (first in, first out) principle, the first to the thread will be the first to obtain resources, the last to the thread will be queued to wait, to ensure that each thread can get the lock, there will be no thread starvation.

Non-fair locks do not comply with the principle of first-in, first-out, there will be threads jumping the queue, can not guarantee that every thread can get the lock, there will be threads starving to death.

Let’s find out the difference between these two locks from source code analysis.

Source code analysis ReentrantLock

5.1 lock

ReentrantLock is locked by the lock() method, so look at the lock() method.

public void lock(a) {
    sync.lock();
}
Copy the code

Sync is NonfairSync or FairSync.

AQS acquire()
final void lock(a) {
    acquire(1);
}
Copy the code

The acquire() method has been parsed previously, looking at FairSync’s tryAcquire() method.

protected final boolean tryAcquire(int acquires) {
    // Get the current thread
    final Thread current = Thread.currentThread();
    // Get the synchronization status
    int c = getState();
    // Check whether the synchronization status is 0
    if (c == 0) {
        // The key here is that the fair lock determines whether a queue is needed
        if(! hasQueuedPredecessors() &&// If no queue is required, the cas operation updates the synchronization status to 1
            compareAndSetState(0, acquires)) {
            // Set the thread holding the lock to the current thread
            setExclusiveOwnerThread(current);
            // Returns true, indicating that the lock is successful
            return true; }}// Determine whether the current thread is a thread with a lock, mainly reentrant lock logic
    else if (current == getExclusiveOwnerThread()) {
        // If it is the current thread, the synchronization status is +1
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // Set the synchronization status
        setState(nextc);
        return true;
    }
    // If none of the above is true, return false, indicating that the lock failed. Lock failure according to the AQS framework design, will join the queue
    return false;
}
Copy the code

If the tryAcquire() of NonfairSync is not a fair lock, we continue analysis.

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
// Get the lock unfairly
final boolean nonfairTryAcquire(int acquires) {
    // This section is the same operation as the fair lock
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // The key here is not to check whether you need to queue or not, but to update the synchronization status directly
        if (compareAndSetState(0, acquires)) {
            // If the synchronization status is successfully obtained, the thread holding the lock is set to the current thread
            setExclusiveOwnerThread(current);
            // Return true to obtain the lock successfully
            return true; }}// The following logic is the same as fair lock logic
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
Copy the code

In fact, the key difference is that when trying to acquire the lock, the fair lock will determine whether the synchronization status needs to be queued before updating, while the unfair lock will directly update the synchronization without determining whether the synchronization status needs to be queued.

In terms of performance, fair locks perform worse than non-fair locks because fair locks adhere to FIFO(first-in, first-out) principles, which increase context switching and state change time for waiting threads.

The disadvantages of unfair locking are also obvious, since queue jumping is allowed, and there are threads that starve to death.

5.2 the unlock

The corresponding method to unlock is unlock().

public void unlock(a) {
    Call the release() method in AQS
    sync.release(1);
}
// This is the release() method defined by the AQS framework
public final boolean release(int arg) {
    // If the lock is not held by any thread, return true to indicate that the lock is not held by any thread
    if (tryRelease(arg)) {
        // get the header h
        Node h = head;
        // Check whether the header is null and waitStatus is not the initialization node state
        if(h ! =null&& h.waitStatus ! =0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Copy the code

The key is tryRelease(), which eliminates the need for fair and unfair locking cases, just reentrant logic.

protected final boolean tryRelease(int releases) {
    // Reduce the number of reentrants
    int c = getState() - releases;
    Throw an exception if the current thread is not the thread holding the lock
    if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
    boolean free = false;
    // If all holding threads are released, set all threads of the current exclusive lock to NULL and update state
    if (c == 0) {
        // The state is 0, indicating that all threads are released. Set to true
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
Copy the code

conclusion

JUC can be said to be a difficulty in learning Java, while the key to learning AQS lies in concurrent thinking, because there are many situations to consider, and secondly, it is necessary to understand the idea of template pattern, so as to understand why AQS functions as a framework. I think ReentrantLock is a good starting point for understanding AQS, and it should be much easier to look at other AQS application classes.

So that’s all for this article. I hope I can get something out of it. Thank you for reading.

Please give me a thumbs-up if you think it is useful. Your thumbs-up is the biggest motivation for my creation

I’m a programmer who tries to be remembered. See you next time!!

Ability is limited, if there is any mistake or improper place, please criticize and correct, study together!