ReentrantLock, as the name suggests, is a reentrant lock. It relies on the AbstractQueuedSynchronizer class to implement thread synchronization.

ReentrantLock Demo

import java.util.concurrent.locks.ReentrantLock;

/**
 * @program: source-demo
 * @description: A thread-safe counter implemented with ReentrantLock
 * @ClassName: ReentrantLockDemo
 * @author: Mr.Wang
 * @create: 2022-01-27
 */
public class ReentrantLockDemo {
  static ReentrantLock lock = new ReentrantLock();
  private int count = 0;

  public void incr() {
    lock.lock();
    try {
      count++;
    } finally {
      // Always release the lock in finally so that an exception cannot leave it held
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ReentrantLockDemo atomicExample = new ReentrantLockDemo();
    Thread[] threads = new Thread[2];
    for (int i = 0; i < 2; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < 10000; j++) {
          atomicExample.incr();
        }
      });
      threads[i].start();
    }
    threads[0].join();
    threads[1].join();
    // Both threads have finished, so the count is 20000
    System.out.println(atomicExample.count);
  }
}

ReentrantLock defines a Sync class, source code as follows:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        // Abstract method, implemented by FairSync and NonfairSync
        abstract void lock();

        // Nonfair attempt to acquire the resource
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        // Exclusive mode: attempts to release the resource.
        // Returns true once the lock is fully released, false otherwise.
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
}

The Sync class extends AbstractQueuedSynchronizer, abbreviated as "AQS".

There are two types of locks available in AQS:

  • An exclusive lock, which only one thread can hold at a time

    /**
     * Exclusive mode: attempts to acquire the resource.
     * Returns true on success, false on failure.
     * (The AQS default throws UnsupportedOperationException; subclasses supply the real logic.)
     *
     * @param arg the acquire argument
     * @return true if the resource was acquired
     */
    @Override
    protected boolean tryAcquire(int arg) {
      return super.tryAcquire(arg);
    }

    /**
     * Exclusive mode: attempts to release the resource.
     * Returns true on success, false on failure.
     *
     * @param arg the release argument
     * @return true if the resource was fully released
     */
    @Override
    protected boolean tryRelease(int arg) {
      return super.tryRelease(arg);
    }

  • A shared lock, which multiple threads can hold at the same time

    /**
     * Shared mode: attempts to release the resource.
     * Returns true if subsequent waiting nodes may be woken up, false otherwise.
     *
     * @param arg the release argument
     * @return true if waiting nodes may be woken up
     */
    @Override
    protected boolean tryReleaseShared(int arg) {
        return super.tryReleaseShared(arg);
    }

    /**
     * Shared mode: attempts to acquire the resource.
     * A negative value indicates failure; 0 indicates success with no resources remaining;
     * a positive value indicates success with resources still remaining.
     *
     * @param arg the acquire argument
     * @return negative on failure, zero or positive on success
     */
    @Override
    protected int tryAcquireShared(int arg) {
        return super.tryAcquireShared(arg);
    }
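To make the shared-mode return values concrete, here is a minimal sketch of a permit counter built on AQS. The class names SharedPermitsSketch and Permits and the helper available() are hypothetical and introduced only for illustration; the sketch assumes that state holds the number of free permits, which is one possible convention and not something AQS itself prescribes.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SharedPermitsSketch {

    /** Hypothetical shared-mode synchronizer: state holds the number of free permits. */
    static final class Permits extends AbstractQueuedSynchronizer {
        Permits(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int available = getState();
                int remaining = available - arg;
                // Negative: fail without touching state.
                // Otherwise publish the new permit count through CAS and report what is left.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + arg)) {
                    return true; // waiting threads may now be woken up
                }
            }
        }

        int available() {
            return getState();
        }
    }

    public static int[] demo() {
        Permits permits = new Permits(2);
        int first = permits.tryAcquireShared(1);  // success, one permit left
        int second = permits.tryAcquireShared(1); // success, nothing left
        int third = permits.tryAcquireShared(1);  // failure
        permits.releaseShared(1);                 // public AQS method, calls tryReleaseShared
        return new int[] {first, second, third, permits.available()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

The three acquire attempts return a positive value, zero, and a negative value in turn, matching the three cases described for tryAcquireShared above.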

The core variables

/** The synchronization state. */
private volatile int state;

  • state > 0: a thread holds the resource and has not released it. With reentrancy, the value of state may be greater than 1.
  • state = 0: the lock resource is currently free.

// CAS guarantees that state is updated atomically under multithreaded contention
protected final boolean compareAndSetState(int expect, int update) {
  // See below for intrinsics setup to support this
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

ReentrantLock source code

  • When you call ReentrantLock.lock(), you are actually calling the lock() method of the abstract static inner class Sync.

public void lock() {
  sync.lock();
}

Sync has two concrete implementations:

  • Fair lock: threads must acquire the lock resource in FIFO order.

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Only compete for the lock if no other thread is queued ahead of us
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

  • Nonfair lock: a thread may try to acquire the lock resource directly, without following FIFO order. ReentrantLock uses the nonfair lock by default.

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        final void lock() {
            // Whether or not other threads are queued, try to grab the lock directly through CAS.
            // If the CAS fails, fall back to acquire(1).
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
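From the caller's side, the choice between the two Sync implementations is made through the ReentrantLock constructor. The following small sketch uses only the public constructors and isFair(); the class name FairnessSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class FairnessSketch {

    public static boolean[] demo() {
        ReentrantLock defaultLock = new ReentrantLock();      // no argument: nonfair
        ReentrantLock fairLock = new ReentrantLock(true);     // explicitly fair
        ReentrantLock nonfairLock = new ReentrantLock(false); // explicitly nonfair
        return new boolean[] {
            defaultLock.isFair(), fairLock.isFair(), nonfairLock.isFair()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

A fair lock gives up some throughput in exchange for ordering, which is presumably why the nonfair variant is the default.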

acquire(int arg)

/**
 * Tries to acquire the exclusive lock through tryAcquire(), which returns true on success and false otherwise.
 * If tryAcquire() returns false, the lock is currently held, so addWaiter() wraps the current thread
 * in a Node and appends it to the AQS synchronization queue.
 * acquireQueued() then takes that Node as an argument and keeps trying to acquire the lock in a loop.
 */
public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
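The relationship between acquire() and tryAcquire() is a template method: AQS owns the queuing logic and calls back into the subclass. The sketch below is a hypothetical non-reentrant mutex (CountingMutex, with an invented tryAcquireCalls counter) that records how often AQS invokes tryAcquire(); it only exercises the uncontended path, where acquire() returns as soon as the first tryAcquire() succeeds.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class AcquireTemplateSketch {

    /** Hypothetical non-reentrant mutex; state 0 means free, 1 means held. */
    static final class CountingMutex extends AbstractQueuedSynchronizer {
        final AtomicInteger tryAcquireCalls = new AtomicInteger();

        @Override
        protected boolean tryAcquire(int arg) {
            tryAcquireCalls.incrementAndGet();
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    public static int[] demo() {
        CountingMutex mutex = new CountingMutex();
        mutex.acquire(1);                          // AQS calls our tryAcquire()
        int calls = mutex.tryAcquireCalls.get();
        int heldDuring = mutex.isLocked() ? 1 : 0;
        mutex.release(1);                          // AQS calls our tryRelease()
        int heldAfter = mutex.isLocked() ? 1 : 0;
        return new int[] {calls, heldDuring, heldAfter};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```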

CAS Implementation Principle

protected final boolean compareAndSetState(int expect, int update) {
  // CAS (optimistic locking): compare and swap.
  // If the current value of state equals expect, it is updated to update.
  // Returns true if the update succeeded, false otherwise.
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
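The same compare-and-swap semantics can be observed without touching Unsafe. The sketch below uses java.util.concurrent.atomic.AtomicInteger as a stand-in for the AQS state field; that substitution, and the class name CasSketch, are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class CasSketch {

    public static boolean[] demo() {
        // Stand-in for the AQS state field: 0 means unlocked
        AtomicInteger state = new AtomicInteger(0);

        // Expected value matches the current value, so the swap happens
        boolean firstAttempt = state.compareAndSet(0, 1);

        // State is now 1, so expecting 0 fails and nothing is written
        boolean secondAttempt = state.compareAndSet(0, 1);

        boolean stateIsOne = state.get() == 1;
        return new boolean[] {firstAttempt, secondAttempt, stateIsOne};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

Only one of several competing threads can win the 0 to 1 transition, which is what lets the nonfair lock() path use a single CAS as its fast path.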

State property

State is an attribute in AQS that means different things in different implementations. For an implementation of a reentrant lock, state represents the synchronization state, which has two meanings.

  • When state is 0, it indicates that there is no lock.
  • When state > 0, a thread has acquired the lock; after the first acquisition, state = 1. Because ReentrantLock is reentrant, state increases each time the same thread acquires the lock again. For example, if a thread acquires the lock five times, state = 5. The lock must then be released five times, bringing state back to 0, before any other thread can acquire it.
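The reentrancy count can be observed through the public getHoldCount() and isLocked() methods, which relay to the Sync methods shown earlier. A small sketch, with the hypothetical class name HoldCountSketch:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class HoldCountSketch {

    public static int[] demo() {
        ReentrantLock lock = new ReentrantLock();

        // The same thread acquires the lock five times
        for (int i = 0; i < 5; i++) {
            lock.lock();
        }
        int afterFiveLocks = lock.getHoldCount();

        // Four releases are not enough: the lock is still held once
        for (int i = 0; i < 4; i++) {
            lock.unlock();
        }
        int afterFourUnlocks = lock.getHoldCount();

        // The fifth release brings the count back to zero
        lock.unlock();
        int afterFiveUnlocks = lock.getHoldCount();
        int stillLocked = lock.isLocked() ? 1 : 0;

        return new int[] {afterFiveLocks, afterFourUnlocks, afterFiveUnlocks, stillLocked};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```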

nonfairTryAcquire() method source code

final boolean nonfairTryAcquire(int acquires) {
  final Thread current = Thread.currentThread(); // Get the current thread
  int c = getState(); // Get the state value
  if (c == 0) { // 0 means the lock is free
    if (compareAndSetState(0, acquires)) { // CAS: compare and replace the value of state
      // Record the current thread as the owner, so that it can re-enter later without competing again
      setExclusiveOwnerThread(current);
      return true;
    }
  }
  else if (current == getExclusiveOwnerThread()) { // The lock is already held by this same thread
    // Directly increase the reentrancy count
    int nextc = c + acquires;
    if (nextc < 0) // overflow
      throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
  }
  return false;
}

The implementation logic of the nonfairTryAcquire() method is as follows.

  • Check the current lock state. c == 0 means the lock is free, so the method tries to take it by modifying state through compareAndSetState().
    ○ If the CAS succeeds, the current thread is recorded as the owner and true is returned.
    ○ If the CAS fails, false is returned.
  • current == getExclusiveOwnerThread() means the thread that holds the lock is the current thread, so the reentrancy count is increased and stored in the state field.
  • Otherwise the lock is held by another thread, and false is returned.
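These outcomes can be observed from outside through ReentrantLock.tryLock(), which makes a single non-blocking, barging attempt (in the JDK 8 source it delegates to nonfairTryAcquire(1); other JDK versions may structure this differently). The class name TryLockSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockSketch {

    public static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the calling thread becomes the owner
        try {
            // Same thread: takes the reentrant branch and succeeds
            boolean reentrant = lock.tryLock();

            // Different thread: state != 0 and it is not the owner, so the attempt fails
            AtomicBoolean otherGotLock = new AtomicBoolean(true);
            Thread other = new Thread(() -> otherGotLock.set(lock.tryLock()));
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }

            if (reentrant) {
                lock.unlock(); // balance the reentrant acquisition
            }
            return new boolean[] {reentrant, otherGotLock.get()};
        } finally {
            lock.unlock(); // balance the initial lock()
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```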

AbstractQueuedSynchronizer.addWaiter(Node mode)

When tryAcquire() fails to acquire the lock, the addWaiter() method is first called to encapsulate the current thread as a Node and join the queue. The source code is as follows

// The mode parameter is the mode of the new node; Node.EXCLUSIVE is passed here to indicate exclusive mode.
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode); // Wrap the thread that failed to acquire the lock in a Node
  Node pred = tail; // tail is the tail of the AQS queue; it is null by default
  if (pred != null) { // If tail is not null, the queue already contains nodes
    node.prev = pred; // Point the prev of the current thread's Node to tail
    if (compareAndSetTail(pred, node)) { // Add the node to the AQS queue through CAS, that is, make it the new tail
      pred.next = node; // Point the next of the old tail to the current node
      return node;
    }
  }
  enq(node); // tail == null or the CAS failed: add the node to the synchronization queue through enq()
  return node;
}

The current thread is wrapped in a Node and stored, so that later the thread can be retrieved from the Node and woken up with the unpark(Thread) method. The check pred != null determines whether the linked list has been initialized. If it has, compareAndSetTail() sets the current thread's Node as the tail node and the two-way links are established. If the list has not been initialized, or the CAS fails because of thread contention, the enq() method is called to complete the insertion.

enq() method

private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Queue not initialized: install an empty head node through CAS, then retry
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

This method uses a CAS spin loop to initialize the synchronization queue when necessary and then append the current node to it.

[Figure: overall structure of AQS]
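The enq() loop can be reproduced outside AQS to check that the CAS-based append loses no nodes under contention. The sketch below is a simplified model, not the JDK implementation: it assumes AtomicReference fields in place of the Unsafe-based head and tail, and the names EnqSketch, Node.name, and countBackward() are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

public class EnqSketch {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Same shape as AQS enq(): lazily create a dummy head, then CAS the node in as tail. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                   // step 1: link backward
                if (tail.compareAndSet(t, node)) { // step 2: publish as the new tail
                    t.next = node;               // step 3: link forward
                    return t;
                }
            }
        }
    }

    /** Counts the real nodes by walking prev links from tail back to the dummy head. */
    int countBackward() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) {
            n++;
        }
        return n;
    }

    public static int demo() {
        EnqSketch queue = new EnqSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    queue.enq(new Node("n"));
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.countBackward();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Every thread that loses the tail CAS simply re-reads tail and retries, so all 4 x 1000 nodes end up reachable through the prev chain.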

ReentrantLock release lock source code analysis

public void unlock() {
  sync.release(1);
}

public final boolean release(int arg) {
  if (tryRelease(arg)) { // The lock was fully released
    Node h = head; // Get the head node of the AQS queue
    if (h != null && h.waitStatus != 0)
      // If head is not null and its waitStatus is not 0, call unparkSuccessor(h) to wake up the successor node
      unparkSuccessor(h);
    return true;
  }
  return false;
}

tryRelease(int releases) releases the lock by modifying the state value

protected final boolean tryRelease(int releases) {
  int c = getState() - releases; // Subtract the number of releases
  if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException(); // Only the owner thread may release the lock
  boolean free = false;
  if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null); // Fully released: clear the owner thread
  }
  setState(c);
  return free;
}

For an exclusive lock, state increases by 1 on each acquisition and decreases by 1 on each release. Because the same thread can re-enter the lock, state may grow to 2, 3, 4, 5, and so on. Only when unlock() has been called as many times as lock() does state return to 0; at that point the exclusive owner thread is set to null and the lock is actually released.
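The ownership check at the top of tryRelease() is visible through the public API: calling unlock() from a thread that does not own the lock throws IllegalMonitorStateException. A short sketch, with the hypothetical class name UnlockOwnershipSketch:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class UnlockOwnershipSketch {

    public static boolean[] demo() {
        // Case 1: nobody holds the lock, so the caller is not the owner
        boolean threwWhenUnheld = false;
        try {
            new ReentrantLock().unlock();
        } catch (IllegalMonitorStateException expected) {
            threwWhenUnheld = true;
        }

        // Case 2: this thread holds the lock, another thread tries to release it
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threwInOtherThread = new AtomicBoolean(false);
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    lock.unlock();
                } catch (IllegalMonitorStateException expected) {
                    threwInOtherThread.set(true);
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            // The failed unlock must not have changed the state
            boolean stillHeld = lock.isHeldByCurrentThread();
            return new boolean[] {threwWhenUnheld, threwInOtherThread.get(), stillHeld};
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```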

unparkSuccessor(Node node) wakes up a thread in the synchronization queue

private void unparkSuccessor(Node node) {
  int ws = node.waitStatus; // Get the status of the head node
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0); // Set the node status to 0
  Node s = node.next; // Get the successor of the head node
  if (s == null || s.waitStatus > 0) {
    // The successor is null or cancelled (waitStatus > 0).
    // Scan backward from the tail node to find the node closest to head whose waitStatus <= 0.
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null) // A valid successor was found: wake up its thread
    LockSupport.unpark(s.thread);
}

The unparkSuccessor() method has two main steps.

  • Check the successor of the current node. If it is null or cancelled, scan backward from the tail node to find the node closest to head whose waitStatus is <= 0 (that is, not cancelled).

  • Wake up that node's thread with the LockSupport.unpark() method.

    Why scan backward from tail? This is related to the enq() method, which adds a new node to the list in three steps: (1) point the prev of the new node to tail; (2) set tail to the new node through CAS, which is atomic and therefore thread-safe; (3) execute t.next = node so that the old tail's next points to the new node. If another thread calls unlock() after step 2 but before step 3 and traverses forward from head, the list is not yet fully linked: the traversal stops at node t because t.next has not been assigned, and the new node is missed. Traversing backward from tail avoids this problem, because prev is always set before the node becomes visible as tail.
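The window between the tail update and the next assignment can be reproduced deterministically in a single thread by performing the three steps by hand. This is an illustrative model only: TailScanSketch, its Node class, and the counting helpers are hypothetical, and a plain assignment stands in for the CAS on tail.

```java
import java.util.Arrays;

public class TailScanSketch {

    static final class Node {
        final String name;
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    /** Counts waiting nodes by following next links from head. */
    static int countForward(Node head) {
        int n = 0;
        for (Node t = head.next; t != null; t = t.next) {
            n++;
        }
        return n;
    }

    /** Counts waiting nodes by following prev links from tail back to head. */
    static int countBackward(Node head, Node tail) {
        int n = 0;
        for (Node t = tail; t != null && t != head; t = t.prev) {
            n++;
        }
        return n;
    }

    public static int[] demo() {
        // Existing queue: head <-> A, with A as tail
        Node head = new Node("head");
        Node a = new Node("A");
        head.next = a;
        a.prev = head;
        Node tail = a;

        // A new node B is being enqueued
        Node b = new Node("B");
        Node oldTail = tail;
        b.prev = oldTail; // step 1: link backward
        tail = b;         // step 2: publish as tail (a CAS in the real code)
        // step 3 (oldTail.next = b) has not run yet

        int forwardDuringWindow = countForward(head);        // misses B
        int backwardDuringWindow = countBackward(head, tail); // sees A and B

        oldTail.next = b; // step 3 completes
        int forwardAfter = countForward(head);

        return new int[] {forwardDuringWindow, backwardDuringWindow, forwardAfter};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

During the window, the forward walk sees one node while the backward walk already sees two.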

The awakened thread continues to execute

Back in the acquireQueued() method of AQS: threads that failed to acquire the lock are blocked inside this method. When a blocked thread is woken up, it continues from the point where it was blocked, as shown below.

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor(); // Get the predecessor node
      if (p == head && tryAcquire(arg)) { // Predecessor is head: try to acquire the lock again
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      // Park here if acquisition failed; after being woken up, go around the loop again
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}
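The block-and-resume behavior comes from LockSupport.park() and LockSupport.unpark(Thread). The sketch below shows the same pattern in isolation: a waiter parks inside a loop that re-checks its condition, and resumes from that point once another thread unparks it. The class name ParkUnparkSketch and the permitted flag are hypothetical; the flag plays the role that tryAcquire() plays in acquireQueued().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ParkUnparkSketch {

    public static boolean demo() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark(), so the condition
            // is re-checked in a loop, just as acquireQueued() retries tryAcquire()
            while (!permitted.get()) {
                LockSupport.park();
            }
            resumed.set(true); // execution continues right after the park loop
        });
        waiter.start();

        // Set the condition first, then wake the waiter. If unpark() runs before
        // the waiter parks, the permit is kept and the next park() returns at once.
        permitted.set(true);
        LockSupport.unpark(waiter);

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```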
