We introduced the locking mechanisms of concurrent programming earlier: synchronized and Lock, and an important implementation class for the Lock interface is ReentrantLock. And on the Lock of a concurrent AQS (AbstractQueuedSynchronizer) introduces the AQS, when it comes to already, have to talk about abstract class AbstractQueuedSynchronizer (AQS). AQS defines a synchronizer framework for multi-threaded access to shared resources, and the implementation of ReentrantLock relies on this synchronizer. This paper has introduced AQS, combined with its specific implementation class ReentrantLock analysis of the implementation principle.

Already the class diagram

Already the class diagram

Already realized the Lock interface, there are three inner class inside, Sync, NonfairSync, FairSync, Sync is an abstract type, it inherits AbstractQueuedSynchronizer, The AbstractQueuedSynchronizer is a template class, it implements many lock and related functions, and hook method was provided for the user, such as tryAcquire tryRelease, etc. Sync realized AbstractQueuedSynchronizer tryRelease method. The NonfairSync and FairSync classes inherit from Sync and implement the lock method. Fair preemption and unfair preemption have different implementations for tryAcquire. This article focuses on the default implementation of ReentrantLock, namely the acquisition and release implementation of unfair locks.

The lock method of an unfair lock

The lock method

  • When initializing a ReentrantLock, if we pass no arguments, we default to using an unfair lock, NonfairSync.
public ReentrantLock(a) {   
  sync = new NonfairSync();  
} 
Copy the code
  • When we call the lock method of ReentrantLock, we are actually calling the lock method of NonfairSync, which first attempts to preempt the lock with the CAS operation. If successful, the current thread is set on the lock, indicating a successful preemption. If that fails, the Acquire template method is called and waits for preemption. The code is as follows:
static final class NonfairSync extends Sync {
    final void lock(a) {
        if (compareAndSetState(0.1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        returnnonfairTryAcquire(acquires); }}Copy the code
  • The above callacquire(1)It actually usesAbstractQueuedSynchronizerAcquire method, which is a set of template for lock preemption. The general principle is to try to obtain the lock first. If the lock is not successfully obtained, a node of the current thread will be added to the CLH queue, indicating waiting for preemption. Then the CLH queue enters preemption mode. When entering, the lock is acquired once. If not, the current thread is suspended by calling locksupport. park. So when does the current thread wake up? When the thread that holds the lock calls UNLOCK, it wakes up the thread on the next node on the head of the CLH queue, using the locksupport.unpark method. Acquire code is relatively simple, as follows:
public final void acquire(int arg) {
	if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
  • Inside acquire method, tryAcquire hook method is used to try to acquire the lock again. This method actually uses nonfairTryAcquire in the class NonfairSync. The implementation principle is to compare whether the current lock state is 0. Is trying to atomic grab the lock (set the status to 1, then the current thread is set to monopolize the thread), if the current state of the lock is not zero, is to compare the current thread and lock the thread is a thread take up, if it is, to increase the value of the state variable, see from here is reentrant lock are reentrant is the same thread lock can use it to take up repeatedly. If neither case passes, false is returned. The code is as follows:
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true; }}else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
Copy the code
  • Once tryAcquire returns false, it enters the acquireQueued process, which is CLH queue-based preemption mode

First, add a wait at the end of the CLH lock queue node, the node to save the current thread, by calling the addWaiter implementation, initialization here need to consider the situation, in the first waiting for node to enter, to initialize a head node and then add the current node to the tail, the follow-up directly join node in the tail.

private Node addWaiter(Node mode) {
	// Initializes a node that holds the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // When the CLH queue is not empty, insert a node directly at the end of the queue
    Node pred = tail;
    if(pred ! =null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            returnnode; }}// When the CLH queue is empty, call enq to initialize the queue
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Initialize the node with both ends pointing to an empty node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {// Consider concurrent initialization
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                returnt; }}}}Copy the code
  • After adding the node to the CLH queue, enter the acquireQueued method.

First, the outer layer is an infinite for loop. If the current node is the next node of the head node, and the lock is obtained through tryAcquire, the head node has released the lock, and the current thread is awakened by the thread of the head node, then you can set the current node as the head node, and set the failed flag to false. And then back. As for the previous node, its next variable is set to NULL and will be cleaned up during the next GC.

If the loop didn’t get the lock, just entered the stage of thread hanging, namely shouldParkAfterFailedAcquire this method.

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}Copy the code
  • If you try to get the lock fails, will enter shouldParkAfterFailedAcquire method, will determine whether the current thread hangs, if a node is already a SIGNAL before state, the current thread need to hang up. If the previous node is cancelled, the cancelled node needs to be removed from the queue. If the previous node is in a different state, try to set it to SIGNAL state and return no need to suspend, thus performing the second preemption. Enter the suspend phase after completing the above.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //
        return true;
    if (ws > 0) {
        //
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
Copy the code
  • When the suspension phase is entered, the parkAndCheckInterrupt method is entered, and lockSupport.park (this) is called to suspend the current thread.
private final boolean parkAndCheckInterrupt(a) {
    LockSupport.park(this);
    return Thread.interrupted();
}

Copy the code

Unlock method for an unfair lock

  • Call unlock method, it is called directly AbstractQueuedSynchronizer release operation.
public void unlock(a) {
	sync.release(1);
}
Copy the code
  • After entering the release method, the tryRelease operation is tried internally, mainly to remove the exclusive thread of the lock, and then the state is reduced by one. The reason why the state is reduced by one is that the reentrant lock may itself occupy the lock many times. Only when the state becomes 0, the lock is released completely.
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if(h ! =null&& h.waitStatus ! =0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Copy the code
  • Once tryRelease is successful, the state of the head node of the CHL queue is set to 0, and the next non-canceled node thread is awakened.
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
          free = true;
          setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
 }
Copy the code
  • Once the thread of the next node is awakened, the awakened thread enters the acquireQueued code flow to acquire the lock.
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                s = t;
    }
    if(s ! =null) 
       LockSupport.unpark(s.thread);
}
Copy the code

tryLock

The tryLock(long timeout, TimeUnit Unit) function of ReetrantLock is provided. The semantics are true if the lock was acquired within the specified time, false if it was not. This mechanism prevents threads from waiting indefinitely for locks to be released.

public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
   return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
Copy the code

Look at the tryAcquireNanos method in the inner class:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
       throws InterruptedException {
   if (Thread.interrupted())
       throw new InterruptedException();
   return tryAcquire(arg) ||
       doAcquireNanos(arg, nanosTimeout);
}
Copy the code

If the thread is interrupted, InterruptedException is thrown. If the lock is not interrupted, try to acquire the lock and return to doAcquireNanos if the lock fails. TryAcquire we’ve already seen tryAcquire, but let’s focus on what doAcquireNanos does.

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // Start time
    long lastTime = System.nanoTime();
    // The thread is queued
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // Spin again!
        for (;;) {
            // Get the precursor node
            final Node p = node.predecessor();
            // If the precursor is a head node and the lock succeeds, the current node becomes a head node
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // If you have timed out, return false
            if (nanosTimeout <= 0)
                return false;
            // The timeout period has not expired and needs to be suspended
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // Block the current thread until the timeout expires
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            / / update the nanosTimeout
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                // Interrupt accordingly
                throw newInterruptedException(); }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

The doAcquireNanos process is described as follows: the program enters the queue, spins, attempts to acquire the lock, returns on success, and suspends itself at a safe point in the queue until the timeout expires. Why do we need a loop here? Because the precursor state of the current thread node may not be SIGNAL, the thread will not be suspended in the current loop, and then the timeout will be updated to start a new attempt.

conclusion

ReentrantLock is a ReentrantLock that uses exclusive mode AQS internally. The difference between a fair lock and an unfair lock is that a fair lock does not first check the state when acquiring the lock, but directly executes aqcuire(1). Fair locks include hasqueueAoxing, a method used to determine whether a CHL queue has a node. With a fair lock, if a CHL queue has a node, a new thread entering the race must queue on the CHL, whereas with a non-fair lock, it ignores the node in the CHL queue and preempts it directly. This can result in the node on the CHL queue never acquiring the lock, which is why the unfair lock is unfair and will not be discussed here.

Subscribe to the latest articles, welcome to follow my official account

reference

  1. Java ReentrantLock principle analysis
  2. Implementation principles of ReentrantLock