Original text: chenmingyu. Top/concurrent -…

The lock

A lock controls how multiple threads access a shared resource. In Java, locking can be implemented with either the synchronized keyword or the Lock interface.

synchronized is a Java keyword that acquires and releases the lock implicitly. Lock is a Java interface whose lock must be acquired and released explicitly. synchronized is an exclusive lock that can neither be interrupted nor timed out while waiting; Lock supports both interruptible and timed acquisition.

Interface provided by Lock

public interface Lock {

    /** Acquires the lock. The method returns once the current thread holds the lock. */
    void lock();

    /** Acquires the lock interruptibly: a waiting thread can be interrupted, in which case InterruptedException is thrown. */
    void lockInterruptibly() throws InterruptedException;

    /** Tries to acquire the lock without blocking. Returns immediately: true if the lock was acquired, false otherwise. */
    boolean tryLock();

    /**
     * Tries to acquire the lock within the timeout. Returns true if the lock was acquired in time and
     * false if the timeout elapsed; throws InterruptedException if the thread is interrupted while waiting.
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /** Releases the lock. */
    void unlock();

    /**
     * Returns a wait/notify component (Condition) bound to this lock. The current thread must hold
     * the lock before it calls await() on the returned Condition.
     */
    Condition newCondition();
}
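To make the difference from synchronized concrete, here is a minimal sketch of a timed tryLock. The class name LockUsageSketch and the demo() helper are made up for illustration; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class LockUsageSketch {

    /** Returns { timed tryLock while another thread holds the lock, tryLock after it was released }. */
    static boolean[] demo() {
        Lock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch mayRelease = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();      // signal that the lock is now held
                mayRelease.await();    // keep holding until the main thread is done trying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();         // always release in finally
            }
        }, "holder");
        holder.setDaemon(true);
        holder.start();

        try {
            held.await();
            // Gives up after 100 ms instead of blocking forever, which synchronized cannot do
            boolean whileHeld = lock.tryLock(100, TimeUnit.MILLISECONDS);
            mayRelease.countDown();
            holder.join();
            boolean afterRelease = lock.tryLock();
            if (afterRelease) {
                lock.unlock();
            }
            return new boolean[] {whileHeld, afterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("timed tryLock while held: " + r[0]);
        System.out.println("tryLock after release:    " + r[1]);
    }
}
```

The first attempt is expected to time out and the second to succeed, because the holder thread has finished by then.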

Queue synchronizer

The queue synchronizer AbstractQueuedSynchronizer (AQS, referred to below as the synchronizer) is the base framework for building locks and other synchronization components.

Most lock implementations in Java control thread access by aggregating a subclass of the synchronizer, so the synchronizer is the key to implementing a lock. One way to think about it: the lock faces the programmer who uses it and hides the implementation details, while the synchronizer faces the lock implementer and simplifies the implementation by hiding low-level operations such as synchronization state management, thread queuing, waiting and wake-up. With AbstractQueuedSynchronizer, implementing a lock becomes fairly easy.

Design principles

The synchronizer is designed around the template method pattern. Its template methods fall into three groups: acquiring and releasing the synchronization state exclusively, acquiring and releasing the synchronization state in shared mode, and querying the threads waiting in the synchronization queue.

Exclusive operation

An exclusive lock is built on the following template methods. They are final in AQS; the implementer overrides the hooks they call, tryAcquire(int arg) and tryRelease(int arg).

| Method | Description |
| --- | --- |
| void acquire(int arg) | Acquires the synchronization state exclusively; only one thread can hold it at a time. A thread that fails to acquire it enters the synchronization queue and waits |
| void acquireInterruptibly(int arg) | Same as acquire, but responds to interruption: an interrupted thread throws InterruptedException and returns |
| boolean tryAcquireNanos(int arg, long nanosTimeout) | Same as acquireInterruptibly, with a timeout: returns false if the state has not been acquired within the given time, true otherwise |
| boolean release(int arg) | Releases the synchronization state exclusively and wakes up the thread held by the first waiting node in the synchronization queue |

Shared operation

A shared lock is built on the following template methods. As in exclusive mode, the implementer overrides the hooks they call, tryAcquireShared(int arg) and tryReleaseShared(int arg).

| Method | Description |
| --- | --- |
| void acquireShared(int arg) | Acquires the synchronization state in shared mode; several threads can hold it at the same time |
| void acquireSharedInterruptibly(int arg) | Same as acquireShared, but responds to interruption |
| boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | Same as acquireSharedInterruptibly, with a timeout: returns false if the state has not been acquired within the given time, true otherwise |
| boolean releaseShared(int arg) | Releases the synchronization state in shared mode |
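As a rough illustration of shared mode, the sketch below treats state as a number of free permits, in the spirit of a semaphore. SharedPermitsSketch and its Sync class are hypothetical names; the overridden hooks and the template methods they plug into are the real AQS API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class, not part of the original article
class SharedPermitsSketch {

    /** Shared-mode synchronizer: state holds the number of free permits. */
    static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result means failure (AQS queues the caller); otherwise CAS the new count in
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // lets AQS wake up queued waiters
                }
            }
        }
    }

    /** Returns { third acquire with only two permits, acquire after one permit was released }. */
    static boolean[] demo() {
        Sync sync = new Sync(2);
        sync.acquireShared(1);   // two holders at the same time are fine
        sync.acquireShared(1);
        try {
            boolean third = sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(50));
            sync.releaseShared(1);
            boolean afterRelease = sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(50));
            return new boolean[] {third, afterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("third acquire: " + r[0] + ", after release: " + r[1]);
    }
}
```

Only the two try* hooks are written by hand; queuing, parking and the timeout all come from the template methods.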

Getting synchronization queue thread information

| Method | Description |
| --- | --- |
| Collection<Thread> getQueuedThreads() | Returns a collection of the threads waiting in the synchronization queue |
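The following sketch shows the exclusive template methods and getQueuedThreads() working together. QueuedThreadsSketch and its deliberately bare Sync (no owner check) are assumptions made for the demo, not code from ReentrantLock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class, not part of the original article
class QueuedThreadsSketch {

    /** Bare exclusive synchronizer: state 0 = free, 1 = held. No owner check, for brevity. */
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    /** Returns the names of the threads that were queued while the main thread held the state. */
    static List<String> demo() {
        Sync sync = new Sync();
        sync.acquire(1); // the calling thread takes the state

        Thread waiter = new Thread(() -> {
            sync.acquire(1);   // tryAcquire fails, so AQS enqueues and parks this thread
            sync.release(1);
        }, "waiter");
        waiter.start();

        // Wait until AQS reports the waiter as queued
        while (!sync.isQueued(waiter)) {
            Thread.yield();
        }
        List<String> names = new ArrayList<>();
        for (Thread t : sync.getQueuedThreads()) {
            names.add(t.getName());
        }

        sync.release(1); // wakes up the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + demo());
    }
}
```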

These template methods mention the synchronization queue several times, so let's look at how AQS implements it.

First, look at the class diagram of AbstractQueuedSynchronizer.

Node

Node is an inner class of AbstractQueuedSynchronizer. The synchronizer manages the synchronization state with an internal synchronization queue: when the current thread fails to acquire the synchronization state, the synchronizer wraps the thread and its wait information in a Node and adds that node to the synchronization queue.

| Attribute | Description |
| --- | --- |
| waitStatus:int | Wait state of the node; the possible values are listed below |
| prev:Node | Predecessor node |
| next:Node | Successor node |
| thread:Thread | The thread waiting in this node |
| nextWaiter:Node | Next node waiting on a condition |

waitStatus takes one of the following values:

CANCELLED (1): the thread of this node has cancelled its wait, and the node needs to be removed from the synchronization queue

SIGNAL (-1): the successor node is waiting. When the current node releases the synchronization state, it notifies the successor so that the successor's thread can run

CONDITION (-2): the node is in a condition wait queue

PROPAGATE (-3): the next shared acquisition of the synchronization state should propagate unconditionally

0: the initial state (there is no named constant for it)

Each node holds references to both its predecessor and its successor, so the synchronization queue of AQS is a doubly linked list.

AQS

Several important fields in AQS

| Attribute | Description |
| --- | --- |
| state:int | Synchronization state. In ReentrantLock, 0 means the lock is free, 1 means it is held, and a value greater than 1 means the owning thread has locked it several times (reentrancy) |
| head:Node | Head node of the queue |
| tail:Node | Tail node of the queue |
| unsafe:Unsafe | Used by AQS to implement its CAS operations |

AQS provides three methods for manipulating the synchronization state

  1. getState(): gets the synchronization state
  2. setState(int newState): sets the synchronization state
  3. compareAndSetState(int expect, int update): sets the state with CAS, which guarantees that the update is atomic
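The three state methods are protected, so the sketch below exposes them through a tiny subclass. StateOpsSketch and the read/write/cas wrapper names are invented for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class, not part of the original article
class StateOpsSketch {

    /** Thin wrapper that makes the protected state methods callable from the demo. */
    static final class Sync extends AbstractQueuedSynchronizer {
        int read() {
            return getState();
        }

        void write(int newState) {
            setState(newState);
        }

        boolean cas(int expect, int update) {
            return compareAndSetState(expect, update);
        }
    }

    /** Returns the results of three CAS attempts from 0 to 1. */
    static boolean[] demo() {
        Sync sync = new Sync();
        boolean first = sync.cas(0, 1);   // state is 0, so the swap succeeds
        boolean second = sync.cas(0, 1);  // state is now 1, so expect == 0 no longer matches
        sync.write(0);                    // plain write back to 0
        boolean third = sync.cas(0, 1);   // succeeds again
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

The failed second attempt is what a competing thread would observe when another thread already holds the lock.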

The basic structure of AQS is shown below

The synchronizer holds references to the head and tail nodes of the synchronization queue, so enqueuing and dequeuing a node only requires operating on the synchronizer's head and tail references.

Exclusive lock

ReentrantLock

ReentrantLock is implemented on top of AQS by acquiring and releasing the synchronization state exclusively. Let's walk through the implementation of ReentrantLock to understand AQS better.

Reentrancy means that a thread that already holds the lock can acquire it again without being blocked by it, so one thread can lock the same resource repeatedly. ReentrantLock also lets you choose between a fair and an unfair lock.
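Reentrancy can be observed through ReentrantLock.getHoldCount(). A small sketch follows; the class name ReentrancySketch is made up, the lock API is real.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class ReentrancySketch {

    /** Returns the hold count after the first lock, after the nested lock, and after full release. */
    static int[] demo() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];
        lock.lock();
        try {
            counts[0] = lock.getHoldCount();
            lock.lock(); // same thread locks again and is not blocked
            try {
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        counts[2] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        int[] c = demo();
        System.out.println(c[0] + " -> " + c[1] + " -> " + c[2]);
    }
}
```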

Example:

import java.time.LocalTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author: chenmingyu
 * @date: 2019/4/12 15:09
 * @description: ReentrantLock
 */
public class ReentrantLockTest {

    private static final Lock LOCK = new ReentrantLock();

    public static void main(String[] args) {
        Runnable r1 = new TestThread();
        new Thread(r1, "r1").start();
        Runnable r2 = new TestThread();
        new Thread(r2, "r2").start();
    }

    public static class TestThread implements Runnable {

        @Override
        public void run() {
            // Both threads contend for the same lock; the second one waits until the first unlocks
            LOCK.lock();
            try {
                System.out.println(Thread.currentThread().getName() + ": Lock obtained " + LocalTime.now());
                TimeUnit.SECONDS.sleep(3L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release the lock in finally
                LOCK.unlock();
            }
        }
    }
}

The output

Thread r2 acquires the lock and prints its line only after thread r1 has released the lock.

Source code analysis

Creating an instance: the no-argument constructor uses an unfair lock by default; call the constructor that takes an argument if a fair lock is needed.

/** Creates a ReentrantLock instance that uses an unfair lock (the default). */
public ReentrantLock() {
    sync = new NonfairSync();
}

/** Creates a ReentrantLock instance; passing fair = true selects a fair lock. */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

NonfairSync and FairSync are inner classes of ReentrantLock. Both extend Sync, another inner class of ReentrantLock, and Sync extends AbstractQueuedSynchronizer.

The class diagram below

Exclusive lock acquisition

Implementation of an unfair lock

/** Performs lock. Try immediate barge, backing up to normal acquire on failure. */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

When lock() is called on an unfair lock, it first tries to grab the lock directly with compareAndSetState(0, 1); if that fails, it calls acquire(1).

compareAndSetState(0, 1) is a CAS operation. If it succeeds, setExclusiveOwnerThread(Thread.currentThread()) is called to record the current thread as the owner of the synchronization state in exclusive mode.

What we mean by obtaining synchronization state actually refers to obtaining lock state. If the synchronization state is successfully obtained, the lock is successfully added

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

acquire(1) is a template method provided by AQS. It calls tryAcquire(arg) and, if that fails, acquireQueued(addWaiter(Node.EXCLUSIVE), arg).

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg) is implemented by the subclass; here that is the tryAcquire method of NonfairSync.

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

The nonfairTryAcquire(acquires) method

/** Unfair attempt to acquire the synchronization state. */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // Lock is free: try to take it with CAS and record the owner
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // Reentrant acquisition by the owner: increase the hold count
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

  1. Read the synchronization state with getState(). If it is 0, try compareAndSetState(0, acquires); if the CAS succeeds, the synchronization state has been acquired and the current thread is recorded as its exclusive owner
  2. If the current thread is already the exclusive owner, add acquires to state (state + 1): the current thread has locked more than once

The acquireQueued(addWaiter(Node.EXCLUSIVE), arg) call: if tryAcquire(arg) returns false, the synchronization state, and therefore the lock, was not acquired. addWaiter(Node.EXCLUSIVE) then adds a node for the current thread to the synchronization queue, and acquireQueued tries to acquire the synchronization state in an endless loop. If the attempt fails, the thread in the node is blocked; a blocked thread wakes up only when its predecessor releases the state and leaves the queue, or when the blocked thread is interrupted.

The addWaiter(Node.EXCLUSIVE) method builds the node for the synchronization queue and appends it to the tail of the queue.

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

The method first creates an instance through the Node constructor; tail is the tail node of the queue in AQS.

If the tail node is not null, the predecessor of the new node is set to the node that tail points to, and compareAndSetTail(pred, node) is called. compareAndSetTail(pred, node) calls unsafe.compareAndSwapObject(this, tailOffset, expect, update), which is an atomic CAS operation and so lets the node be appended in a thread-safe way. On success, the next reference of the old tail is pointed at the new node, which completes appending it to the end of the synchronization queue.

If the tail node is empty or the compareAndSetTail(pred, node) setting fails, the enq(node) method is called

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

If the tail node of AQS is null, no thread is waiting in the queue and no node exists yet, so compareAndSetHead(new Node()) is called to create an empty head node. The loop then calls compareAndSetTail(t, node) to append the current node to the end of the queue, retrying until it succeeds.
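The enqueue logic can be modelled outside AQS with AtomicReference. The sketch below is a simplified stand-in, not the JDK code: CasEnqueueSketch, its Node class and lengthFromTail() are hypothetical, and AtomicReference.compareAndSet takes the place of the Unsafe-based CAS.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article
class CasEnqueueSketch {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends a node to the tail, lazily creating an empty head node first (same shape as enq). */
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized yet: install an empty head, then let tail point at it
                Node dummy = new Node("dummy-head");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;
                // Only one thread wins the CAS on tail; the losers loop and retry
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return;
                }
            }
        }
    }

    /** Counts the nodes by walking prev links from the tail, including the empty head. */
    int lengthFromTail() {
        int n = 0;
        for (Node t = tail.get(); t != null; t = t.prev) {
            n++;
        }
        return n;
    }

    static int demo(int threads, int perThread) {
        CasEnqueueSketch queue = new CasEnqueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue(new Node("t" + id + "-" + j));
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.lengthFromTail();
    }

    public static void main(String[] args) {
        System.out.println("nodes in queue: " + demo(4, 1000));
    }
}
```

With 4 threads adding 1000 nodes each, no node should be lost, so the walk from the tail finds 4000 nodes plus the empty head.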

After addWaiter(Node mode) returns, acquireQueued(final Node node, int arg) is called. Once a node has entered the queue it starts spinning, and the exit condition is whether it has acquired the synchronization state: if it has, the node leaves the loop; otherwise it keeps spinning, parking between attempts.

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

node.predecessor() returns the predecessor of the node passed in, which is compared with the head node of AQS. If they are the same node, the thread tries to acquire the synchronization state (the lock). If that succeeds, setHead(node) makes the current node the new head, and the next reference of the old head is set to null to help the GC.

setHead(node);

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}


If the predecessor of the node is not the head of AQS, or acquiring the synchronization state fails, shouldParkAfterFailedAcquire(p, node) and parkAndCheckInterrupt() are called.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /* * This node has already set status asking a release * to signal it, so it can safely park. */
        return true;
    if (ws > 0) {
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


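shouldParkAfterFailedAcquire inspects the wait status of the predecessor. If it is already Node.SIGNAL, the method returns true and the thread may park. If the predecessor is cancelled (waitStatus > 0), the cancelled nodes are skipped and false is returned. Otherwise a CAS sets the predecessor's wait status to Node.SIGNAL and false is returned; because the outer loop is endless, the method is called again until it returns true or the lock is acquired.

The parkAndCheckInterrupt() method calls LockSupport.park(this) to block the thread. When the thread wakes up, Thread.interrupted() reports whether it was interrupted and clears the interrupt flag.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

If acquireQueued(addWaiter(Node.EXCLUSIVE), arg) returns true, the thread was interrupted while waiting, so selfInterrupt() is called to restore the interrupt flag on the current thread.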
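The interplay of parking, Thread.interrupted() and selfInterrupt() can be tried in isolation. In this sketch ParkInterruptSketch is a hypothetical class, and parkNanos with a one-second bound is used instead of park so the example can never hang; the behaviour that a pending interrupt makes park return right away is assumed from HotSpot.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the original article
class ParkInterruptSketch {

    /** Returns { interrupted() after park, flag after it was cleared, flag after re-interrupting }. */
    static boolean[] demo() {
        Thread self = Thread.currentThread();

        self.interrupt();
        // With the interrupt flag set, parking returns at once instead of blocking
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));

        // Same call as in parkAndCheckInterrupt(): reports the interrupt and clears the flag,
        // so that a later park inside the acquire loop can block again
        boolean wasInterrupted = Thread.interrupted();
        boolean flagAfterClear = self.isInterrupted();

        // What selfInterrupt() does once the lock has been acquired: restore the flag for the caller
        if (wasInterrupted) {
            self.interrupt();
        }
        boolean flagAfterRestore = self.isInterrupted();

        Thread.interrupted(); // leave the calling thread clean after the demo
        return new boolean[] {wasInterrupted, flagAfterClear, flagAfterRestore};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```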

Implementation of fair locks

Once acquire(1) is understood, the implementation of the fair lock is easy to follow.

final void lock() {
    acquire(1);
}

Compared with the unfair lock, lock() skips the initial attempt to grab the synchronization state. In addition, FairSync's tryAcquire checks hasQueuedPredecessors() before trying the CAS, so a thread does not jump ahead of threads that are already waiting. The rest of the process is the same as for the unfair lock.

Summary of fair and unfair locks:

  1. Fair lock: if threads are already waiting in the synchronization queue when lock() is called, the current thread is appended to the end of the queue
  2. Unfair lock: lock() first tries to acquire the synchronization state regardless of whether threads are waiting in the queue; only if that fails is the thread appended to the end of the queue
  3. An unfair lock generally has higher throughput than a fair lock. A fair lock grants the synchronization state in FIFO order, at the cost of many more thread switches. An unfair lock avoids much of that switching, but a thread in the synchronization queue may wait a long time, in the worst case indefinitely, before it acquires the lock
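Which policy a lock uses is fixed at construction time and can be checked with isFair(). A tiny sketch, with FairnessSketch as a made-up class name:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class FairnessSketch {

    /** Returns { isFair() of the default lock, isFair() of a lock created with fair = true }. */
    static boolean[] demo() {
        ReentrantLock unfair = new ReentrantLock();     // NonfairSync inside
        ReentrantLock fair = new ReentrantLock(true);   // FairSync inside
        return new boolean[] {unfair.isFair(), fair.isFair()};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("default fair? " + r[0] + ", explicit fair? " + r[1]);
    }
}
```

Choosing the fair variant trades throughput for ordering, so it is usually worth it only when waiting threads must not be overtaken.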

The process of acquiring locks exclusively

Exclusive lock release

ReentrantLock's unlock() method actually calls the release(int arg) method of AQS.

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease(arg) is called first to release the synchronization state.

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

If c == 0 the lock is fully released: the owner thread of the exclusive lock is set to null and state is set to 0. Otherwise only the hold count is reduced and tryRelease returns false.

Once tryRelease returns true, the unparkSuccessor(Node node) method is called to wake up the thread of the successor node.

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Successor is missing or cancelled: scan backwards from the tail
        // and keep the non-cancelled node closest to the head
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread of the successor node
        LockSupport.unpark(s.thread);
}

To summarize the process of acquiring and releasing a lock exclusively:

  1. When acquiring the lock, the thread first tries to acquire the synchronization state. If that succeeds, the lock is held. If it fails, the current thread is wrapped in a node and appended to the tail of the synchronization queue maintained by AQS, where it starts to spin. The spin ends when the node's predecessor is the head node of AQS and the thread acquires the synchronization state; the node then becomes the new head and the old head is removed from the synchronization queue
  2. When releasing the lock, the synchronization state is released first and then the successor of the head node is woken up
  3. A thread that has locked N times must unlock N times before another thread can acquire the lock
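The last point of the summary can be checked with a second thread that only calls tryLock(). HoldCountReleaseSketch and tryFromOtherThread are hypothetical names used for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original article
class HoldCountReleaseSketch {

    /** Runs tryLock() on a separate thread and reports whether it got the lock. */
    static boolean tryFromOtherThread(ReentrantLock lock) {
        boolean[] acquired = new boolean[1];
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                acquired[0] = true;
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join(); // join makes the write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired[0];
    }

    /** Returns { other thread's tryLock after 2 of 3 unlocks, after all 3 unlocks }. */
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.lock();      // state is 3
        lock.unlock();
        lock.unlock();    // state is 1, still owned by this thread
        boolean afterTwoUnlocks = tryFromOtherThread(lock);
        lock.unlock();    // state is 0, lock is free
        boolean afterThreeUnlocks = tryFromOtherThread(lock);
        return new boolean[] {afterTwoUnlocks, afterThreeUnlocks};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("after 2 unlocks: " + r[0] + ", after 3 unlocks: " + r[1]);
    }
}
```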

Implement an exclusive lock yourself

Now that we know how ReentrantLock works, we can emulate our own implementation of a custom exclusive lock

Steps

  1. Create a LockTest class that implements the Lock interface and overrides its methods
  2. In LockTest, create an inner class Sync that extends AQS. Because this is an exclusive lock, overriding tryAcquire(int arg) and tryRelease(int arg) is enough

LockTest code

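import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author: chenmingyu
 * @date: 2019/4/11
 * @description: Custom exclusive lock (not reentrant)
 */
public class LockTest implements Lock {

    private final Sync sync = new Sync();

    public static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            // Take the lock only if it is free (state 0 -> 1)
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the thread that holds the lock may release it
            if (getState() < 1 || Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException("Lock is not held by the current thread");
            }
            int c = getState() - arg;
            if (c == 0) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return c == 0;
        }

        @Override
        protected boolean isHeldExclusively() {
            // Required by ConditionObject
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // Delegate to the timed template method instead of always returning false
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}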

Verification

import java.time.LocalTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * @author: chenmingyu
 * @date: 2019/4/12 15:09
 * @description: LockTest
 */
public class ReentrantLockTest {

    private static final Lock LOCKTEST = new LockTest();

    public static void main(String[] args) {
        Runnable r1 = new TestThread();
        new Thread(r1, "LockTest 1").start();
        Runnable r2 = new TestThread();
        new Thread(r2, "LockTest 2").start();
    }

    public static class TestThread implements Runnable {

        @Override
        public void run() {
            LOCKTEST.lock();
            try {
                System.out.println(Thread.currentThread().getName() + ": Lock obtained " + LocalTime.now());
                TimeUnit.SECONDS.sleep(3L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release the lock in finally
                LOCKTEST.unlock();
            }
        }
    }
}

The output

Shared lock

Read-write lock

ReentrantReadWriteLock is an implementation of read/write locks and implements the ReadWriteLock interface

ReentrantReadWriteLock also maintains an inner Sync class that extends AQS and overrides the corresponding methods to implement the read lock and the write lock.

state is the integer variable maintained by AQS; each time an exclusive lock is acquired it is incremented by 1. ReentrantReadWriteLock uses this same integer to track both the read-lock state and the write-lock state. To keep two states in one variable, the integer is split by bits: an int occupies 4 bytes of 8 bits each, 32 bits in total, and ReentrantReadWriteLock uses the high 16 bits for the read state and the low 16 bits for the write state.
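The bit split can be written out in a few lines. The sketch below assumes the layout used by ReentrantReadWriteLock (read count in the high 16 bits, write count in the low 16 bits); the names SHARED_SHIFT and EXCLUSIVE_MASK are not shown in this article and are taken as assumptions, and ReadWriteStateSketch itself is a hypothetical class.

```java
// Hypothetical demo class, not part of the original article
class ReadWriteStateSketch {

    static final int SHARED_SHIFT = 16;                        // assumed: read count lives above bit 16
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adding this increments the read count
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // low 16 bits hold the write count

    /** Number of read holds encoded in the state. */
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    /** Number of (reentrant) write holds encoded in the state. */
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    /** Returns { read count, write count, raw state } for 2 write holds plus 1 read hold. */
    static int[] demo() {
        int state = 0;
        state += 2;            // write lock acquired twice by the same thread
        state += SHARED_UNIT;  // the writer also takes the read lock once
        return new int[] {sharedCount(state), exclusiveCount(state), state};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("reads=" + r[0] + " writes=" + r[1] + " raw=" + r[2]);
    }
}
```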

Write lock acquisition

The write lock of a read/write lock is an exclusive lock that supports reentrancy.

It is implemented by overriding the tryAcquire(int acquires) method in Sync, the inner class of ReentrantReadWriteLock.

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    // 1. If the synchronization state c is not 0, a read lock or a write lock is held
    if (c != 0) {
        // 2. c != 0 with w == 0 means a read lock is held.
        //    Return false if a read lock is held or the current thread is not the write-lock owner
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

Interpretation

If a read lock is held, the write lock cannot be acquired; the writer has to wait until all reading threads have released the read lock. This guarantees that changes made under the write lock are visible to readers.
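A single thread is enough to see this rule, because the w == 0 check rejects the write lock even when the reader is the current thread. WriteBlockedByReadSketch is a made-up class name for this sketch.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the original article
class WriteBlockedByReadSketch {

    /** Returns { write tryLock while a read lock is held, write tryLock after the read lock is released }. */
    static boolean[] demo() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

        rw.readLock().lock();
        boolean whileReadHeld;
        try {
            // state != 0 and the write count is 0, so the attempt is refused
            whileReadHeld = rw.writeLock().tryLock();
            if (whileReadHeld) {
                rw.writeLock().unlock();
            }
        } finally {
            rw.readLock().unlock();
        }

        boolean afterReadReleased = rw.writeLock().tryLock();
        if (afterReadReleased) {
            rw.writeLock().unlock();
        }
        return new boolean[] {whileReadHeld, afterReadReleased};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("while read held: " + r[0] + ", after release: " + r[1]);
    }
}
```

A blocking writeLock().lock() in the same position would wait forever, which is why upgrading from read to write is not supported.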

Write lock release

Releasing the write lock works much like releasing an exclusive lock: each release reduces the write state, and when it reaches 0 the write lock is released.

Read lock acquisition and release

The read lock is a shared lock that supports reentrancy. It is implemented by overriding tryAcquireShared(int unused) and tryReleaseShared(int unused) in Sync, the inner class of ReentrantReadWriteLock. The release side, tryReleaseShared, looks like this:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

On acquisition, if another thread holds the write lock, the current thread fails to acquire the read lock and waits. If the current thread itself holds the write lock, or no thread holds the write lock, the current thread acquires the synchronization state and obtains the read lock.

Each release of the read lock subtracts one read unit (SHARED_UNIT) from the synchronization state. When the synchronization state reaches 0, the lock is fully released.

Lock downgrading

Lock downgrading means turning a held write lock into a read lock: the current thread, while holding the write lock, acquires the read lock and then releases the write lock. This keeps the data visible and consistent for that thread.

After thread A acquires the write lock, it modifies the data, acquires the read lock and releases the write lock, which completes the downgrade. Until thread A releases the read lock, no other thread can acquire the write lock and modify the data, which guarantees the visibility of the data.
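The downgrade sequence might look like the sketch below. LockDowngradeSketch, its value field and the two snapshot fields are hypothetical and exist only to make the lock state after the downgrade observable.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the original article
class LockDowngradeSketch {

    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int value;

    // Snapshots taken right after the downgrade, for demonstration only (single-threaded demo)
    private boolean writeHeldAfterDowngrade;
    private int readCountAfterDowngrade;

    /** Writes a value, downgrades to the read lock, and reads the value back under it. */
    int updateThenRead(int newValue) {
        rw.writeLock().lock();
        try {
            value = newValue;
            rw.readLock().lock();      // take the read lock while still holding the write lock
        } finally {
            rw.writeLock().unlock();   // downgrade: only the read lock remains
        }
        try {
            writeHeldAfterDowngrade = rw.isWriteLocked();
            readCountAfterDowngrade = rw.getReadLockCount();
            return value;              // no writer can have changed it in between
        } finally {
            rw.readLock().unlock();
        }
    }

    /** Returns { value read, 1 if write lock still held after downgrade, read count after downgrade }. */
    static int[] demo() {
        LockDowngradeSketch sketch = new LockDowngradeSketch();
        int read = sketch.updateThenRead(42);
        return new int[] {read, sketch.writeHeldAfterDowngrade ? 1 : 0, sketch.readCountAfterDowngrade};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("read=" + r[0] + " writeHeld=" + r[1] + " readCount=" + r[2]);
    }
}
```

Releasing the write lock first and taking the read lock afterwards would leave a gap in which another writer could slip in, which is the case downgrading is meant to avoid.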

Reference: The Art of Concurrent Programming in Java
