
ReentrantLock (AQS)

You can learn about ReentrantLock at juejin.cn/post/706288…

AbstractQueuedSynchronizer (AQS) is a framework for implementing blocking locks and related synchronizer components. It is the core of concurrent programming in the JDK and is built around a FIFO wait queue. Tools we often use at work, such as ReentrantLock and CountDownLatch, are implemented on top of it.

I. First acquaintance with AQS

Let’s start with ReentrantLock and see what its code structure looks like:

Here’s what you can see from the image above:

  • ReentrantLock implements the Lock interface.
  • ReentrantLock has three internal classes, FairSync, NonfairSync, and Sync; FairSync and NonfairSync inherit from Sync.
  • Sync inherits from AbstractQueuedSynchronizer.
  • AbstractQueuedSynchronizer has two inner classes: Node and ConditionObject.
  • AbstractQueuedSynchronizer inherits from AbstractOwnableSynchronizer (an abstract class that provides the getter and setter for the exclusive owner thread).

AQS has the following characteristics:

  • The state attribute represents the resource status. Subclasses define how this state is maintained and how the lock is acquired and released. As shown in the diagram above, the fair lock and the unfair lock each implement their own acquisition logic on top of this state.

    • getState – gets the state
    • setState – sets the state
    • compareAndSetState – sets the state atomically using CAS
  • Two access modes are supported, independent of whether a lock is fair or unfair:

    • Exclusive mode: only one thread can access the resource at a time
    • Shared mode: multiple threads can access the resource at the same time
  • A FIFO wait queue is provided, similar to the EntryList of Monitor mentioned earlier in the synchronized principle

  • Condition variables implement the wait and wake-up mechanism, and multiple condition variables are supported on one lock, similar to the WaitSet of Monitor mentioned in the previous section on the synchronized principle

  • [Fair lock] and [unfair lock]: the difference between the two lies mainly in whether lock acquisition follows the queuing order. When a lock is held by one thread, other threads attempting to acquire it are suspended and added to the queue, with the first thread to be suspended at the front. When the lock is released, the thread at the front of the queue is woken up. With a fair lock, a thread that arrives while others are queued must join the end of the queue, so the lock is handed out in queue order. With an unfair lock, a newly arriving thread first tries to grab the lock directly without checking the queue, so it may acquire the lock ahead of threads that have been waiting longer.
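
To make the state methods and the shared mode a little more concrete, here is a minimal sketch of a one-shot latch built on AQS, modeled loosely on the kind of example given in the AQS documentation. The class name OneShotLatch, its methods, and the helper releasedWorkers are hypothetical names chosen for illustration; only the AbstractQueuedSynchronizer calls are real JDK API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 means closed, state 1 means open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() == 1;
        }

        // Shared mode: a non-negative result means the acquire succeeded.
        @Override
        protected int tryAcquireShared(int ignored) {
            return isOpen() ? 1 : -1;
        }

        // Opening the latch always succeeds and lets every waiter through.
        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    boolean isOpen() {
        return sync.isOpen();
    }

    // Blocks (ignoring interrupts) until the latch is open.
    void await() {
        sync.acquireShared(1);
    }

    void open() {
        sync.releaseShared(1);
    }

    // Starts n threads that wait on a fresh latch, opens it, and counts how many got through.
    static int releasedWorkers(int n) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            workers[i] = new Thread(() -> {
                latch.await();
                passed.incrementAndGet();
            });
            workers[i].start();
        }
        latch.open();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("workers released: " + releasedWorkers(3));
    }
}
```

Unlike the exclusive locks discussed in the rest of this article, a single releaseShared here lets all waiting threads proceed, which is the essence of shared mode.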

II. Source code analysis

Let’s look at the source code itself.

First, keep one picture in mind: AQS maintains two kinds of queues. One is the synchronization queue (threads waiting to acquire the lock), the other is the condition queue (threads waiting on a Condition).

Node is the node type used by both the synchronization queue and the condition queue.

2.1 Node source code analysis

  static final class Node {
    // The mode can be shared or exclusive
    // Shared mode
    static final Node SHARED = new Node();
    // Exclusive mode
    static final Node EXCLUSIVE = null;
    // Values of waitStatus:
    // CANCELLED (1): the thread of this node has cancelled waiting (timeout or interrupt)
    // SIGNAL (-1): the successor of this node is (or will soon be) blocked, so this node must unpark it when it releases or cancels
    // CONDITION (-2): this node is waiting on a condition, i.e. it is in the condition queue
    // PROPAGATE (-3): a releaseShared should be propagated to subsequent nodes
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // Node state
    volatile int waitStatus;
    // The predecessor node
    volatile Node prev;
    // The successor node
    volatile Node next;
    // The thread corresponding to the node
    volatile Thread thread;
    // Next node waiting on the condition, or the special value SHARED
    Node nextWaiter;

    // Whether the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Get the predecessor; throw an exception if it is null
    final Node predecessor() throws NullPointerException {
        // Save the predecessor
        Node p = prev;
        if (p == null) // The predecessor is null, throw an exception
            throw new NullPointerException();
        else // The predecessor is not null, return it
            return p;
    }

    // No-argument constructor
    Node() {    // Used to establish initial head or SHARED marker
    }

    // Constructor, used by addWaiter
    Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // Constructor, used by Condition
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
  }
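
The synchronization queue built from these nodes can be observed from outside through ReentrantLock's public monitoring methods. The following is a small sketch, not part of the JDK source: the class SyncQueueDemo and the method queueLengthWhileHeld are hypothetical names, while hasQueuedThread and getQueueLength are existing ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

class SyncQueueDemo {
    // Holds the lock, lets a second thread block on it, and reports the queue length seen meanwhile.
    static int queueLengthWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // blocks while the lock is held: this thread is enqueued as a Node
            lock.unlock();
        }, "waiter");

        int observed;
        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter has joined the sync queue
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // releasing wakes up the queued waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("queued threads while held: " + queueLengthWhileHeld());
    }
}
```

Note that getQueueLength is documented as an estimate meant for monitoring; it is used here only because a single, known waiter keeps the situation simple.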

2.2 ConditionObject source analysis

ConditionObject implements the Condition interface. Condition itself will be covered in more detail later; its usage was also introduced in the ReentrantLock article.

There is a lot of code; just read it from top to bottom:

// Inner class
public class ConditionObject implements Condition, java.io.Serializable {
    // Version number
    private static final long serialVersionUID = 1173984872572414699L;
    
    // Head of the condition queue
    private transient Node firstWaiter;
    
    // Tail of the condition queue
    private transient Node lastWaiter;

    /** Constructor */
    public ConditionObject() {}

    /** Adds a new waiter to the condition queue */
    private Node addConditionWaiter() {
        // Save the tail node as t
        Node t = lastWaiter;
        // The tail is not null and its state is not CONDITION (-2, meaning the node is waiting in the condition queue), i.e. it has been cancelled
        if (t != null && t.waitStatus != Node.CONDITION) {
            // Unlink nodes that are not in CONDITION state and reassign firstWaiter and lastWaiter
            unlinkCancelledWaiters(); 
            // reassign the last node to t
            t = lastWaiter;
        }
        // Create a new node
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // The tail is null
        if (t == null) 
            // Sets the head of the condition queue
            firstWaiter = node;
        else 
            // Set the nextWaiter field of the node to node
            t.nextWaiter = node;
        // Update the tail of the condition queue
        lastWaiter = node;
        return node;
    }

    /** Removes and transfers nodes, starting from the head, until one is transferred successfully (not cancelled) or the queue is empty */
    private void doSignal(Node first) {
        // Loop
        do {
            // Advance firstWaiter to the next node; if it is null, the queue is now empty
            if ( (firstWaiter = first.nextWaiter) == null) 
                // Set the tail to null
                lastWaiter = null;
            // Detach first from the condition queue
            first.nextWaiter = null;
            // Repeat while the transfer to the sync queue fails (node cancelled) and the condition queue is not empty
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }

    /** Transfers all waiting nodes to the synchronization queue */
    private void doSignalAll(Node first) {
        // Set the head and tail of the condition queue to null
        lastWaiter = firstWaiter = null;
        // Loop
        do {
            // Get the nextWaiter field of first
            Node next = first.nextWaiter;
            // Set first's nextWaiter field to null
            first.nextWaiter = null;
            // Move the first node from the condition queue to the sync queue
            transferForSignal(first);
            // Advance first
            first = next;
        } while (first != null);
    }

    /** Unlinks all nodes that are not in CONDITION state and reassigns firstWaiter and lastWaiter */
    private void unlinkCancelledWaiters() {
        // Start from the head of the condition queue
        Node t = firstWaiter;
        // trail is the last node seen that is still in CONDITION state
        Node trail = null;
        while (t != null) {
            // Get the next node
            Node next = t.nextWaiter;
            // The state of t is not CONDITION
            if (t.waitStatus != Node.CONDITION) {
                // Set the nextWaiter of t to null
                t.nextWaiter = null;
                if (trail == null) // trail is null
                    // Reset the head of the condition queue
                    firstWaiter = next;
                else 
                    // Set trail's nextWaiter field to next
                    trail.nextWaiter = next;
                if (next == null) // next is null
                    // Set the tail of the condition queue
                    lastWaiter = trail;
            }
            else // The state of t is CONDITION
                // Advance trail
                trail = t;
            // Advance t
            t = next;
        }
    }

    /** Implements the Condition signal method */
    public final void signal() {
        if (!isHeldExclusively()) // The lock is not held exclusively by the current thread, throw an exception
            throw new IllegalMonitorStateException();
        // Save the head of the condition queue
        Node first = firstWaiter;
        if (first != null) // The head is not null
            // Wake up one waiting thread: remove the head node from the condition queue and add it to the synchronization queue
            doSignal(first);
    }

    /** Implements the Condition signalAll method, which wakes up all waiting threads */
    public final void signalAll() {
        if (!isHeldExclusively()) // The lock is not held exclusively by the current thread, throw an exception
            throw new IllegalMonitorStateException();
        // Save the head of the condition queue
        Node first = firstWaiter;
        if (first != null) // The head is not null
            // Wake up all waiting threads: move every node from the condition queue to the synchronization queue
            doSignalAll(first);
    }

    /** Unlike await(), this method does not throw InterruptedException when the thread is interrupted; it keeps waiting and restores the interrupt status afterwards. */
    public final void awaitUninterruptibly() {
        // Add a node to the condition queue
        Node node = addConditionWaiter();
        // Release the lock completely and save the state that was released
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        while (!isOnSyncQueue(node)) {
            // Block the current thread
            LockSupport.park(this);
            if (Thread.interrupted()) // The current thread was interrupted
                // Remember the interrupt
                interrupted = true; 
        }
        // Re-acquire the lock, then restore the interrupt status if needed
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }

    /** Waits: the current thread waits until it is signalled or interrupted */
    public final void await() throws InterruptedException {
        // The current thread is already interrupted, throw an exception
        if (Thread.interrupted()) 
            throw new InterruptedException();
        // Wrap the current thread in a Node and append it to the tail of the condition queue
        Node node = addConditionWaiter();
        // Release the lock held by the current thread, which wakes up the next node in the synchronization queue
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // The current thread enters the wait state
            LockSupport.park(this);
            // Check whether and how the thread was interrupted while waiting
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Re-acquire the synchronization state (i.e. the lock) through the sync queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // Handle interrupts
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    /** Waits: the current thread waits until it is signalled, interrupted, or the specified wait time elapses */
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    /** Waits: the current thread waits until it is signalled, interrupted, or the specified deadline passes */
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    /** Waits: the current thread waits until it is signalled, interrupted, or the specified wait time elapses. Equivalent to awaitNanos(unit.toNanos(time)) > 0 */
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        // 1. Wrap the current thread in a Node and append it to the tail of the condition queue
        Node node = addConditionWaiter();
        // 2. Release the lock held by the current thread and wake up the next node in the synchronization queue
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    // ... remaining methods omitted
}
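
The movement of a thread into the condition queue on await and out of it on signal can be seen with a small experiment. This is only a sketch under stated assumptions: ConditionQueueDemo and its methods are hypothetical names for illustration, while newCondition, awaitUninterruptibly, signalAll, and getWaitQueueLength are existing JDK methods.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionQueueDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag; // guarded by lock

    // Waits on the condition until the flag is set; waiting releases the lock.
    void awaitFlag() {
        lock.lock();
        try {
            while (!flag) {
                ready.awaitUninterruptibly();
            }
        } finally {
            lock.unlock();
        }
    }

    // Sets the flag and moves all waiters from the condition queue to the sync queue.
    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Number of threads in the condition queue; the lock must be held to ask.
    int conditionWaiters() {
        lock.lock();
        try {
            return lock.getWaitQueueLength(ready);
        } finally {
            lock.unlock();
        }
    }

    // Returns the condition queue length observed before and after signalling.
    static int[] run() {
        ConditionQueueDemo demo = new ConditionQueueDemo();
        Thread waiter = new Thread(demo::awaitFlag, "waiter");
        waiter.start();
        while (demo.conditionWaiters() != 1) {
            Thread.yield(); // wait until the waiter is parked on the condition
        }
        int before = demo.conditionWaiters();
        demo.setFlag();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int after = demo.conditionWaiters();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("waiters before signal: " + result[0] + ", after: " + result[1]);
    }
}
```

The fact that conditionWaiters can take the lock while the waiter is parked shows the effect of fullyRelease: the waiting thread gives up the lock entirely until it is signalled.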

2.3 Lock acquisition and release

The whole design of AQS revolves around acquiring and releasing locks through the state field. ReentrantLock provides two variants on top of it: the fair lock and the unfair lock.

2.3.1 Fair lock

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            // Inherited from AQS: first calls tryAcquire to obtain the lock; if that fails, the thread is added to the wait queue
            acquire(1);
        }

        /** Fair version of tryAcquire */
        protected final boolean tryAcquire(int acquires) {
            // Get the current thread
            final Thread current = Thread.currentThread();
            // Get the lock state
            int c = getState();
            // 0 indicates that the lock is not held
            if (c == 0) {
                // Only try to take the lock if no thread is queued ahead of the current one
                if (!hasQueuedPredecessors() &&
                    // Update the state using CAS
                    compareAndSetState(0, acquires)) {
                    // Record the current thread as the exclusive owner
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // Reentrant acquisition
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

2.3.2 Unfair Lock

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /** Tries to obtain the lock immediately; on failure, falls back to the normal acquire path and joins the wait queue */
        final void lock() {
            // Try to obtain the lock through CAS; if successful, set the current thread as the exclusive owner
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // Otherwise go through acquire, which may join the wait queue
                acquire(1);
        }

        /** Unfair version of tryAcquire */
        protected final boolean tryAcquire(int acquires) {
            // Delegate to nonfairTryAcquire in the parent class Sync
            return nonfairTryAcquire(acquires);
        }
    }
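
From the caller's side, the choice between the two Sync subclasses is made by the ReentrantLock constructor. A minimal sketch (the class FairnessDemo and its method are hypothetical names; the constructors and isFair are existing ReentrantLock API):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    // Returns {isFair of a default lock, isFair of a lock created with fair = true}.
    static boolean[] fairnessFlags() {
        ReentrantLock nonfair = new ReentrantLock();     // default: unfair lock
        ReentrantLock fair = new ReentrantLock(true);    // fair lock
        return new boolean[] {nonfair.isFair(), fair.isFair()};
    }

    public static void main(String[] args) {
        boolean[] flags = fairnessFlags();
        System.out.println("default lock fair? " + flags[0]);
        System.out.println("new ReentrantLock(true) fair? " + flags[1]);
    }
}
```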

2.3.3 The Sync class

Sync is the parent class of the fair and unfair locks and provides a shared tryRelease method for releasing the lock.

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /** Performs Lock.lock; it is abstract so that the unfair version can provide a fast path */
        abstract void lock();

        /** Unfair acquisition attempt; used by the unfair lock and by tryLock */
        final boolean nonfairTryAcquire(int acquires) {
            // Get the current thread
            final Thread current = Thread.currentThread();
            // Get the state of the lock
            int c = getState();
            // 0 indicates that it is not held
            if (c == 0) {
                // CAS succeeded: the current thread now holds the lock exclusively
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // The current thread already holds the lock: reentrant acquisition
            else if (current == getExclusiveOwnerThread()) {
                // State + acquires
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // Set the new state
                setState(nextc);
                return true;
            }
            return false;
        }

        /** Releases the lock */
        protected final boolean tryRelease(int releases) {
            // Current state minus the number of releases
            int c = getState() - releases;
            // If the current thread is not the thread holding the lock, throw an exception
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // Clear the exclusive owner when the state reaches 0
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            // Write back the new state (0 only when fully released)
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // Whether the current thread is the lock holder
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // Get the current holder
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        // Get the hold count, which is non-zero only for the thread holding the lock
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    }
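
The reentrancy counting in nonfairTryAcquire and tryRelease is visible through ReentrantLock.getHoldCount(). The sketch below (HoldCountDemo and holdCounts are hypothetical names) records the hold count at four points:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Records the hold count before locking, after two locks, after one unlock, and after two unlocks.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        counts[0] = lock.getHoldCount(); // not held: state is 0
        lock.lock();
        lock.lock();                     // reentrant acquisition: state goes from 1 to 2
        counts[1] = lock.getHoldCount();
        lock.unlock();                   // state back to 1, the lock is still held
        counts[2] = lock.getHoldCount();
        lock.unlock();                   // state reaches 0, the owner is cleared
        counts[3] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [0, 2, 1, 0]
    }
}
```

The lock only becomes available to other threads after the second unlock, when tryRelease sees the state reach 0 and returns true.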

2.3.4 acquire and release

AQS also has two core methods:

  • acquire(): acquires the lock. This is the method that a real lock() call goes through. The tryAcquire methods above only attempt to take the lock and do not join the wait queue on failure; acquire() adds the queuing.
    public final void acquire(int arg) {
        // Try to acquire first
        if (!tryAcquire(arg) &&
            // If the attempt fails, join the wait queue in exclusive mode and keep trying until the lock is acquired
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • release(): releases the lock by calling the tryRelease method implemented by the lock’s custom synchronizer, then wakes up the successor of the head node:
    /** Attempts to release; returns true on success */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

Later we will write a custom non-reentrant lock to see how the synchronizer and the lock relate to each other.

2.4 Summary

From the source code read so far, we can summarize the following:

  • Locks are acquired and released around [state]: 0 means the lock is not held, 1 means it is held exclusively, and greater than 1 means it has been re-entered
  • Acquiring the lock works as follows:
      // If the lock cannot be acquired
      if (!tryAcquire(arg)) {
          // Join the queue; the current thread can then be blocked with park and later resumed with unpark
      }
  • Releasing the lock works as follows:
      // If the lock is released successfully
      if (tryRelease(arg)) {
          // Let a blocked thread resume running
      }

III. Practice

Now that we know the structure of AQS, we might as well try it out ourselves to deepen our understanding.

To define a non-reentrant lock, we need a synchronizer, a lock, and a test class.

Custom synchronizer:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * @description: Synchronizer for a non-reentrant lock; the maximum state is 1
 * @author: weirx
 * @date: 2022/1/13 13:49
 * @version: 3.0
 */
public class MyLockSynchronizer extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int acquires) {
        int state = getState();
        // Only succeed when the lock is free; the owner is not allowed to re-enter
        if (state == 0) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        // Only the thread that holds the lock may release it
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - releases;
        if (c == 0) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        return false;
    }

    @Override
    protected boolean isHeldExclusively() {
        // Held, and held by the current thread
        return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
    }

    protected ConditionObject newCondition() {
        return new ConditionObject();
    }
}

Custom locks:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @description: Custom lock
 * @author: weirx
 * @date: 2022/1/13 14:05
 * @version: 3.0
 */
public class MyLock implements Lock {

    MyLockSynchronizer myLockSynchronizer = new MyLockSynchronizer();

    @Override
    public void lock() {
        // Try to obtain the lock; on failure, join the wait queue
        myLockSynchronizer.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // Try to obtain the lock; on failure, join the wait queue, but allow the wait to be interrupted
        myLockSynchronizer.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        // Try to acquire the lock once, without joining the queue
        return myLockSynchronizer.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // Try to acquire the lock, waiting in the queue for at most the given time
        return myLockSynchronizer.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        // Release the lock
        myLockSynchronizer.release(1);
    }

    @Override
    public Condition newCondition() {
        // Condition variable
        return myLockSynchronizer.newCondition();
    }
}

Test lock effect:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * @description: Test
 * @author: weirx
 * @date: 2022/1/13
 * @version: 3.0
 */
public class TestMyLock {

    // Formats the current time for the log lines
    private static String now() {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }

    public static void main(String[] args) {
        MyLock myLock = new MyLock();

        new Thread(() -> {
            myLock.lock();
            try {
                System.out.println(now() + " " + Thread.currentThread().getName() + " :acquire lock success");

                // Sleep for a second to see the effect
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                myLock.unlock();
                System.out.println(now() + " " + Thread.currentThread().getName() + " :release lock success");
            }
        }, "t1").start();

        new Thread(() -> {
            myLock.lock();
            try {
                System.out.println(now() + " " + Thread.currentThread().getName() + " :acquire lock success");
            } finally {
                myLock.unlock();
                System.out.println(now() + " " + Thread.currentThread().getName() + " :release lock success");
            }
        }, "t2").start();
    }
}

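As a result, t1 holds the lock for one second, and t2 can acquire it only after t1 has released it. (The release message of t1 is printed after unlock() returns, so it may appear after the acquire message of t2.)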

2022-01-13 14:34:56 t1 :acquire lock success
2022-01-13 14:34:57 t2 :acquire lock success
2022-01-13 14:34:57 t1 :release lock success
2022-01-13 14:34:57 t2 :release lock success

If myLock.lock() is called a second time in thread t1 of the test code, the whole program hangs: the lock is not reentrant, so t1 ends up waiting for a lock that it holds itself.

2022-01-13 14:35:56 t1 :acquire lock success
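
The reason for the hang can be checked without actually blocking, by calling tryAcquire twice from the same thread. The following is a self-contained sketch, assuming a simplified synchronizer in the spirit of MyLockSynchronizer; the class NonReentrantProbe and the method probe are hypothetical names for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Simplified non-reentrant synchronizer used only for this probe.
class NonReentrantProbe extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int ignored) {
        // Succeeds only when the state is 0, even if the caller already owns the lock
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int ignored) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // Returns {result of the first attempt, result of a second attempt by the same thread}.
    static boolean[] probe() {
        NonReentrantProbe sync = new NonReentrantProbe();
        boolean first = sync.tryAcquire(1);
        boolean second = sync.tryAcquire(1); // state is already 1, so the CAS fails
        sync.release(1);
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] result = probe();
        System.out.println("first attempt: " + result[0] + ", second attempt: " + result[1]);
    }
}
```

With lock() instead of tryAcquire, the failed second attempt would send the thread into acquireQueued, where it parks and waits for a release that only it could perform.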

IV. More on Condition

Using Condition is not covered in this article; it was introduced earlier in the ReentrantLock article, which you can revisit to see how it is used: juejin.cn/post/706288…


Reading source code is genuinely hard, and reading someone else's explanation is no substitute for tracing it yourself. I suggest using this article as a guide and stepping through the code on your own.