1 introduction

AbstractQueuedSynchronizer abstract queue type synchronizer (abstract class). FIFO queue provides a FIFO queue, which can be regarded as a core component used to achieve synchronization lock and other synchronization functions, such as ReentrantLock and CountDownLatch.

Among the more important concepts are:

  • State Shared resource
  • FIFO
  • CAS
  • park  unpark

2 AQS thread queue

Node is the most core part of the whole queue, which contains CANCELLED, SIGNAL, CONDITION and PROPAGATE, Pointers to nodes before and after, and thread is the value stored by nodes.

Both fair and unfair locks in ReentrantLock inherit from this abstract class.

ReentrantLock contains fair and unfair locks. Each lock also contains the abstract lock Sync, which performs AQS.

If thread 1 currently holds the lock, threads 2,3, and 4 call the park operation to be suspended and enqueued.

The unpark method is called when Thread1, which owns the lock, is done, head pointing to the next node. Since ReentrantLock is a ReentrantLock, the number of reentrants is recorded by aqs. state, with a value of + 1 for each reentrant and -1 for each exit.

3 ReentrantLock instance construction

The ReentrantLock default constructor creates an unfair lock, and you need to pass true in the constructor if you want a fair lock.

4 Multi-thread Lock grab icon (Unfair lock)

The thread grabs the lock using CAS.

At the beginning, there is only one Head node in the waiting queue, in which the Thread stored is null, which is used to hold the position. Threads B and C are connected with Head in the queue in the way of bidirectional linked list, and call park to suspend.

See below for a step-by-step process of the figure above.

4.1 Thread A locks

When thread A takes the lock, change the state to 1 by CAS operation and set the exclusiveOwnerThread attribute of AQS to A, indicating the owner of the current lock.

4.2B thread locks

At this time, thread B will grab the lock and find that the state is already 1, so the CAS operation fails and it enters the queue. WaitStatus is set to 0

The first time a new Node is created, the head is set to NULL, and waitStatus is set to SIGNAL

4.3 Lock the C thread

If thread C sets waitStatus of thread B to SIGNAL, it will set waitStatus of thread B to SIGNAL

The reason why the waitStatus value of the newly inserted node is set to 0 is that it is used to mark the last node in the queue. In this case, unpark operation on subsequent nodes is unnecessary. When there is no node in the wait queue, a null node will be used as a placeholder.

4.4 A Thread is unlocked

After thread A performs the unlocking operation, thread B can grab the lock. The process is as follows:

Unlock operation of thread A:

  • The state is set to 0
  • ExclusiveOwnerThread Set to null: The lock is released
  • The head node moves back one bit to unpark thread B
  • The previous head node will be freed

4.5B thread is unlocked

4.6 Unlocking the C Thread

5 Source code analysis (taking unfair lock as an example)

5.1 Thread A preempts the lock first

Construction method:

public ReentrantLock(a) {
    sync = new NonfairSync();
}
Copy the code

The lock () :

public void lock(a) {
    sync.lock();
}
Copy the code

Suppose we now have two threads, A and B, as follows:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    // Thread A comes in
    Thread B comes in
    final void lock(a) {
        State is set to 1, indicating that the lock has been preempted
        // Thread B, CAS is false because thread A has AQS. State set to 1
        if (compareAndSetState(0.1))
            / / thread A, will AbstractOwnableSynchronizer exclusiveOwnerThread Settings for the current thread
            setExclusiveOwnerThread(Thread.currentThread());
        else
            Thread B performs this part, attempting to acquire a lock
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        returnnonfairTryAcquire(acquires); }}Copy the code

Acquire (1) :

Thread B calls: arg = 1
public final void acquire(int arg) {
    / / thread B
    TryAcquire (arg) = false; // Try to acquire an unfair lock because it was grabbed by thread A
    // addWaiter method to build a Node hosting thread B, add it to the end of the queue, and return the Node
    / / acquireQueued method
    if(! tryAcquire(arg) && acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {// Sets the interrupt flag for the current threadselfInterrupt(); }}Copy the code

TryAcquire (ARG) finally calls: tryAcquire(ARG)

* 1 If the lock is grabbed, returns true * 1.1 if the current thread grabbed the lock for the first time: * aqs.status changes from 0 to 1 * aqs.exclusiveownerthread = thread.currentthread () * returns true * 1.2 if the currentThread snatches the lock again (reentre) * aqs.status ++ * Returns true * 2 failed to capture the lock and false * */
Thread B acquires = 1
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    Thread A has set state to 1, so c=1
    int c = getState();
    if (c == 0) {   // Thread B will not execute if it does not meet the requirement
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true; }}else if (current == getExclusiveOwnerThread()) {    // Thread B will not execute if it does not meet the requirement
        /** * get the current exclusive thread, if it is the current thread, then reentrant * when tryLock() is executed: * If you enter for the second time, nexTC = 0 + 1 = 1 * If you enter for the third time, nexTC = 1 + 1 = 2 * If you enter for the fourth time, nexTC = 2 + 1 = 3 */
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    Thread B, return false
    return false;
}
Copy the code

Back to the addWaiter method in the Acquire code:

Thread B: mode = node. EXCLUSIVE = null
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail; 
    // Thread B: Tail = null does not enter the if method since thread A is not in the list
    if(pred ! =null) {
        node.prev = pred;   // (old tail node) <-- new node.prev
        if (compareAndSetTail(pred, node)) {
            pred.next = node;   // (old tail node) next --> (new node)
            returnnode; }}// thread B: node = the node hosting thread B
    enq(node);
    return node;
}
Copy the code

The addWaiter method uses the Node constructor:

Thread B: thread = thread.currentThread () mode = null
Node(Thread thread, Node mode) {     
    this.nextWaiter = mode;
    this.thread = thread;
}
Copy the code

NextWaiter is null. The Node constructor does not assign waitStatus, so it defaults to 0

Back to the addWaiter method, which also uses the ENQ method:

If the queue is empty, initialize an empty content node as the first node, and then append the input node to the end of the queue. If the queue is not empty, append the input node to the end of the queue@param node
 * @return* /
// thread B: node = the node hosting thread B
private Node enq(final Node node) {
    for (;;) {
        // Thread B: the first loop tail = null
        // Thread B: tail = head = empty content node
        Node t = tail;
        /** * the queue has been initialized by 1 node, so t = null. = null */
        if (t == null) {    // Thread B: the first loop satisfies t == null and enters the module
            if (compareAndSetHead(new Node()))  // Initialize an empty content node as the aqs.head node
                tail = head;
        } else {    // Thread B: the second loop satisfies t! = null into the module, i.e. empty content node <-> new node
            node.prev = t;  // The old tail node <-- the entry node.prev
            if (compareAndSetTail(t, node)) {   // Update the aqs. tailOffset content to the input node
                t.next = node;  // Old tail node --> entry node
                returnt; }}}}Copy the code

AcquireQueued: addWaiter () acquireQueued: acquireQueued () acquireQueued () acquireQueued ();

// thread B: node = the node hosting B, arg = 1
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        Thread B: loops until the lock is acquired
        for (;;) {
            Thread B: p= empty content node (head)
            final Node p = node.predecessor();
            Thread B: p == head = true; But since thread A grabbed the lock first, tryAcquire(ARg)=false, so the following if is not entered
            if (p == head && tryAcquire(arg)) { // tryAcquire(ARG) : lock grab
                setHead(node);  // Update the header to an input node
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            / / thread B: shouldParkAfterFailedAcquire method, set up the waitStatus = p Node Node. The SIGNAL (the original is 0), if the spin two didn't get the lock will return true hung thread
            // Thread B: the parkAndCheckInterrupt method suspends thread B
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

5.2 Thread A Releases the lock

If A calls unlock:

public void unlock(a) {
    sync.release(1);
}
Copy the code

The lock can be released as follows:

Thread A: arg = 1
public final boolean release(int arg) {
    if (tryRelease(arg)) {  / / thread A: AQS. State = 0, AQS. ExclusiveOwnerThread = null
        Node h = head;
        Thread A: meet the condition h.waitStatus == SIGNAL(value -1)
        if(h ! =null&& h.waitStatus ! =0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Copy the code

The tryRelease method is called:

Thread A: release = 1
protected final boolean tryRelease(int releases) {
    Thread A: c = 1-1 = 0
    int c = getState() - releases;
    // If the current thread is not preempted, an exception is thrown
    if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
    boolean free = false;
    Thread A: c == 0 is true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
Copy the code

Corresponding:

TryRelease followed by going back to the release method and running the unparkprecursor method:

private void unparkSuccessor(Node node) {
    // thread A: node.waitStatus == SIGNAL(-1)
    int ws = node.waitStatus;
    if (ws < 0) {
        Thread A: Set waitStatus == 0
        compareAndSetWaitStatus(node, ws, 0);
    }
    Node s = node.next;
    // thread A: s = thread B, s.waitStatus == SIGNAL(-1)
    if (s == null || s.waitStatus > 0) {    // Node is tail or the tail node of node waitStatus > 0
        s = null;   // Disconnect Node from Node.next
        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                s = t;
    }
    // Thread A: s == thread B, activate thread B
    if(s ! =null)
        LockSupport.unpark(s.thread);
}
Copy the code

In some column calls to thread B’s lock method after the release, there is the parkAndCheckInterrupt() method, that is, thread B is suspended, and thread A is released and returns to the suspended position to continue execution, so thread B’s acquireQueued() method can grab the lock

Corresponding: