ReentrantLock

Introduction

Lock was introduced in Java SE 5. Compared with synchronized, acquiring and releasing a Lock are explicit operations, and acquisition can be interrupted. synchronized implements method and code-block synchronization by entering and exiting a monitor object (monitorenter and monitorexit), but what is Lock built on? That's right: AQS (AbstractQueuedSynchronizer). First, take a look at the class diagram of ReentrantLock in the java.util.concurrent.locks package.

The class structure of ReentrantLock is relatively simple: it implements the Lock interface and is composed with an inner class Sync, which has two subclasses, NonfairSync and FairSync. The Lock interface methods are the API exposed to callers, while the concrete implementation relies on Sync, whose parent class AbstractQueuedSynchronizer provides the underlying machinery. Let's start by looking at the class structure diagram of Sync.

ReentrantLock's implementation of the Lock interface really just delegates to the methods of the inner class Sync, for example:

    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
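Seen from the caller's side, this delegation can be exercised with a small sketch. The class and method names below (TryLockDemo, tryWhileHeld) are hypothetical and only for illustration. A helper thread holds the lock using the usual lock/try/finally idiom, and the caller's timed tryLock, which forwards to sync.tryAcquireNanos, gives up once the timeout elapses:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a timed tryLock gives up while another thread holds the lock.
class TryLockDemo {
    // Returns what tryLock reported while a helper thread was holding the lock.
    static boolean tryWhileHeld(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();   // tell the caller the lock is taken
                done.await();       // keep holding until the caller has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();      // always release in finally
            }
        });
        holder.start();
        try {
            held.await();
            boolean acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) lock.unlock();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            done.countDown();       // let the holder release the lock and exit
        }
    }

    public static void main(String[] args) {
        System.out.println(tryWhileHeld(50));
    }
}
```

Because the holder only releases after the caller has finished trying, the timed attempt cannot succeed in this setup.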

FairSync and NonfairSync

Before we get to AQS, let's briefly review the difference between the fair and nonfair modes of ReentrantLock. ReentrantLock provides two constructors:

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

The code for NonfairSync and FairSync is as follows:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // Barge: try to take the lock with a CAS before queuing
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire. Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

As you can see, the two differ by only one step in both lock and tryAcquire. The nonfair lock tries a CAS on the state as soon as lock() is called and, in nonfairTryAcquire, takes a free lock without looking at the queue. The fair lock first checks hasQueuedPredecessors() and acquires only when no thread is queued ahead of it, so threads are served in FIFO order. When that first attempt fails, a nonfair lock queues up exactly like a fair one. Neither subclass defines its own release logic, which means unlocking is identical for both.
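The FIFO behaviour of the queue can be observed with a small sketch. FairOrderDemo and acquisitionOrder are hypothetical names; the sketch assumes the main thread holds a fair lock while the workers enqueue one after another, using the public hasQueuedThread method to wait until each worker is in the queue:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: queued threads acquire a fair lock in arrival order.
class FairOrderDemo {
    static List<Integer> acquisitionOrder(int n) {
        ReentrantLock lock = new ReentrantLock(true);  // true selects FairSync
        List<Integer> order = new ArrayList<>();       // guarded by lock
        List<Thread> threads = new ArrayList<>();
        lock.lock();                                   // hold the lock so every worker must queue
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                threads.add(t);
                t.start();
                // Wait until this worker is enqueued before starting the next one.
                while (!lock.hasQueuedThread(t)) {
                    Thread.yield();
                }
            }
        } finally {
            lock.unlock();                             // hand the lock to the first queued worker
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantLock().isFair());  // default constructor: nonfair
        System.out.println(acquisitionOrder(3));
    }
}
```

Note that this only shows the queue order. The fairness policy itself matters when a new thread arrives while others are already queued: a nonfair lock lets the newcomer barge in, a fair lock sends it to the back.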

Sync

Let's take a look at how Sync implements the Lock interface, picking a few core methods:

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    // lock() is implemented by the two subclasses shown above
    abstract void lock();

    // Called by the AQS template method release
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // tryAcquire of the nonfair lock calls this method directly
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    // ReentrantLock's unlock calls this method directly
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}

As you can see, Sync mostly just calls the parent AQS methods as needed. Writing a lock on top of AQS comes down to implementing a few protected AQS hook methods, such as tryAcquire and tryRelease above.
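The state counting in nonfairTryAcquire and tryRelease is visible through the public API. The following sketch (ReentrancyDemo and its methods are hypothetical names) records getHoldCount() after each step and shows that unlocking without owning the lock is rejected:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of reentrant state counting.
class ReentrancyDemo {
    // Returns the hold counts observed after each lock/unlock step.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[4];
        lock.lock();                      // state 0 -> 1, owner set
        seen[0] = lock.getHoldCount();
        lock.lock();                      // same owner: state 1 -> 2
        seen[1] = lock.getHoldCount();
        lock.unlock();                    // state 2 -> 1, still owned
        seen[2] = lock.getHoldCount();
        lock.unlock();                    // state 1 -> 0, owner cleared
        seen[3] = lock.getHoldCount();
        return seen;
    }

    // tryRelease throws when the calling thread is not the owner.
    static boolean unlockWithoutOwning() {
        try {
            new ReentrantLock().unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts()));
        System.out.println(unlockWithoutOwning());
    }
}
```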

AQS

Here we can take a look at what AQS provides for a lock implementation. What would we need if we did not rely on AQS?

1. A state variable that records the state of the lock, with at least two values, 0 and 1.
2. Thread-safe operations on that state, i.e. CAS.
3. A record of the thread that currently holds the lock.
4. A queue that maintains all blocked threads. This queue must be thread-safe and therefore also needs CAS support.
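AQS supplies all four of these, so a lock built on it needs very little code. As a rough sketch, and explicitly not ReentrantLock's actual implementation, here is a hypothetical non-reentrant SimpleMutex that only fills in tryAcquire and tryRelease and leaves queuing and blocking to AQS:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex built on AQS (illustration only).
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {                  // CAS on the state
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;                                    // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);                                     // volatile write publishes the release
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }
    void unlock() { sync.release(1); }

    // Increments a shared counter from several threads under the mutex.
    static int countWithMutex(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};                                 // guarded by mutex
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 1000));
    }
}
```

Unlike ReentrantLock, this sketch would deadlock if the owner called lock() a second time, because tryAcquire never checks the owner thread.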

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    static final class Node { /* ... omitted for now ... */ }

    // Head of the queue
    private transient volatile Node head;
    // Tail of the queue
    private transient volatile Node tail;
    // Not limited to 0 and 1: values greater than 1 support reentrant locking
    private volatile int state;

    // CAS on state
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // CAS on the head node
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    // CAS on the tail node
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
}

// Blocks (park) and wakes up (unpark) threads
public class LockSupport {
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
}
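The park/unpark pair can be tried on its own. In this sketch (ParkDemo and parkUntilReleased are hypothetical names) a thread parks until a flag is set. Since park may return without a matching unpark, the waiter re-checks its condition in a loop:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of LockSupport.park / unpark.
class ParkDemo {
    static boolean parkUntilReleased() {
        AtomicBoolean permit = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() can return spuriously, so always re-check the condition
            while (!permit.get()) {
                LockSupport.park(permit);   // permit doubles as the blocker object for diagnostics
            }
            resumed.set(true);
        });
        waiter.start();
        permit.set(true);                   // publish the condition first
        LockSupport.unpark(waiter);         // then wake the waiter; harmless if it has not parked yet
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(parkUntilReleased());
    }
}
```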

The details of the AQS implementation are not covered here; I will write a separate article on them later. Next, let's look at the Condition used with Lock.

Condition

At the beginning, we said that synchronized is implemented by entering and exiting the corresponding monitor object, and that Object's wait(), notify(), and related methods provide the wait/notify pattern on top of it. The Condition interface provides analogous methods, which implement the wait/notify pattern in combination with a Lock. A Condition must be obtained through the Lock's newCondition() method.

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}
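A minimal sketch of the wait/notify pattern with these methods follows. ConditionHandoff and its members are hypothetical names. As with Object.wait(), await() is called in a loop while holding the lock, and signalAll() is called by the thread that changes the condition:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: one thread waits for a message published by another.
class ConditionHandoff {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private String message;                 // guarded by lock

    void publish(String m) {
        lock.lock();
        try {
            message = m;
            ready.signalAll();              // wake threads waiting in awaitMessage
        } finally {
            lock.unlock();
        }
    }

    String awaitMessage() throws InterruptedException {
        lock.lock();
        try {
            while (message == null) {       // re-check after every wakeup
                ready.await();              // releases the lock while waiting
            }
            return message;
        } finally {
            lock.unlock();
        }
    }

    // Publishes from a second thread and waits for the value on the calling thread.
    static String roundTrip(String m) {
        ConditionHandoff handoff = new ConditionHandoff();
        new Thread(() -> handoff.publish(m)).start();
        try {
            return handoff.awaitMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```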

ConditionObject implements the Condition interface and maintains a wait queue of its own. Its nodes reuse the Node definition from AQS:

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
     * Enqueues the current thread on the condition's wait queue, releases
     * the lock, and waits. In effect, the thread holding the lock (the head
     * of the AQS queue) moves to the Condition's wait queue.
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // The current thread joins the condition queue
        Node node = addConditionWaiter();
        // Release the lock completely, remembering the hold count
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Loop until the node has been transferred to the AQS sync queue
        while (!isOnSyncQueue(node)) {
            // Block the current thread
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Reacquire the lock with the saved hold count
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    // Transfer the first waiter of the condition queue to the sync queue
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    // Defined in AbstractQueuedSynchronizer; shown here for convenience
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append the node to the AQS sync queue; enq returns its predecessor
        Node p = enq(node);
        int ws = p.waitStatus;
        // Unpark right away only if the predecessor is cancelled or cannot be set to SIGNAL
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    // ... other methods omitted for now ...
}
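Two details of await() can be checked from the outside: fullyRelease gives up every hold on the lock, and the saved state is restored once the thread wakes up. In the sketch below (AwaitReleaseDemo and holdCountAfterAwait are hypothetical names) the waiter locks twice before awaiting. The main thread can still take the lock, uses hasWaiters to see the waiter on the condition queue, and then signals it:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: await() fully releases a reentrantly held lock and restores it afterwards.
class AwaitReleaseDemo {
    static int holdCountAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false};                   // guarded by lock
        AtomicInteger afterAwait = new AtomicInteger(-1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                                 // hold count is now 2
            try {
                while (!signalled[0]) {
                    cond.await();                        // releases both holds while waiting
                }
                afterAwait.set(lock.getHoldCount());     // hold count restored from the saved state
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();                                 // succeeds even while the waiter is awaiting
            try {
                if (lock.hasWaiters(cond)) {             // waiter is on the condition queue
                    signalled[0] = true;
                    cond.signal();                       // transfers it to the AQS sync queue
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return afterAwait.get();
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfterAwait());
    }
}
```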

The other ConditionObject methods work along similar lines. ArrayBlockingQueue shows Condition in action:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;

    final ReentrantLock lock;
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await(); // queue is full: wait on the notFull condition
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal(); // put succeeded: signal the notEmpty condition
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); // queue is empty: wait on the notEmpty condition
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

Condition is a good way to implement blocking queues.
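To watch the two conditions cooperate from the caller's side, here is a short usage sketch of ArrayBlockingQueue itself (BlockingQueueDemo and transfer are hypothetical names). With a capacity of 2, the producer waits on notFull whenever two items are pending, and the consumer waits on notEmpty whenever the queue is empty:

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo: producer and consumer coordinated by ArrayBlockingQueue.
class BlockingQueueDemo {
    // Sends 1..n through a small queue and returns the sum seen by the consumer.
    static int transfer(int n) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2, true);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i);           // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take();        // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(100));
    }
}
```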