1. Basic concepts

Most Java synchronizers (Lock, Semaphore, ReentrantLock, etc.) are built on AbstractQueuedSynchronizer (AQS). AQS uses an int member variable (state) to represent the synchronization state, and queues the threads competing for the resource in a built-in FIFO queue.
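
To make the role of the int state concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and the convention "0 = free, 1 = locked" are assumptions made for this illustration; only the AbstractQueuedSynchronizer methods themselves (getState, setState, compareAndSetState, acquire, release) come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Assumed convention: state 0 = free, 1 = locked. CAS makes the switch atomic.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // plain write is enough: only the owner releases
            return true;
        }

        boolean locked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }            // queues and parks on contention
    boolean tryLock()  { return sync.tryAcquire(1); }  // never blocks
    void unlock()      { sync.release(1); }            // wakes the next queued thread
    boolean isLocked() { return sync.locked(); }
}
```

The subclass only decides how state changes; queuing, parking and waking are left entirely to AQS.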

2. Implementation principle

2.1 Data structure analysis

The core idea of AQS: if the requested resource is free, the requesting thread becomes the active worker thread and the shared resource is marked as locked. If the shared resource is occupied, a mechanism for blocking waiting threads and waking them up later is needed to hand out the lock. AQS implements this mechanism with a variant of the CLH queue, which holds the threads that cannot acquire the lock for the moment.

2.1.1 Synchronization queue

The queue in AQS is a variant of the CLH queue: a virtual doubly linked FIFO queue. AQS allocates the lock by wrapping each thread that requests the shared resource in a node of this queue.

The synchronization queue keeps a head reference and a tail reference, and its nodes are linked in both directions through prev and next.

  • Enqueue operation of the synchronization queue

Several threads may try to enqueue at the same time, so the enqueue operation must be atomic. It therefore uses a CAS-based method: compareAndSetTail().

  • Dequeue operation of the synchronization queue

The head node is set by the thread that successfully acquires the synchronization state. Only one thread can do this at a time, so the method that sets the head does not need CAS.
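
The two rules above can be sketched with a simplified queue. This is not the AQS source: the class CasQueueSketch, its Node type and the use of AtomicReference in place of AQS's internal compareAndSetTail() are assumptions chosen to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical queue that mimics how AQS links nodes.
class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> tail;
    private volatile Node head;

    CasQueueSketch() {
        Node dummy = new Node("dummy"); // the queue always starts with a dummy head
        head = dummy;
        tail = new AtomicReference<>(dummy);
    }

    // Enqueue: many threads may race here, so the tail is swung with CAS.
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;                    // 1. link backward first
            if (tail.compareAndSet(t, node)) { // 2. publish as the new tail
                t.next = node;                // 3. forward link is set last
                return;
            }
        }
    }

    // Dequeue: called only by the one thread that acquired the state, so no CAS.
    void setHead(Node node) {
        head = node;
        node.prev = null;
    }

    Node head() { return head; }
    Node tail() { return tail.get(); }
}
```

Note that the forward link in step 3 is written after the CAS, so for a short moment a node is reachable through prev but not yet through next.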

2.1.2 Node structure

Each element of the AQS queue is a Node. A Node is one entry of the CLH queue and holds information such as the waiting thread and its wait state.

  • Node

    Field / method    Description
    waitStatus        Wait state of the node
    thread            The thread held by this node (the one waiting for the synchronization state)
    prev              Predecessor node
    predecessor()     Returns the predecessor node; throws a NullPointerException if it is null
    nextWaiter        Points to the next node waiting on a condition queue
    next              Successor node

  • waitStatus

    Name         Value   Meaning
    (initial)     0      The initial state of a newly created node
    CANCELLED     1      The thread's request to acquire the lock has been cancelled
    CONDITION    -2      The node is in a condition queue, waiting to be signalled
    PROPAGATE    -3      Used only in SHARED mode: a shared release should be propagated to other nodes
    SIGNAL       -1      The successor of this node is waiting, so this node must wake it up when it releases the resource

  • The two locking modes of a thread

    Mode         Meaning
    SHARED       The thread is waiting for the lock in shared mode
    EXCLUSIVE    The thread is waiting for the lock in exclusive mode
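
The fields listed above can be put together as a small class. This is a simplified sketch modelled on the JDK 8 layout, not the real nested class: NodeSketch is a hypothetical name, and the memory-ordering details of the original are left out.

```java
// Simplified, hypothetical rendering of the AQS node.
class NodeSketch {
    // waitStatus values
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // Mode markers: the mode is stored in nextWaiter
    static final NodeSketch SHARED = new NodeSketch();
    static final NodeSketch EXCLUSIVE = null;

    volatile int waitStatus;      // 0 when the node is created
    volatile NodeSketch prev;     // predecessor in the synchronization queue
    volatile NodeSketch next;     // successor in the synchronization queue
    volatile Thread thread;       // the thread waiting in this node
    NodeSketch nextWaiter;        // next node on a condition queue, or the SHARED marker

    NodeSketch() { }

    NodeSketch(Thread thread, NodeSketch mode) {
        this.thread = thread;
        this.nextWaiter = mode;
    }

    boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Returns the predecessor, or throws NullPointerException if there is none.
    NodeSketch predecessor() {
        NodeSketch p = prev;
        if (p == null) {
            throw new NullPointerException();
        }
        return p;
    }
}
```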

2.2 Synchronization Status Analysis

Lock acquisition comes in two modes: exclusive and shared. Exclusive means that only one thread can hold the lock at a time, as with ReentrantLock. Shared means that several threads can hold the lock at the same time, as with the read lock of a ReadWriteLock.
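
The difference between the two modes can be observed from the outside. The sketch below is only an illustration: LockModesDemo and its helper otherThreadCanAcquire are hypothetical names, while ReentrantLock, ReentrantReadWriteLock and tryLock() are standard JDK APIs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockModesDemo {
    // Reports whether a second thread can acquire the given lock right now.
    static boolean otherThreadCanAcquire(Lock lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                acquired.set(true);
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        Lock exclusive = new ReentrantLock();
        exclusive.lock();
        System.out.println(otherThreadCanAcquire(exclusive)); // false: exclusive mode
        exclusive.unlock();

        Lock read = new ReentrantReadWriteLock().readLock();
        read.lock();
        System.out.println(otherThreadCanAcquire(read));      // true: shared mode
        read.unlock();
    }
}
```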

2.2.1 Exclusive Mode

2.2.1.1 lock

  • acquire() method

The acquire() method acquires the resource in exclusive mode. It returns immediately if tryAcquire() succeeds; otherwise the thread joins the wait queue and keeps retrying, parking between attempts, until it obtains the lock.

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  • addWaiter() method

The addWaiter() method is invoked when tryAcquire() fails. It wraps the current thread in a node and appends that node to the tail of the CLH queue using CAS.

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
  • acquireQueued() method

A thread that has entered the CLH queue keeps trying to acquire the resource in a loop. Once it succeeds, its node leaves the queue by becoming the new head.

    final boolean acquireQueued(final Node node, int arg) {
        // Tracks whether the acquire failed (true until the lock is obtained)
        boolean failed = true;
        try {
            // Records whether the thread was interrupted while waiting
            boolean interrupted = false;
            // Loop until the lock is acquired; an exception is the only other way out
            for (;;) {
                // Get the predecessor of the current node
                final Node p = node.predecessor();
                // If the predecessor is head, the current thread attempts to acquire the lock
                if (p == head && tryAcquire(arg)) {
                    // Acquired: make the current node the new head (a dummy node)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Either p is head but the acquire failed, or p is not head.
                // Decide whether to park (condition: the predecessor's waitStatus
                // is SIGNAL, -1) so the loop does not spin and waste CPU.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // calls LockSupport.park()
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
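
The loop above relies on LockSupport.park() to block and on a later unpark() to resume. The following sketch shows that pairing in isolation; ParkDemo and its flag-based condition are assumptions for illustration and stand in for the "is my predecessor head and can I acquire" check in AQS.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Returns true once the waiter has been released and has seen the flag.
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean proceeded = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously or because of an interrupt,
            // so the condition is always rechecked in a loop.
            while (!released.get()) {
                LockSupport.park();
            }
            proceeded.set(true);
        });
        waiter.start();

        released.set(true);          // change the condition first
        LockSupport.unpark(waiter);  // then wake the waiter (safe even if it has not parked yet)

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return proceeded.get();
    }
}
```

An unpark() that arrives before the matching park() is not lost: the permit is remembered and the next park() returns immediately, which is why the order of the two calls across threads does not matter here.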
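  • setHead() && shouldParkAfterFailedAcquire()

setHead() makes the current node the new head, turning it into a dummy node that no longer holds a thread. shouldParkAfterFailedAcquire() decides, from the state of the predecessor, whether the current thread should park.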

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    // Decide, based on the predecessor's state, whether the current thread should park
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the waitStatus of the predecessor
        int ws = pred.waitStatus;
        // The predecessor is already set to SIGNAL and will wake this node
        // when it releases, so it is safe to park
        if (ws == Node.SIGNAL)
            return true;
        // The predecessor has been cancelled
        if (ws > 0) {
            do {
                // Walk backward, unlinking cancelled nodes from the queue
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Set the predecessor's waitStatus to SIGNAL; the caller retries
            // the acquire once more before parking
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

That is how an exclusive lock acquires the resource. The next section covers how it releases the resource.

2.2.1.2 unlock

The unlock operation essentially resets the synchronization state (to 0 once the lock is fully released) and wakes up the next waiting thread.

  • release()
    public final boolean release(int arg) {
        // The subclass's tryRelease() returns true when the lock is fully
        // released, i.e. no longer held by any thread
        if (tryRelease(arg)) {
            // Get the head node
            Node h = head;
            // If the head is not null and its waitStatus is not 0 (the initial
            // state), a successor may be parked, so wake it up
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • unparkSuccessor()

If the node's successor is not null and not cancelled, unparkSuccessor() wakes up that successor. Otherwise it searches backward from the tail for a suitable node and wakes it up if one is found.

    private void unparkSuccessor(Node node) {
        // Get the waitStatus of the given node (the head)
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // Get the next node of the current node
        Node s = node.next;
        // If the next node is null or cancelled, find the non-cancelled node
        // closest to the front of the queue
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backward from the tail, remembering the last node seen with waitStatus <= 0
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // If a suitable successor was found, unpark its thread
        if (s != null)
            LockSupport.unpark(s.thread);
    }
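
The effect of release() and unparkSuccessor() can be observed through ReentrantLock, which delegates to AQS. In this sketch the class ReleaseDemo and its method name are hypothetical; hasQueuedThread(Thread) is a standard ReentrantLock method that reports whether a thread is waiting in the queue.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class ReleaseDemo {
    // Returns true if the waiter stayed queued while the lock was held
    // and acquired the lock after it was released.
    static boolean waiterRunsAfterUnlock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);

        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock(); // fails tryAcquire, is enqueued and parks
            try {
                acquired.set(true);
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Wait until the waiter's node is in the synchronization queue.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        boolean acquiredWhileHeld = acquired.get();

        lock.unlock(); // release() -> unparkSuccessor(head) wakes the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !acquiredWhileHeld && acquired.get();
    }
}
```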

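Why the search runs backward from the tail:

  1. When a node is cancelled, its next pointer is disconnected first, while its prev pointer stays intact.
  2. In addWaiter(), node.prev = pred and compareAndSetTail(pred, node) are executed before pred.next = node, and the three steps together are not atomic. If unparkSuccessor() runs after the CAS but before pred.next = node, the new node cannot be found by walking forward, only by walking backward through prev.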
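
The backward search can be tried on a toy linked list in which a forward link is missing. The sketch is a simplified stand-in, not the AQS code: BackwardScanSketch, its Node type and findSuccessor are hypothetical names, and the loop body follows the one shown in unparkSuccessor().

```java
class BackwardScanSketch {
    static final class Node {
        final String name;
        int waitStatus;   // > 0 means cancelled
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Walks from the tail toward `from` through prev links and returns the
    // non-cancelled node closest to `from`, or null if there is none.
    static Node findSuccessor(Node from, Node tail) {
        Node s = null;
        for (Node t = tail; t != null && t != from; t = t.prev) {
            if (t.waitStatus <= 0) {
                s = t; // keep overwriting: the last match is the one nearest to `from`
            }
        }
        return s;
    }

    public static void main(String[] args) {
        Node head = new Node("head");
        Node a = new Node("a");
        Node b = new Node("b");
        a.prev = head;
        b.prev = a;
        // head.next is still null: the forward link has not been written yet.
        a.waitStatus = 1; // a has been cancelled
        System.out.println(findSuccessor(head, b).name); // prints "b"
    }
}
```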

2.2.2 Shared Mode

Shared mode allows multiple threads to hold the synchronization state at the same time. acquireShared() does not respond to interrupts while it waits.

  • acquireShared() method

acquireShared() obtains the lock in shared mode. If the acquire succeeds, the method returns immediately. Otherwise the thread joins the wait queue and waits until the resource is obtained.

  1. First try to get the resource through the custom tryAcquireShared() method.
  2. If that fails (the return value is negative), execute the doAcquireShared() method.
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
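
What a custom tryAcquireShared() might look like is sketched below as a small counting semaphore. SimpleSemaphore is a hypothetical class for illustration (the JDK's own Semaphore is more complete); the convention that state holds the number of available permits is an assumption of this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a counting semaphore on top of AQS shared mode.
class SimpleSemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // assumed convention: state = available permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: not enough permits, report failure without changing state.
                // Otherwise try to claim the permits with CAS and retry on contention.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // always let AQS wake waiting threads
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    SimpleSemaphore(int permits) { sync = new Sync(permits); }

    void acquire()         { sync.acquireShared(1); }
    boolean tryAcquire()   { return sync.tryAcquireShared(1) >= 0; }
    void release()         { sync.releaseShared(1); }
    int availablePermits() { return sync.permits(); }
}
```

The return value of tryAcquireShared() carries more than success or failure: a non-negative result also tells AQS how much of the resource remains.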
  • doAcquireShared()

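As in exclusive mode, the current thread is placed at the end of the queue and keeps retrying until the lock is acquired. The difference is that in shared mode, after acquiring the resource, the current thread also decides whether to wake up its successors, depending on whether any resources remain.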

    private void doAcquireShared(int arg) {
        // Join the queue in shared mode
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // Get the predecessor node
                final Node p = node.predecessor();
                // If the predecessor is head, the current thread tries to get the resource
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // Succeeded
                    if (r >= 0) {
                        // Make this node the head; if resources remain, wake up other threads too
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt(); // Re-assert an interrupt that arrived while waiting
                        failed = false;
                        return;
                    }
                }
                // Otherwise park and wait to be woken up or interrupted
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • releaseShared()

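releaseShared() releases the specified amount of resources in shared mode and wakes up waiting nodes.

In exclusive mode, waiting threads are woken up only after the lock has been fully released (tryRelease() returns true, typically when state reaches 0). In shared mode, waiting threads are woken up as soon as tryReleaseShared() returns true.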

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // Wake up the successor node
            doReleaseShared();
            return true;
        }
        return false;
    }
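  • doReleaseShared()

doReleaseShared() wakes up the successor of the head node. It is called from releaseShared() when resources are released, and from setHeadAndPropagate() after a shared acquire succeeds, so that wake-ups propagate down the queue while resources remain.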

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
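
Propagation in shared mode means that a single release can let several waiting threads through. The sketch below illustrates this with a one-shot latch; OneShotLatch and releaseAll are hypothetical names, and the convention "state 0 = closed, 1 = open" is an assumption of this example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a latch that, once opened, lets every waiter pass.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive result: success, and later shared acquires may succeed too.
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the latch
            return true;
        }
    }

    private final Sync sync = new Sync();

    void await() { sync.acquireShared(1); }
    void open()  { sync.releaseShared(1); }

    // Starts `waiters` threads that block on the latch, opens it once,
    // and returns how many threads got through.
    static int releaseAll(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        latch.open(); // a single releaseShared() call
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

Each woken thread becomes the new head and, because tryAcquireShared() reported a positive result, wakes the next shared waiter in turn.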
