Preface

Before Java SE 5, the synchronized keyword was the way to control multithreaded access to shared resources. A synchronized method or code block acquires its lock implicitly on entry and releases it implicitly on exit. Java SE 5 added the Lock interface (and related implementation classes) in the java.util.concurrent.locks package. It provides synchronization similar to the synchronized keyword, but the lock must be acquired and released explicitly. In exchange, Lock offers features that synchronized does not: explicit control over acquisition and release, fair and unfair locks, interruptible lock acquisition, and lock acquisition with a timeout.
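As a small illustration of one of these extra features, the sketch below uses the timed tryLock variant: a second thread gives up after a timeout instead of blocking indefinitely, which a synchronized block cannot do. The class and method names (TimedLockDemo, secondThreadAcquires) are made up for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TimedLockDemo {
    // Returns whether a second thread managed to acquire the lock within
    // timeoutMillis while the calling thread keeps holding it.
    static boolean secondThreadAcquires(long timeoutMillis) {
        Lock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        Thread contender = new Thread(() -> {
            try {
                // Unlike synchronized, the attempt gives up after the timeout
                if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    try {
                        acquired[0] = true;
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the wait was interrupted
            }
        });
        lock.lock(); // the calling thread holds the lock the whole time
        try {
            contender.start();
            contender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(secondThreadAcquires(100)); // false: the attempt timed out
    }
}
```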

Example

Using Lock to synchronize code is simple: call lock() where the critical section begins and unlock() where it ends. The unlock() call usually goes in a finally block to ensure that the lock is always released. For example:

private static Lock lock = new ReentrantLock();

public void testLock() {
    new Thread(() -> {
        lock.lock(); // acquire the lock before entering the try block
        try {
            for (int j = 1; j <= 3; j++) {
                System.out.println("A = " + j);
            }
        } finally {
            lock.unlock(); // always release the lock
        }
    }).start();
}

Do not put the lock acquisition inside the try block. If acquiring the lock throws an exception (for example, in a custom lock implementation), the finally block still runs and calls unlock() on a lock that was never acquired.
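The difference between the two placements can be sketched as follows. FailingLock is a hypothetical lock whose lock() always throws; it stands in for a custom implementation that fails during acquisition. With lock() inside the try block, the unlock() in finally throws its own exception and hides the original one.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockPlacementDemo {
    // Hypothetical custom lock whose acquisition always fails
    static class FailingLock extends ReentrantLock {
        @Override
        public void lock() {
            throw new IllegalStateException("acquisition failed");
        }
    }

    // Anti-pattern: lock() inside the try block
    static String lockInsideTry(Lock lock) {
        try {
            try {
                lock.lock();
            } finally {
                lock.unlock(); // runs although the lock was never acquired
            }
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
        return "ok";
    }

    // Recommended: lock() before the try block
    static String lockOutsideTry(Lock lock) {
        try {
            lock.lock();
            try {
                // critical section would go here
            } finally {
                lock.unlock();
            }
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(lockInsideTry(new FailingLock()));  // IllegalMonitorStateException
        System.out.println(lockOutsideTry(new FailingLock())); // IllegalStateException
    }
}
```

In the first case the caller sees IllegalMonitorStateException from unlock() rather than the real cause of the failure.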

Analysis of the source code

Lock is an interface that defines a set of methods to be implemented by classes such as ReentrantLock:

public interface Lock {
    // Acquire the lock. The calling thread blocks until the lock is acquired, then the method returns
    void lock();

    // Acquire the lock like lock(), but respond to interruption while waiting
    void lockInterruptibly() throws InterruptedException;

    // Try to acquire the lock without blocking; returns true if it was acquired, false otherwise
    boolean tryLock();

    // Timed acquisition. The method returns in three cases:
    // 1. the current thread acquires the lock within the timeout (returns true)
    // 2. the current thread is interrupted within the timeout (throws InterruptedException)
    // 3. the timeout elapses (returns false)
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    // Release the lock
    void unlock();

    // Return a wait/notify component bound to this lock
    Condition newCondition();
}

Lock implementations control thread access through a synchronizer. The synchronizer itself does not implement any synchronization interface; it only defines a number of methods for acquiring and releasing the synchronization state, for use by custom synchronization components. It supports both exclusive and shared acquisition of the synchronization state.

Let's look at the class diagram of the synchronizer, rooted at AbstractOwnableSynchronizer:

The synchronizer AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer. AbstractOwnableSynchronizer has a single field with a setter and a getter: the thread that currently owns the synchronizer exclusively.

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

Now look at AbstractQueuedSynchronizer itself. It is the base framework for building locks and other synchronization components. It uses an int member variable to represent the synchronization state and a built-in FIFO queue to manage the threads waiting to acquire the resource.

The relationship between locks and synchronizers can be summarized as follows. A lock is user-facing: it defines the interface through which users interact with the lock and hides the implementation details. A synchronizer is for the implementor of the lock: it simplifies lock implementation by hiding low-level operations such as synchronization state management, thread queuing, waiting, and wake-up.
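To make this division of labor concrete, here is a minimal sketch of a non-reentrant mutex built on the synchronizer. SimpleMutex and its inner Sync class are illustrative names, not classes from the JDK; the outer class is the user-facing lock and the inner class supplies only the state transitions, while queuing and blocking are inherited from AbstractQueuedSynchronizer.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // Implementor side: only the state transitions are written here
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 = free, 1 = held; CAS makes the transition atomic
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // User side: a small API that delegates to the synchronizer
    public void lock()        { sync.acquire(1); }
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public void unlock()      { sync.release(1); }
    public boolean isLocked() { return sync.isHeldExclusively(); }
}
```

Because tryAcquire only succeeds when the state is 0, this mutex is not reentrant: a thread that calls tryLock() while already holding it gets false.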

Synchronizer AbstractQueuedSynchronizer source code

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
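    // A node in the queue; the queue is a doubly linked list of these nodes
    static final class Node {
        // Marker: the node is waiting in shared mode
        static final Node SHARED = new Node();
        // Marker: the node is waiting in exclusive mode
        static final Node EXCLUSIVE = null;
        // waitStatus value: the thread timed out or was interrupted while waiting and must be removed from the queue; a node that enters this state never changes state again
        static final int CANCELLED =  1;
        // waitStatus value: the successor's thread is waiting; when this node's thread releases the synchronization state or is cancelled, it notifies the successor so that it can run
        static final int SIGNAL    = -1;
        // waitStatus value: the node is in a wait queue and its thread is waiting on a Condition; after another thread calls signal() on that Condition, the node is transferred from the wait queue to the synchronization queue and competes for the synchronization state
        static final int CONDITION = -2;
        // waitStatus value: the next shared acquisition of the synchronization state will be propagated
        static final int PROPAGATE = -3;
        // Wait status: one of the values above, or 0
        volatile int waitStatus;
        // Predecessor node
        volatile Node prev;
        // Successor node
        volatile Node next;
        // The thread that is acquiring the synchronization state
        volatile Thread thread;
        // Next node in the wait queue
        Node nextWaiter;
        // Whether this node is waiting in shared mode
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        // Return the predecessor node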
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    }

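    // Head node of the synchronization queue
    private transient volatile Node head;

    // Tail node of the synchronization queue
    private transient volatile Node tail;

    // The synchronization state
    private volatile int state;

    // Return the synchronization state
    protected final int getState() {
        return state;
    }

    // Set the synchronization state
    protected final void setState(int newState) {
        state = newState;
    }

    // Set the synchronization state with CAS, which makes the update atomic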
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

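    // Insert a node into the queue. The infinite loop guarantees that the node is added correctly: the current thread returns only after CAS has made the node the tail; otherwise it keeps retrying
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // tail is null, so the queue is empty and must be initialized
                if (compareAndSetHead(new Node())) // set the head node with CAS
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { // append the node to the end of the queue with CAS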
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Insert a node for the current thread in the given mode
    private Node addWaiter(Node mode) {
        // Create a node for the current thread
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // If the queue is not empty, try once to append the node at the tail with CAS
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // Otherwise insert it with enq()
        enq(node);
        return node;
    }

    // Set the head node of the queue
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

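    // Wake up the successor of the given node
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // Find the successor to wake; if it is missing or cancelled, scan backwards from the tail for the nearest non-cancelled node
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // LockSupport is a utility class with static methods that block or wake threads: methods starting with park block, unpark wakes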
            LockSupport.unpark(s.thread);
    }

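    // Shared-mode release of the synchronization state: wakes the successor and lets the release propagate
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { // the head is SIGNAL, so the successor must be notified on release
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // reset the status to 0 before waking the successor
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // wake up the successor
                }
                // If the head status is 0, mark it PROPAGATE so that the release keeps propagating
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    // Whether the current thread should block after failing to acquire the synchronization state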
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
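    // Block the current thread; after wake-up, report and clear its interrupt status
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    // Blocks the thread whose node has already been appended to the queue (the return value of addWaiter). Before blocking, it retries tryAcquire; if the retry succeeds, it returns without blocking
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // The current thread tries to acquire the synchronization state in an infinite loop, but only a node whose predecessor is the head may try. The head is the node that holds the synchronization state; when its thread releases the state, it wakes its successor, and the woken thread checks that its predecessor is the head. This preserves the FIFO order
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // retry the acquisition; on success return directly
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // The retry failed: decide whether the current thread should block
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // if so, block the current thread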
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // Interruptible acquisition of the synchronization state, used in exclusive mode
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        // Node for the current thread
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // try to acquire the synchronization state; return on success
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // Acquisition failed: decide whether the thread needs to block
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // block the current thread
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // Shared-mode acquisition of the synchronization state
    private void doAcquireShared(int arg) {
        // Node for the current thread
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // a value >= 0 means the synchronization state was acquired
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

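    // Try to acquire the synchronization state; to be implemented by subclasses (template method pattern)
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // Try to release the synchronization state; to be implemented by subclasses (template method pattern)
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // Try to acquire the synchronization state in shared mode; to be implemented by subclasses (template method pattern)
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // Try to release the synchronization state in shared mode; to be implemented by subclasses (template method pattern)
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // Acquire the synchronization state. This method ignores interrupts: once a thread has failed to acquire the state and entered the synchronization queue, interrupting it does not remove it from the queue
    public final void acquire(int arg) {
        // First try to acquire the synchronization state. If that fails, build an exclusive-mode node, append it to the tail of the synchronization queue with addWaiter(), and call acquireQueued() so that the node keeps trying in an infinite loop. If the state cannot be acquired, the node's thread blocks; it is woken mainly when its predecessor leaves the queue or when the blocked thread is interrupted
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // Acquire the synchronization state, responding to interrupts
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    // Release the synchronization state; on success, wake the thread of the head node's successor
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // wake up the waiting thread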
            return true;
        }
        return false;
    }
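    /*
     * Summary of exclusive acquisition (acquire) and release (release): when acquiring the
     * synchronization state, the synchronizer maintains a synchronization queue. Every thread
     * that fails to acquire the state is added to the queue and spins there; a node leaves the
     * queue when its predecessor is the head and it acquires the state successfully. When
     * releasing, the synchronizer calls tryRelease(int arg) to release the state and then
     * wakes the successor of the head node.
     */

    // Shared-mode acquisition of the synchronization state
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) // a negative value means the acquisition failed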
            doAcquireShared(arg);
    }
    // Shared-mode acquisition of the synchronization state, responding to interrupts
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    // Shared-mode release of the synchronization state
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}

The source above shows that the synchronizer relies on an internal synchronization queue (a doubly linked FIFO queue) to manage the synchronization state. When the current thread fails to acquire the synchronization state, the synchronizer wraps the thread and its wait status in a node (Node), appends the node to the synchronization queue, and blocks the thread. When the synchronization state is released, the thread at the front of the queue (the successor of the head node) is woken up to try to acquire the state again.
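This queuing can be observed from the outside through ReentrantLock, which exposes a few monitoring methods backed by the synchronizer. The sketch below blocks a second thread on a held lock and then inspects the queue; SyncQueueDemo and its method name are made up for the illustration, and getQueueLength() is documented as an estimate, although with a single waiter it is stable.

```java
import java.util.concurrent.locks.ReentrantLock;

class SyncQueueDemo {
    // Returns the queue length observed while a second thread is blocked in lock()
    static int queueLengthWhileContended() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // cannot acquire, so the thread is enqueued and parked
            lock.unlock();
        });
        int observed;
        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter's node is in the queue
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // the release wakes up the queued thread
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(queueLengthWhileContended()); // 1
    }
}
```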

The basic structure of the synchronization queue is shown below:

When a thread succeeds in acquiring the synchronization state (or lock), other threads cannot acquire it and are instead wrapped in nodes and added to the synchronization queue. Several threads may fail at the same time, so appending to the queue must be thread-safe. The synchronizer therefore sets the tail with the CAS-based method compareAndSetTail, and only after the CAS succeeds is the new node formally linked to the previous tail node.
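The append step can be modeled in isolation. The following is a simplified sketch, not the JDK code: it replaces the synchronizer's Unsafe-based compareAndSetTail with an AtomicReference and leaves out threads, wait status, and lazy initialization. CasTailQueue and its members are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasTailQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final Node head = new Node(); // dummy head, as in the synchronizer
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    // Append a node; retry until the CAS on the tail succeeds
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;                    // link backwards first
            if (tail.compareAndSet(t, node)) {
                t.next = node;                // link forwards only after winning the CAS
                return;
            }
        }
    }

    // Count nodes by walking prev links from the tail back to the head
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != head; p = p.prev) {
            n++;
        }
        return n;
    }

    // Appends from several threads at once and returns the resulting size
    static int concurrentAppend(int threads, int perThread) {
        CasTailQueue queue = new CasTailQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }
}
```

Calling concurrentAppend(4, 1000) should return 4000: no append is lost even though the threads race on the tail, because a thread whose CAS fails simply rereads the tail and tries again.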

The process of adding a node to a synchronization queue:

With the synchronizer's implementation in mind, let's look at how Lock is implemented.

Implementation principle of ReentrantLock

ReentrantLock is a reentrant lock: a thread that already holds the lock can acquire it again without blocking. The lock also lets you choose between fair and unfair ordering when acquiring it.
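Reentrancy is easy to check with tryLock(). In the sketch below (ReentrancyDemo is an illustrative name), the owning thread acquires the lock a second time, while another thread's attempt fails as long as the owner holds it.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Returns {owner could reacquire, other thread could acquire}
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[2];
        lock.lock();
        try {
            result[0] = lock.tryLock(); // same thread: succeeds, hold count becomes 2
            if (result[0]) {
                lock.unlock();          // back to hold count 1
            }
            Thread other = new Thread(() -> {
                boolean got = lock.tryLock(); // different thread: fails while held
                result[1] = got;
                if (got) {
                    lock.unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // true false
    }
}
```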

Let’s start with a class diagram for ReentrantLock:

As the diagram shows, ReentrantLock implements the Lock interface. Sync is an inner class that extends the synchronizer AbstractQueuedSynchronizer. It has two subclasses: FairSync, which implements the fair lock, and NonfairSync, which implements the unfair lock. Here is the implementation:

public class ReentrantLock implements Lock, java.io.Serializable {
    // The synchronizer that provides all of the locking mechanics
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        // Acquire the lock; implemented by the fair and unfair subclasses
        abstract void lock();

        // Acquire the lock in an unfair way. The method must check whether the
        // acquiring thread is the thread that currently holds the lock; if so,
        // the acquisition succeeds again (reentrancy)
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // State 0 means the lock is free: try to take it with CAS
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // The lock is held and the current thread is the owner: increment the
            // synchronization state and return true, the acquisition succeeded
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        // Release the lock. If the lock was acquired n times, the first (n - 1)
        // calls to tryRelease(int releases) return false; only the call that
        // brings the synchronization state to 0 sets the owner thread to null
        // and returns true, meaning the lock is fully released
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        // Whether the current thread holds the lock
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    static final class NonfairSync extends Sync {
        // Lock: try a CAS immediately, fall back to acquire() if it fails
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // hasQueuedPredecessors() checks whether a node is queued ahead of
                // the current thread in the synchronization queue. If it returns
                // true, another thread requested the lock earlier, so the current
                // thread must wait until that thread has acquired and released it
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    // The default lock is unfair
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    // Choose a fair or unfair lock
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    // Public API: acquire the lock
    public void lock() {
        sync.lock();
    }

    // Public API: acquire the lock, responding to interrupts
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // Public API: try to acquire the lock without blocking
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    // Public API: try to acquire the lock, giving up if it is not obtained within the specified time
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    // Public API: release the lock
    public void unlock() {
        sync.release(1);
    }
}
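Two behaviors of this source can be verified from user code: the lock stays held until every acquisition has been released (tryRelease returns true only when the state reaches 0), and the constructor argument selects the fair or unfair synchronizer. The sketch below uses the public isLocked() and isFair() methods of ReentrantLock; ReleaseDemo is an illustrative name.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseDemo {
    // Returns {locked after first unlock, locked after second unlock}
    static boolean[] partialRelease() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                            // state is now 2
        lock.unlock();                          // state 1: not yet free
        boolean afterFirst = lock.isLocked();
        lock.unlock();                          // state 0: owner cleared
        boolean afterSecond = lock.isLocked();
        return new boolean[] { afterFirst, afterSecond };
    }

    // Returns {fairness of the default lock, fairness of new ReentrantLock(true)}
    static boolean[] fairnessFlags() {
        ReentrantLock byDefault = new ReentrantLock();
        ReentrantLock fair = new ReentrantLock(true);
        return new boolean[] { byDefault.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] r = partialRelease();
        System.out.println(r[0] + " " + r[1]); // true false
        boolean[] f = fairnessFlags();
        System.out.println(f[0] + " " + f[1]); // false true
    }
}
```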

The Condition interface

Every Java object has a set of monitor methods, including wait(), wait(long timeout), notify(), and notifyAll(), which work together with the synchronized keyword to implement the wait/notify pattern. The Condition interface provides similar monitor methods that work together with Lock to implement the same pattern. Condition is more flexible than the built-in monitor: a single Lock object can create multiple Condition instances, and threads can wait on different Condition instances, so that notifications can be sent selectively to one group of waiting threads.
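Selective notification is the usual reason to create more than one Condition per lock. The sketch below is modeled on the well-known bounded-buffer pattern: producers wait on one condition and consumers on another, so a put wakes only a consumer and a take wakes only a producer. The class BoundedBuffer, its int-based API, and the transferSum helper are assumptions made for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final int[] items;
    private int putIndex, takeIndex, count;

    BoundedBuffer(int capacity) {
        items = new int[capacity];
    }

    void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putIndex] = value;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal(); // wake a consumer only
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            int value = items[takeIndex];
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal(); // wake a producer only
            return value;
        } finally {
            lock.unlock();
        }
    }

    // A producer thread puts 1..n through a buffer of capacity 2;
    // the calling thread takes n values and returns their sum
    static int transferSum(int n) {
        BoundedBuffer buffer = new BoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

With the built-in monitor, both kinds of threads would share one wait set and notifyAll() would have to wake all of them.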

Condition defines two kinds of methods, wait and notify. Before the current thread calls any of them, it must hold the lock associated with the Condition object. A Condition is created from a Lock object by calling newCondition(); in other words, a Condition depends on its Lock. Here's an example:

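private ReentrantLock lock = new ReentrantLock();
public Condition conditionA = lock.newCondition();

public void awaitA() throws InterruptedException {
    if (lock.tryLock()) {
        try {
            System.out.println(System.currentTimeMillis() + ", threadName : " + Thread.currentThread().getName());
            conditionA.await();
        } finally {
            lock.unlock(); // release the lock even if await() is interrupted
        }
    } else {
        // the lock is held by another thread, so there is nothing to wait on
    }
}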

After calling await(), the current thread releases the lock and waits. When another thread calls signal() on the same Condition object, the waiting thread is notified and returns from await(); by the time it returns, it has reacquired the lock.
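A minimal sketch of this handoff is shown below. The waiter checks a flag in a loop around await(), the signalling thread sets the flag and calls signal() while holding the lock, and the waiter records whether it owns the lock once it gets past await(). AwaitSignalDemo and its fields are illustrative names.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitSignalDemo {
    // Returns whether the waiting thread held the lock after its wait ended
    static boolean lockHeldAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = new boolean[1]; // guarded by lock
        boolean[] held = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0]) {
                    ready.await(); // releases the lock while waiting
                }
                held[0] = lock.isHeldByCurrentThread();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        lock.lock();
        try {
            flag[0] = true;
            ready.signal(); // must be called while holding the lock
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return held[0];
    }

    public static void main(String[] args) {
        System.out.println(lockHeldAfterAwait()); // true
    }
}
```

The flag makes the example correct regardless of which thread gets the lock first: if signal() runs before the waiter starts waiting, the waiter sees the flag and skips await() altogether.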

Implementation analysis of Condition

ConditionObject is an inner class of the synchronizer AbstractQueuedSynchronizer. Each Condition object contains a queue (called the wait queue below), which is the key to how the Condition implements wait/notify.

1. The wait queue

The wait queue is a FIFO queue. Each node in it holds a reference to a thread that is waiting on the Condition. When a thread calls Condition.await(), it releases the lock, is wrapped in a node that joins the wait queue, and enters the waiting state. The node type is the same one the synchronizer uses: AbstractQueuedSynchronizer.Node.

public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    ...
}

A Condition contains one wait queue and holds references to its first node (firstWaiter) and last node (lastWaiter). When the current thread calls Condition.await(), a node is constructed for the thread and appended at the tail of the wait queue. The basic structure of the wait queue is as follows:

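The Condition holds references to the first and last nodes, so adding a node only requires pointing the old tail's nextWaiter at the new node and updating lastWaiter. No CAS is needed for this update, because the thread that calls await() already holds the lock.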
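The queue structure itself can be sketched as a plain singly linked list. The model below is a simplification under stated assumptions: it stores a thread name instead of a Thread, omits wait status and cancellation cleanup, and uses made-up names (WaitQueueSketch, add, poll). It relies on the caller holding the lock, which is why it uses no CAS or other synchronization.

```java
class WaitQueueSketch {
    static final class Node {
        final String threadName;
        Node nextWaiter; // single forward link, as in the wait queue
        Node(String threadName) {
            this.threadName = threadName;
        }
    }

    private Node firstWaiter;
    private Node lastWaiter;

    // Append at the tail, similar in spirit to addConditionWaiter()
    void add(String threadName) {
        Node node = new Node(threadName);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
    }

    // Remove from the head: the longest-waiting node leaves first
    String poll() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        firstWaiter = first.nextWaiter;
        if (firstWaiter == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        return first.threadName;
    }
}
```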

2. Waiting

Calling await() on a Condition puts the current thread into the wait queue and releases the lock, and the thread's state changes to waiting. When await() returns, the current thread is guaranteed to hold the lock associated with the Condition again.

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add the current thread to the wait queue
    Node node = addConditionWaiter();
    // Release the synchronization state, that is, release the lock
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Park until the node has been moved to the synchronization queue
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Compete for the synchronization state again
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) { // release the lock
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
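The effect of fullyRelease() and the saved state can be seen with a reentrant hold. In the sketch below a thread acquires the lock twice and then waits; another thread is able to take the lock during the wait, and after the wait the first thread's hold count is 2 again. FullReleaseDemo and the layout of its result array are assumptions made for this illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullReleaseDemo {
    // Returns {hold count before await,
    //          1 if another thread got the lock during the wait,
    //          hold count after await}
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] result = new int[3];
        boolean[] signalled = new boolean[1]; // written only while holding the lock
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant: hold count is 2
            try {
                result[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    cond.await(); // gives up both holds at once
                }
                result[2] = lock.getHoldCount(); // saved state restored
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        while (!signalled[0]) {
            if (lock.tryLock()) { // succeeds once the waiter has fully released
                try {
                    if (lock.hasWaiters(cond)) {
                        result[1] = 1;
                        signalled[0] = true;
                        cond.signal();
                    }
                } finally {
                    lock.unlock();
                }
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 2 1 2
    }
}
```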


3. Notification

Calling signal() on a Condition selects the node that has waited longest in the wait queue (the first node) and moves it to the synchronization queue before its thread is woken up.

public final void signal() {
    if (!isHeldExclusively()) // the current thread must hold the lock
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
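Both points of this code, the ownership check and the transfer between queues, can be observed through ReentrantLock's monitoring methods. The sketch below is an illustration with made-up names (SignalTransferDemo and its methods): signal() without the lock throws IllegalMonitorStateException, and after a signal() issued under the lock the waiter is no longer counted in the wait queue but is reported as queued for the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {
    // signal() requires the calling thread to hold the lock
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns {wait-queue length before signal, wait-queue length after signal,
    //          1 if the waiter is in the synchronization queue after signal}
    static int[] transfer() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] result = new int[3];
        boolean[] signalled = new boolean[1]; // written only while holding the lock
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0]) {
                    cond.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!signalled[0]) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) { // the waiter is parked in await()
                    result[0] = lock.getWaitQueueLength(cond);
                    signalled[0] = true;
                    cond.signal(); // moves the node to the synchronization queue
                    result[1] = lock.getWaitQueueLength(cond);
                    result[2] = lock.hasQueuedThread(waiter) ? 1 : 0;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockFails()); // true
        int[] r = transfer();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 1 0 1
    }
}
```

The waiter cannot leave the synchronization queue while the signalling thread still holds the lock, which is why the last two observations are stable.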