“This is the fourth day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021.”

The introduction

With concurrent programming, will have to talk about already, when it comes to already will ask principle, when it comes to principle leads to AQS (AbstractQueuedSynchronized), and then is relentless friction on the ground. This article will focus on the locking process, and the next article will focus on the locking process.

Already use

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
  while(conditional expression) {condition.wait(); }// Process logic
} finally {
    lock.unlock();
}
Copy the code

Lock. Lock () shows the lock acquisition and release in the finally block to ensure that the lock will eventually be released after it is acquired.

Lock () method call procedure (default unfair lock)

Non-fair lock call lock method source as follows:

static final class NonfairSync extends Sync {
	private static final long serialVersionUID = 7316153563782823691L;

	final void lock(a) {
		// Indicates that there is no thread currently holding the lock and sets the owner of the lock to the current thread
		if (compareAndSetState(0.1))
			setExclusiveOwnerThread(Thread.currentThread());
		/ / acquiring a lock
		else
			acquire(1);
	}

	protected final boolean tryAcquire(int acquires) {
		returnnonfairTryAcquire(acquires); }}Copy the code

The following figure shows the call process of ReentrantLock.lock.

AQS lock implementation

AQS: (AbstractQueuedSynchronizer) is provided in the JDK sync framework, internal maintains a bi-directional FIFO queue (CLH synchronous queue). This queue is used to manage synchronization state (the state state modified by volatile, used to indicate whether a lock is held).

Node

AQS maintenance queue Node structure, the following is the queue Node source:

static final class Node {
	/** Shared node */
	static final Node SHARED = new Node();

	/** exclusive node */
	static final Node EXCLUSIVE = null;

	/** The node will be set to the cancelled state because of timeout or interruption. The cancelled node will not participate in the competition and will remain cancelled
	static final int CANCELLED =  1;

	/** The successor node is in the wait state. If the current node releases synchronization or is cancelled, the successor node is notified so that it can run */
	static final int SIGNAL    = -1;

	/** The thread of the node waits on condition. When another thread calls signal on condition, the node will enter the synchronization queue from the waiting queue to obtain synchronization status */
	static final int CONDITION = -2;

	/** * The next shared synchronization state acquisition is propagated unconditionally */
	static final int PROPAGATE = -3;

	/** Wait status */
	volatile int waitStatus;

	/** The precursor node */        
	volatile Node prev;

	/** The successor node */
	volatile Node next;

	/** The thread that gets the synchronization status */
	volatile Thread thread;

	/** * The next conditional queue waits for the node */
	Node nextWaiter;

	final boolean isShared(a) {
		return nextWaiter == SHARED;
	}

	final Node predecessor(a) throws NullPointerException {
		Node p = prev;
		if (p == null)
			throw new NullPointerException();
		else
			return p;
	}

	Node() {    // Used to establish initial head or SHARED marker
	}

	Node(Thread thread, Node mode) {     // Used by addWaiter
		this.nextWaiter = mode;
		this.thread = thread;
	}

	Node(Thread thread, int waitStatus) { // Used by Condition
		this.waitStatus = waitStatus;
		this.thread = thread; }}Copy the code

FIFO queue (CLH queue)

Queue with head, tail and state three variables to maintain, the source code is as follows:

/** the head node */
private transient volatile Node head;

/** last node */
private transient volatile Node tail;

/** Synchronization status */
private volatile int state;
Copy the code

The schematic diagram of the structure is as follows:

CompareAndSetState call

First try to acquire the lock, calling the compareAndSetState method with a expected value of 0 and a new value of 1. The unsafe compareAndSwapInt method is used to modify the state property with a SINGLE CAS operation.

The CAS operation takes the volatile state value and compares it with the expected value of 0. If the value is 0, the CAS operation changes the state to 1. It also covers the topic of volatile modifiable variables ensuring visibility between threads, and the classic ABA problem of CAS operations.

The source code is as follows:

/** * If the current status value is equal to the expected value, the synchronization status is automatically set to the given update value. * This operation has the memory semantics of volatile reads and writes. *@paramExpect *@paramUpdate the new values@returnFalse indicates that the actual value does not equal the expected value, and true indicates success */
protected final boolean compareAndSetState(int expect, int update) {
	// See below for intrinsics setup to support this
	return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
Copy the code

If this method succeeds, calling the setExclusiveOwnerThread method will let the thread own the lock, with state already set to 1.

Acquire call

Entering this method indicates that another thread is currently holding the lock. Because this locking method is unfair, after entering the method, first try to obtain the lock, if not, then put the current thread in the queue, let the current thread interrupt execution.

The unfair lock in this method first shows the unfairness to the thread in the queue. Just like when we go to the bank to do business, if I am a VIP user, I can go ahead of the waiting users to do business, which is unfair to other waiting users.

public final void acquire(int arg) {
	if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

tryAcquire

Call this method to try to acquire the lock. The source code is as follows:

final boolean nonfairTryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
        If c==0, it is possible for another thread to release the lock. If state =0, the current thread will hold the lock */
	if (c == 0) {
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true; }}/** If the current thread is the thread that holds the lock, then state+1, which refers to the concept of a reentrant lock, which is repeated each time the thread reenters the lock
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}
Copy the code

Here, setState(NEXTC) simply makes state+1, not CAS.

addWaiter

The thread that is currently unable to acquire the lock is wrapped as a Node and added to the end of the queue.

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if(pred ! =null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}
Copy the code

The mode parameter is an exclusive lock or a shared lock. The default value is null. There are two steps to append to the end of a team: 1. If the current end of the team already exists (tail! =null), CAS is used to append the current thread to the end of the queue. 2. If the Tail is null or the CAS thread fails to set the Tail, the enQ method is used.

enq

Add the current thread to the queue via the for loop and the CAS spin process.

private Node enq(final Node node) {
	for (;;) {
		Node t = tail;
                // If the end of the queue is null, the queue is empty and the current thread is set as the head and tail node
		if (t == null) {
			if (compareAndSetHead(new Node()))
				tail = head;
		} else {
                // The queue is not empty, and the current thread is added to the queue via CAS.
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				returnt; }}}}Copy the code

acquireQueued

This method is a complement to addWaiter and will interrupt the execution of queued threads.

final boolean acquireQueued(final Node node, int arg) {
	// The operation failed
	boolean failed = true;
	try {
		// Thread interrupt flag
		boolean interrupted = false;
		for (;;) {
			// Prev of the current node
			final Node p = node.predecessor();
			// If the previous node is a header, and the attempt to obtain synchronization status succeeded
			if (p == head && tryAcquire(arg)) {
				// Set the current thread as a header
				setHead(node);
				// Remove the prev from the queue
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// Determine if the current thread needs to block && block the current thread and check the thread interrupt status
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())
				interrupted = true; }}finally {
		// Cancel obtaining the synchronization status
		if(failed) cancelAcquire(node); }}Copy the code

P == head && tryAcquire(ARG), where the judgment also shows the significance of unfairness. There are waiting threads in the queue trying to acquire locks.

shouldParkAfterFailedAcquire

This method is the last check before a thread is blocked. It determines whether the current thread should be blocked by the wait state of the prev node.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// Get the wait state of the prev node
	int ws = pred.waitStatus;
	
	if (ws == Node.SIGNAL)
		/* * If the prev status is signal, the prev is waiting and can block the current thread. * When the preV releases synchronization or cancels, the current node is notified. * /
		return true;
	if (ws > 0) {
		/* * status > 0, which indicates that the cancelled node is removed from the queue * until a non-cancelled node is found */
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		/* * Set prev status to signal */ via CAS
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}
Copy the code

parkAndCheckInterrupt

If the program reaches this point, the current thread has been queued and can be interrupted. Thread blocking is done by calling locksupport.park (), which calls the local sun.misc.unsafe. Park () method, and further, HotSpot blocks threads to the kernel in Linux by calling the pthread_mutex_lock function.

private final boolean parkAndCheckInterrupt(a) {
	LockSupport.park(this);
	return Thread.interrupted();
}
Copy the code

The locking process is complete.

conclusion

ReentrantLock is implemented through AQS, which maintains a FIFO queue. When there is lock competition, the queue is constructed. CAS and spin are used in the construction process to ensure that threads can enter the queue. A queued thread needs to block, which is done using the locksupport.park () method. Blocking threads allows the CPU to focus more on executing the thread holding the lock, rather than wasting resources trying to acquire the spin of the lock. The above is the process analysis of ReentrantLock lock, hope the big man more comments.