1. Introduction to AQS


AQS stands for AbstractQueuedSynchronizer, a class in the java.util.concurrent.locks package.


public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable 


AQS is a framework for implementing blocking locks and synchronizers. It relies on a first-in, first-out (FIFO) wait queue, known as the CLH synchronization queue. ReentrantLock and Semaphore are built on it, and so are others such as ReentrantReadWriteLock, SynchronousQueue, and FutureTask (JDK 1.7). We can also use AQS to build synchronizers for our own needs with little effort.


2. The principle of AQS


The core idea of AQS is this: if the requested shared resource is free, the requesting thread becomes the active worker thread and the resource is marked as locked. If the resource is already held, a mechanism is needed to block the waiting threads and to hand the lock over when they are woken up. AQS implements this mechanism with a CLH queue lock, in which threads that cannot obtain the lock for the moment are placed in a queue.

The CLH (Craig, Landin, and Hagersten) queue used by AQS is a virtual doubly linked queue: there is no queue object as such, only the links between nodes. The original CLH queue was designed for spin locks, but Doug Lea adapted it for blocking locks. AQS wraps each thread that requests the shared resource in a Node of the CLH lock queue in order to allocate the lock.



AQS uses an int member variable to represent the synchronization state, and a built-in FIFO queue to line up the threads waiting for the resource.


private volatile int state;



protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

AQS defines two resource-sharing modes:

  • Exclusive: only one thread can hold the resource at a time, as in ReentrantLock.

  • Shared: several threads can hold the resource at the same time, as in Semaphore and CountDownLatch.
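
The difference between the two modes can be observed from the outside. The following is a minimal sketch, not part of the JDK: the class name SharingModesDemo and its helper methods are made up for illustration, and only public ReentrantLock and Semaphore calls are used.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

class SharingModesDemo {
    // Runs the check on a second thread and returns what that thread observed.
    static boolean onOtherThread(BooleanSupplier check) {
        boolean[] result = new boolean[1];
        Thread other = new Thread(() -> result[0] = check.getAsBoolean());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    // Exclusive mode: while this thread holds the lock, tryLock() on another thread fails.
    static boolean secondThreadGetsLock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            return onOtherThread(lock::tryLock);
        } finally {
            lock.unlock();
        }
    }

    // Shared mode: with two permits, a second thread can still take one.
    static boolean secondThreadGetsPermit() {
        Semaphore semaphore = new Semaphore(2);
        semaphore.acquireUninterruptibly();
        return onOtherThread(semaphore::tryAcquire);
    }

    public static void main(String[] args) {
        System.out.println("exclusive, second thread succeeds: " + secondThreadGetsLock());
        System.out.println("shared, second thread succeeds: " + secondThreadGetsPermit());
    }
}
```

The first line should report false and the second true, since only the semaphore lets two holders coexist.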

3. Source code analysis


AQS and the template method pattern


The synchronizer design is based on the template method pattern. A custom synchronizer is usually built as follows:

  1. Extend AbstractQueuedSynchronizer and override the designated methods, which implement acquiring and releasing the shared state.
  2. Compose the AQS subclass into the custom synchronization component and call the AQS template methods, which in turn invoke the overridden methods.



AQS uses the template method pattern. To build a custom synchronizer, override the following hook methods, which the AQS template methods call:


isHeldExclusively() // Whether the current thread holds the resource exclusively. Only needs to be implemented if Condition is used.
tryAcquire(int) // Exclusive mode. Attempts to acquire the resource; returns true on success and false on failure.
tryRelease(int) // Exclusive mode. Attempts to release the resource; returns true if the resource is now fully released and false otherwise.
tryAcquireShared(int) // Shared mode. Attempts to acquire the resource. A negative value means failure; 0 means success with no resources left; a positive value means success with resources still available.
tryReleaseShared(int) // Shared mode. Attempts to release the resource; returns true if the release may allow a waiting thread to acquire and false otherwise.

Take ReentrantLock as an example. state starts at 0, meaning unlocked. When thread A calls lock(), tryAcquire() takes the lock exclusively and increments state by 1. After that, tryAcquire() fails for every other thread until thread A calls unlock() and state returns to 0. Before releasing the lock, thread A can acquire it again (state keeps increasing), which is what reentrancy means. Note, however, that the lock must be released exactly as many times as it was acquired, so that state returns to 0.
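
The reentrant counting can be watched through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. This is a small sketch; the class name ReentrancyDemo is only illustrative.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Records the hold count after two lock() calls and after each unlock().
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];
        lock.lock();
        lock.lock();                       // reentrant acquisition by the same thread
        counts[0] = lock.getHoldCount();
        lock.unlock();                     // still held once
        counts[1] = lock.getHoldCount();
        lock.unlock();                     // fully released
        counts[2] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [2, 1, 0]
    }
}
```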

In general, a custom synchronizer is either exclusive or shared, so it only needs to implement one pair of methods: tryAcquire/tryRelease or tryAcquireShared/tryReleaseShared. AQS also supports synchronizers that use both modes at once, such as ReentrantReadWriteLock.
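
As an illustration of the exclusive pair, here is a minimal sketch of a non-reentrant mutex built on AQS. The names Mutex, Sync, and countWithMutex are assumptions made for this example; only tryAcquire, tryRelease, and isHeldExclusively are overridden, and acquire()/release() are the AQS template methods.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class Mutex {
    // state == 0 means unlocked, state == 1 means locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }   // template method, calls tryAcquire
    void unlock() { sync.release(1); }   // template method, calls tryRelease

    // Increments a plain int from several threads under the mutex.
    static int countWithMutex(int threads, int incrementsPerThread) {
        Mutex mutex = new Mutex();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 1000)); // prints 4000
    }
}
```

Queueing, parking, and waking are all inherited from AQS; the subclass only decides what "acquired" and "released" mean for state.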


Acquiring the lock (exclusive lock)


Taking ReentrantLock as an example, here is the source of its lock() method (non-fair implementation, as found in JDK 8):

static final class NonfairSync extends Sync {

    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        // Try to grab the lock right away with a CAS on state (0 -> 1)
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

AQS internal acquire() method



public final void acquire(int arg) {
    if (!tryAcquire(arg) && // tryAcquire is overridden by subclasses
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  • tryAcquire is overridden by subclasses. It attempts to take the exclusive lock, and if it succeeds acquire() returns immediately. In the non-fair implementation the attempt is made without checking the queue first.

  • addWaiter is called only after tryAcquire returns false. It wraps the current thread in a Node, appends the node to the tail of the queue, and returns it.

  • acquireQueued takes the node returned by addWaiter and loops: it tries to acquire the lock whenever the node is first in line, and otherwise decides whether to suspend the current thread.

  • selfInterrupt simply calls Thread.currentThread().interrupt(). Waiting in the queue clears the thread's interrupt flag, so acquire() uses it to restore the flag if the thread was interrupted while waiting.
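
The effect of selfInterrupt can be checked with a small experiment. This is a sketch under the assumption that the waiter is interrupted while it sits in the queue; the class name SelfInterruptDemo is made up, and hasQueuedThread is the public ReentrantLock method used to wait until the waiter is queued.

```java
import java.util.concurrent.locks.ReentrantLock;

class SelfInterruptDemo {
    // Returns the waiter's interrupt flag as seen right after lock() returned.
    static boolean interruptSurvivesLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagAfterLock = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock();   // not interruptible: keeps waiting even when interrupted
            try {
                flagAfterLock[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();        // wait until the waiter is in the sync queue
            }
            waiter.interrupt();        // interrupt it while it is queued
        } finally {
            lock.unlock();             // now let the waiter acquire the lock
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagAfterLock[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag restored: " + interruptSurvivesLock());
    }
}
```

lock() does not throw, but the interrupt is not lost either: the flag is set again once the lock has been acquired.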

ReentrantLock's NonfairSync overrides the tryAcquire() method

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

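  • If c == 0, no thread currently holds the lock. The thread tries to take it with a CAS and, on success, records itself as the owner and returns true.

  • If current == getExclusiveOwnerThread(), the current thread already holds the lock. This is a reentrant acquisition: state is increased by acquires (with an overflow check) and the method returns true.

  • Otherwise another thread holds the lock and the method returns false.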

AQS internal addWaiter() method

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

  • addWaiter wraps the thread that failed to acquire the lock in a Node and appends it to the tail of the queue.
  • If the queue already has a tail (tail != null), a CAS tries to install the new node as the tail.
  • The CAS can fail under contention, or the queue may not be initialized yet. In both cases enq is called, which spins until the node is enqueued.

AQS internal enq() method

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

  • enq enqueues the node with a for (;;) spin loop plus CAS.
  • When the queue is empty, a new dummy node is installed as head, tail is pointed at it, and the loop continues.
  • On the next iteration, the current thread's node is appended to the tail with a CAS. If the CAS fails, the loop retries.
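
The same lazy-initialization-plus-CAS loop can be reproduced outside AQS. The following is a simplified sketch that uses AtomicReference in place of the internal compareAndSetHead/compareAndSetTail; the class CasQueueSketch and its methods are hypothetical and leave out everything else a real node carries.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): initialize lazily, then CAS the tail until it succeeds.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {                          // queue not initialized yet
                if (head.compareAndSet(null, new Node()))
                    tail.set(head.get());             // dummy node is both head and tail
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {    // only one thread wins per round
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Counts real nodes by walking prev links from the tail back to the dummy head.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev)
            n++;
        return n;
    }

    static int enqueueConcurrently(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    queue.enq(new Node());
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(4, 500)); // prints 2000
    }
}
```

No node is lost even though several threads race for the tail, because a thread whose CAS fails simply reads the new tail and tries again.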

AQS internal acquireQueued() method

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  • The for (;;) loop only exits when p == head && tryAcquire(arg) succeeds. It does not busy-spin, however, because parkAndCheckInterrupt() suspends the current thread until it is unparked or interrupted.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

  • LockSupport.park ultimately hands the thread to the operating system kernel (for example, Linux) to block it. A thread that fails to get the lock is not blocked immediately, though: the status of its predecessor is checked first, in shouldParkAfterFailedAcquire.
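
LockSupport.park and unpark can be tried in isolation. This is a minimal sketch with a made-up class name, ParkDemo; it parks inside a loop because park() is allowed to return without a matching unpark, which is also why AQS always re-checks its condition after waking.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Parks a thread until a flag is set, then reports whether it resumed.
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!released.get()) {
                LockSupport.park();    // may return spuriously, so re-check the flag
            }
            resumed.set(true);
        });
        waiter.start();

        released.set(true);            // publish the condition first
        LockSupport.unpark(waiter);    // then wake the waiter (or pre-grant its permit)

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + parkUntilReleased());
    }
}
```

If unpark happens before the waiter reaches park(), the permit is remembered and park() returns at once, so the ordering of the two threads does not matter here.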

AQS node status: waitStatus

static final int CANCELLED = 1; // The thread has cancelled its wait, for example because of a timeout or an interrupt.
static final int SIGNAL = -1; // The successor's thread needs to be unparked when this node releases or cancels.
static final int CONDITION = -2; // The node is waiting on a condition. When another thread calls Condition's signal() method, the node is moved from the condition wait queue to the synchronization queue, where it waits to acquire the lock.
static final int PROPAGATE = -3; // Shared mode only. A release wakes not only the immediate successor but may also propagate to the nodes after it.

AQS internal shouldParkAfterFailedAcquire() method

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

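  • If the predecessor's status is SIGNAL, the predecessor has already been asked to unpark this node when it releases. The method returns true, and acquireQueued goes on to block the thread in parkAndCheckInterrupt().
  • If the predecessor's status is CANCELLED (ws > 0), the predecessor has given up waiting. The method skips backwards over all cancelled nodes, links the current node behind the nearest live predecessor, and returns false. The loop in acquireQueued then calls the method again (a retry, not recursion) until the predecessor's status is SIGNAL and true is returned, at which point the thread blocks.
  • Otherwise (status 0 or PROPAGATE), a CAS sets the predecessor's status to SIGNAL, asking it to notify this node when it is done. The method returns false so that the caller retries tryAcquire once more before parking.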
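
The three branches can be traced on a toy queue. This is a single-threaded sketch, not the JDK code: ParkDecisionSketch, its Node class, and the plain field write that stands in for compareAndSetWaitStatus are all simplifications made for illustration.

```java
class ParkDecisionSketch {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        int waitStatus;
        Node prev;
        Node next;

        Node(int waitStatus, Node prev) {
            this.waitStatus = waitStatus;
            this.prev = prev;
            if (prev != null) prev.next = this;
        }
    }

    // Same decision logic as shouldParkAfterFailedAcquire, without concurrency.
    static boolean shouldPark(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL)
            return true;                       // predecessor will wake us: safe to park
        if (ws > 0) {
            do {                               // skip every cancelled predecessor
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;          // ask the predecessor for a wake-up
        }
        return false;                          // caller must retry before parking
    }

    // Queue: head <- cancelled <- cancelled <- me
    static String trace() {
        Node head = new Node(0, null);
        Node cancelled1 = new Node(CANCELLED, head);
        Node cancelled2 = new Node(CANCELLED, cancelled1);
        Node me = new Node(0, cancelled2);

        boolean first = shouldPark(me.prev, me);    // unlinks the cancelled nodes
        boolean second = shouldPark(me.prev, me);   // marks head as SIGNAL
        boolean third = shouldPark(me.prev, me);    // head is SIGNAL now
        boolean relinked = me.prev == head && head.next == me;
        return first + " " + second + " " + third + " relinked=" + relinked;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints false false true relinked=true
    }
}
```

It takes three passes before the node is allowed to park, which matches the retry loop in acquireQueued.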


Releasing the lock (exclusive lock)

public void unlock() {
    sync.release(1);
}

AQS internal release() method

public final boolean release(int arg) {
    // tryRelease decides whether the resource is released; it is overridden by subclasses
    if (tryRelease(arg)) {
        Node h = head;
        // h.waitStatus != 0 means a successor node is waiting to be woken up
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

  • release() is the top-level entry point for releasing a resource in exclusive mode. It releases the specified amount and, if the resource is then fully released, wakes the successor of the head node.


ReentrantLock's Sync overrides the tryRelease() method

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

  • Thread.currentThread() != getExclusiveOwnerThread() checks whether the calling thread owns the lock. If it does not, IllegalMonitorStateException is thrown.
  • If c == 0, the lock is fully released and the owner thread is set to null.
  • setState(c) writes back the new state value.
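
The ownership check is easy to trigger from user code: calling unlock() on a ReentrantLock that the current thread does not hold ends in that exception. A small sketch, with UnlockOwnerCheckDemo as an illustrative class name:

```java
import java.util.concurrent.locks.ReentrantLock;

class UnlockOwnerCheckDemo {
    // Returns true if unlock() by a non-owner is rejected.
    static boolean unlockWithoutHoldingThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();             // this thread never acquired the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + unlockWithoutHoldingThrows());
    }
}
```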

AQS internal unparkSuccessor() method

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

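  • If ws < 0, a CAS resets the node's waitStatus to 0.
  • If s == null || s.waitStatus > 0, the direct successor is missing or cancelled, so the queue is scanned backwards from the tail to find the non-cancelled node closest to the head.
  • If s != null, the successor's thread is woken with LockSupport.unpark(s.thread).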


Acquiring the lock (shared lock)


Taking Semaphore as an example of a shared lock:


Semaphore semaphore = new Semaphore(2, false);


public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

By default you only pass permits, the number of permits, which determines how many threads may proceed at once; this gives a non-fair semaphore. The two-argument constructor selects the fair (true) or non-fair (false) variant.


Initializing the number of permits:

NonfairSync(int permits) {
    super(permits);
}

FairSync(int permits) {
    super(permits);
}


Sync(int permits) {
    setState(permits);
}

protected final void setState(int newState) {
    state = newState;
}

For both the fair and the non-fair variant, the constructor stores the number of permits in the AQS state field. Acquiring permits subtracts from this value, and releasing them adds to it.
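
The state value is visible from the outside through Semaphore.availablePermits(). The sketch below, with the illustrative class name PermitCountDemo, records it after each step.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitCountDemo {
    // Tracks availablePermits() while permits are taken and returned.
    static int[] permitsOverTime() {
        Semaphore semaphore = new Semaphore(2, false);
        int[] seen = new int[4];
        seen[0] = semaphore.availablePermits();   // initial value from the constructor
        semaphore.tryAcquire();
        seen[1] = semaphore.availablePermits();   // one permit taken
        semaphore.tryAcquire();
        seen[2] = semaphore.availablePermits();   // both permits taken
        semaphore.release();
        seen[3] = semaphore.availablePermits();   // one permit returned
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(permitsOverTime())); // prints [2, 1, 0, 1]
    }
}
```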

The acquire() method

Semaphore semaphore = new Semaphore(2, false);
semaphore.acquire();


public void acquire() throws InterruptedException {
    // Delegates to a method inside AQS
    sync.acquireSharedInterruptibly(1);
}

AQS internal acquireSharedInterruptibly() method

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Fail fast if the thread has already been interrupted
    if (Thread.interrupted())
        throw new InterruptedException();
    // tryAcquireShared is implemented by the subclass, see below
    if (tryAcquireShared(arg) < 0)
        // doAcquireSharedInterruptibly is shown further below
        doAcquireSharedInterruptibly(arg);
}

Semaphore's FairSync overrides tryAcquireShared (fair lock)

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    // Overrides the AQS tryAcquireShared() method
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

  • tryAcquireShared attempts to acquire the resource in shared mode. A negative value means failure; 0 means success with no permits left; a positive value means success with permits still available.

  • hasQueuedPredecessors() is what makes this variant fair. It checks whether another thread is queued ahead of the current one; if so, the method returns -1 and the current thread joins the queue instead of barging.

  • Inside the for (;;) loop, a CAS subtracts the requested permits from state, so that threads competing at the same time cannot both take the same permit.

  • getState() reads the value that was initialized in the constructor. The CAS deduction is retried until it succeeds or too few permits remain.
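
The CAS deduction loop can be sketched on its own with an AtomicInteger standing in for the AQS state. PermitCasSketch is a hypothetical class; it deliberately omits the hasQueuedPredecessors() check, so it corresponds to the non-fair shape of the loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PermitCasSketch {
    private final AtomicInteger state;   // stands in for the AQS state field

    PermitCasSketch(int permits) {
        state = new AtomicInteger(permits);
    }

    // Returns the permits left after the acquisition, or a negative value on failure.
    int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Fail without writing if too few permits are left; otherwise retry until the CAS wins.
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    public static void main(String[] args) {
        PermitCasSketch permits = new PermitCasSketch(2);
        System.out.println(permits.tryAcquireShared(1)); // prints 1
        System.out.println(permits.tryAcquireShared(1)); // prints 0
        System.out.println(permits.tryAcquireShared(1)); // prints -1
    }
}
```

The three return values line up with the three cases of the contract: success with permits left, success with none left, and failure.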

AQS internal doAcquireSharedInterruptibly() method

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // Node.SHARED marks the node as waiting in shared mode
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // tryAcquireShared attempts to obtain the synchronization state
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  • doAcquireSharedInterruptibly is an internal AQS method. It first enqueues the current thread as a SHARED node through addWaiter(Node.SHARED).
  • In the for (;;) loop, if p == head the node is first in line, so it calls tryAcquireShared to try to obtain the permits.
  • If r >= 0, the acquisition succeeded. setHeadAndPropagate(node, r) makes the current node the new head and, where appropriate, wakes further shared waiters; p.next = null then unlinks the old head to help GC.
  • shouldParkAfterFailedAcquire(p, node) updates the status of the predecessor node and decides whether the current thread should be parked.
  • parkAndCheckInterrupt() parks the thread and reports, on wake-up, whether it was interrupted. If it was, InterruptedException is thrown and the request ends.
  • cancelAcquire(node) runs in the finally block when the acquisition failed, and cancels the node's pending request.
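
The interruptible behaviour is observable with a semaphore that has no permits. In this sketch (InterruptibleAcquireDemo is a made-up name) the waiter can only leave acquire() through InterruptedException, whether the interrupt arrives before or after it parks.

```java
import java.util.concurrent.Semaphore;

class InterruptibleAcquireDemo {
    // Returns true if the blocked acquire() ended with InterruptedException.
    static boolean acquireIsInterrupted() {
        Semaphore semaphore = new Semaphore(0);   // no permits: acquire() must wait
        boolean[] sawInterrupt = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;           // the request was abandoned
            }
        });
        waiter.start();
        waiter.interrupt();

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawInterrupt[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupted while acquiring: " + acquireIsInterrupted());
    }
}
```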

Releasing the lock (shared lock)


public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

  • Semaphore.release(int) returns permits by delegating to the AQS releaseShared method.

AQS internal releaseShared method

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

  • tryReleaseShared attempts to release the resource:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

  • getState() reads the current number of permits.
  • If next < current, the addition overflowed and an Error is thrown. Otherwise a CAS writes the new value to state, retrying until it succeeds.
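
Two consequences of this method can be checked directly: release() has no upper bound tied to the initial permit count, and only integer overflow is rejected. The class name ReleaseDemo is an assumption for this sketch.

```java
import java.util.concurrent.Semaphore;

class ReleaseDemo {
    // release() without a prior acquire() simply adds a permit.
    static int permitsAfterExtraRelease() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release();
        return semaphore.availablePermits();
    }

    // Releasing past Integer.MAX_VALUE triggers the overflow check.
    static String overflowMessage() {
        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
        try {
            semaphore.release();
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterExtraRelease()); // prints 2
        System.out.println(overflowMessage());          // prints Maximum permit count exceeded
    }
}
```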

AQS internal doReleaseShared() method

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

  • h != null && h != tail: the head is not null and is not also the tail, so there are waiting nodes in the queue.
  • If the head's waitStatus is SIGNAL, a CAS resets it to 0. If the CAS fails, the loop starts over and retries until it succeeds.
  • unparkSuccessor(h) then wakes the successor of the head node.
  • ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE): if waitStatus is 0, a CAS sets it to PROPAGATE; if that CAS fails, the loop starts over.
  • If h == head, the head did not change during this iteration and the loop breaks. Otherwise the head has changed and the loop continues.
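
Shared-mode propagation is what lets a single release wake every waiter. The following sketch is a one-shot latch built on the shared pair tryAcquireShared/tryReleaseShared; the names OneShotLatch, await, signal, and releaseAll are chosen for this example and are not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotLatch {
    // state == 0 means closed, state == 1 means open.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1;   // positive: succeed and let others follow
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;                       // true makes releaseShared wake the waiters
        }
    }

    private final Sync sync = new Sync();

    void await()  { sync.acquireShared(1); }   // blocks until the latch is open
    void signal() { sync.releaseShared(1); }   // opens the latch for everyone

    // Starts the waiters, opens the latch once, and counts how many got through.
    static int releaseAll(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.await();
                woken.incrementAndGet();
            });
            threads[i].start();
        }
        latch.signal();
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(releaseAll(3)); // prints 3
    }
}
```

A single signal() call is enough for all waiters, because each woken shared node passes the wake-up on to the node behind it.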

“When there is a high wind, life does not give up”