The JDK's JUC package (java.util.concurrent) provides a wide range of Java concurrency tools.

This article starts with the object locks and concurrency tools commonly used in the JUC package, looks at how they are used and what they offer, and then, guided by specific questions, works step by step from the surface down to the implementation of the underlying AQS abstract class.

Terminology

1 AQS

AQS is an abstract class whose fully qualified name is java.util.concurrent.locks.AbstractQueuedSynchronizer (abstract queued synchronizer). It is an abstract base class for concurrency tools, built on the template method pattern. Concurrency classes implemented on top of AQS include ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch, all of which appear later in this article.

2 CAS

CAS, short for Compare And Swap, is an atomic instruction.

The CAS mechanism uses three basic operands: the memory address addr, the expected old value oldVal, and the new value newVal. When a variable is updated, the value at memory address addr is changed to newVal only if the expected value oldVal is the same as the value actually stored at addr.

Based on the idea of optimistic locking, CAS lets a thread update a variable safely by repeatedly comparing and retrying until the update succeeds.
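The retry loop described above can be sketched with AtomicInteger.compareAndSet from the JDK. The class name CasCounter and its methods are made up for this illustration; this is a minimal sketch of the technique, not how any particular JDK class is written.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: a counter updated with a CAS retry loop.
class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    // Optimistic update: read the current value, compute the new one,
    // and retry if another thread changed the value in between.
    int increment() {
        for (;;) {
            int oldVal = value.get();
            int newVal = oldVal + 1;
            if (value.compareAndSet(oldVal, newVal)) {
                return newVal;
            }
        }
    }

    int get() {
        return value.get();
    }

    // Starts `threads` workers that each increment `perThread` times
    // and returns the final counter value.
    static int runDemo(int threads, int perThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```

No increment is lost even without a lock, because a failed compareAndSet simply causes another read and another attempt.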

3 Thread Interrupt

Thread interruption is a cooperative mechanism: one thread asks another thread to stop what it is doing, and the target thread decides how to respond.

When a thread is blocked in wait(), join(), or sleep() and another thread calls interrupt() on it, the thread exits the blocked state immediately and receives an InterruptedException.

If the thread is running rather than blocked, interrupt() only sets its interrupt flag. The task logic has to check the flag itself by calling isInterrupted() and then respond to the interrupt, usually by throwing InterruptedException.
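The two cases can be illustrated with a small sketch. InterruptDemo and its method names are hypothetical; the second worker simply exits its loop when it sees the flag, which is one possible response among several.

```java
// Hypothetical example class showing both ways a thread observes an interrupt.
class InterruptDemo {
    // Case 1: the thread is blocked in sleep(); interrupt() makes sleep()
    // throw InterruptedException. Returns true if the exception was seen.
    static boolean interruptBlockedThread() {
        boolean[] gotException = {false};
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                gotException[0] = true;
            }
        });
        sleeper.start();
        sleeper.interrupt();
        joinQuietly(sleeper);
        return gotException[0];
    }

    // Case 2: the thread is running, so interrupt() only sets the flag and
    // the task has to poll isInterrupted() itself.
    static boolean interruptRunningThread() {
        boolean[] sawFlag = {false};
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield(); // stand-in for real work
            }
            sawFlag[0] = true;
        });
        worker.start();
        worker.interrupt();
        joinQuietly(worker);
        return sawFlag[0];
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```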

Object locking features

What are the basic features of object locks and concurrency tools, and how are they implemented?

1 Explicit acquisition

Take ReentrantLock as an example. You can acquire the lock explicitly in the following four ways:

  • (1) Block waiting for acquisition
ReentrantLock lock = new ReentrantLock();
// Block and wait until the lock is acquired
lock.lock();
  • (2) Non-blocking attempt to acquire
ReentrantLock lock = new ReentrantLock();
// Try to acquire the lock; if it is already held by another thread,
// return false immediately instead of blocking
// Returns true  - the lock was free and is now held by this thread, or this thread already held it
// Returns false - the lock could not be acquired
boolean isGetLock = lock.tryLock();
  • (3) Block waiting for acquisition within a specified time
ReentrantLock lock = new ReentrantLock();
try {
    // Try to acquire the lock within the specified time
    // Returns true  - the lock was free and is now held by this thread, or this thread already held it
    // Returns false - the lock was not acquired within the specified time
    lock.tryLock(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    // The thread was interrupted while waiting for the lock
}
  • (4) Acquisition that responds to interrupts
ReentrantLock lock = new ReentrantLock();
try {
    // Block and wait for the lock
    // If the thread is interrupted via Thread.interrupt() while waiting,
    // it exits the blocking wait and throws InterruptedException
    lock.lockInterruptibly();
} catch (InterruptedException e) {
    e.printStackTrace();
}

2 Explicit release

ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
    // ... business logic
} finally {
    // Explicitly release the lock
    lock.unlock();
}

3 Reentrant

A thread that already holds a lock can acquire the same lock again without blocking.
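A minimal sketch of reentrancy, using ReentrantLock.getHoldCount() to show how many times the current thread holds the lock. ReentrantDemo is a hypothetical class name used only for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: the same thread acquires one lock twice.
class ReentrantDemo {
    // Returns the hold count observed while the lock is held twice.
    static int nestedHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            lock.lock(); // does not block: this thread already owns the lock
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Each lock() has to be matched by an unlock(); the lock is only free again once the hold count is back to zero.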

4 Shareable

The same resource can be shared by multiple threads. For example, the read lock of a read/write lock can be held by multiple threads at once. A shared lock lets multiple threads access data concurrently and safely, which improves program throughput.
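The sharing behaviour of a read lock can be observed with a small probe. In this sketch (SharedReadDemo is a hypothetical name) the calling thread holds the read lock while a second thread makes non-blocking attempts on the read lock and then the write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical example class: probes a read/write lock from a second thread.
class SharedReadDemo {
    // Returns {readAcquired, writeAcquired} as seen by the second thread
    // while the calling thread holds the read lock.
    static boolean[] probeFromOtherThread() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                result[0] = rw.readLock().tryLock();
                if (result[0]) {
                    rw.readLock().unlock();
                }
                result[1] = rw.writeLock().tryLock();
                if (result[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }
}
```

The second thread can share the read lock, but it cannot take the write lock while any reader is active.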

5 Fair and unfair

Fair lock: multiple threads compete for the lock on a first-come, first-served basis. Before each acquisition, a thread checks whether other threads are already in the wait queue. Only if there are none does it try to obtain the lock; otherwise it joins the queue.

Unfair lock: a thread tries to acquire the lock immediately instead of checking the wait queue first. Only if that attempt fails does it enter the wait queue.

Because an unfair lock gives newly arriving threads some chance of acquiring the lock directly, fewer threads are suspended and woken up, so its performance is generally better than that of a fair lock.
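With ReentrantLock the mode is chosen in the constructor: the no-argument constructor creates an unfair lock, and passing true requests a fair one. A small sketch (FairnessDemo is a hypothetical name):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: creates one lock of each kind.
class FairnessDemo {
    // Returns {isFair of the default lock, isFair of the fair lock}.
    static boolean[] fairnessFlags() {
        ReentrantLock unfair = new ReentrantLock();   // default: unfair
        ReentrantLock fair = new ReentrantLock(true); // fair ordering requested
        return new boolean[] { unfair.isFair(), fair.isFair() };
    }
}
```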

AQS implementation principle

1 Basic Concepts

(1) The Condition interface

Object methods such as wait(), wait(long timeout), notify(), and notifyAll(), combined with the built-in synchronized lock, implement the wait/notify pattern. Locks that implement the Lock interface, such as ReentrantLock and the read and write locks of ReentrantReadWriteLock, offer a similar capability:

The Condition interface defines await(), awaitNanos(long), signal(), signalAll(), and other methods, and provides wait/notify functionality together with a lock instance. Condition is implemented by ConditionObject, an inner class of AQS. A thread that calls await() releases the lock, blocks, and waits in the condition's wait queue. After another thread calls signal(), its node is transferred to the CLH queue (described below), where it waits to reacquire the lock.
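As an illustration of wait/notify with a Lock, the sketch below hands a single message from one thread to another. ConditionDemo, put, and take are hypothetical names; the pattern (await in a loop while holding the lock, signal after changing the state) is the standard usage described in the Condition documentation.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: a one-slot mailbox built on Lock + Condition.
class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private String message; // guarded by lock

    void put(String m) {
        lock.lock();
        try {
            message = m;
            notEmpty.signal(); // wake one thread waiting in take()
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            // await() releases the lock while waiting and reacquires it
            // before returning; the loop guards against spurious wakeups.
            while (message == null) {
                notEmpty.await();
            }
            String m = message;
            message = null;
            return m;
        } finally {
            lock.unlock();
        }
    }

    // A consumer thread takes the message that the calling thread puts.
    static String runDemo() {
        ConditionDemo box = new ConditionDemo();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = box.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        box.put("hello");
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }
}
```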

(2) The CLH queue

CLH stands for Craig, Landin, and Hagersten, who devised the algorithm.

AQS maintains a bidirectional FIFO CLH queue internally and relies on it to manage waiting threads. If a thread fails to obtain the contended resource, it is blocked and added to the CLH synchronization queue. When the contended resource becomes free, waiting threads are woken up in CLH queue order and the resource is allocated to them.

The head node of the CLH queue represents the thread that currently holds the resource, or no thread at all, while the other nodes store information about the queued threads.

The status (waitStatus) of each node in the CLH is as follows:

  • CANCELLED(1): the current node has been cancelled. A node enters this state when it times out or is interrupted (if the acquisition responds to interrupts), and its state never changes again afterwards
  • SIGNAL(-1): the successor node is waiting for the current node to wake it up. After joining the queue and before going to sleep, a successor sets the status of its predecessor to SIGNAL
  • CONDITION(-2): the node is waiting on a Condition. When another thread calls signal() on that Condition, the node is transferred from the condition wait queue to the synchronization queue, where it waits to acquire the lock
  • PROPAGATE(-3): in shared mode, the predecessor node wakes up not only its immediate successor but possibly the successors after it as well
  • 0: the default status of a new node when it is added to the queue

(3) Resource sharing

AQS defines two resource-sharing modes. Exclusive: only one thread can hold the resource at a time, as in ReentrantLock. Share: multiple threads can hold the resource at the same time, as in Semaphore and CountDownLatch.
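The difference between the two modes can be seen from the outside with non-blocking attempts. This sketch uses a hypothetical SharingModesDemo class: a Semaphore with two permits accepts two acquisitions, whereas a ReentrantLock held by one thread rejects a second thread.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class contrasting share mode and exclusive mode.
class SharingModesDemo {
    // Share mode: three non-blocking attempts on a 2-permit semaphore.
    static boolean[] sharedAttempts() {
        Semaphore permits = new Semaphore(2);
        return new boolean[] {
            permits.tryAcquire(), permits.tryAcquire(), permits.tryAcquire()
        };
    }

    // Exclusive mode: while the caller holds the lock, report whether
    // another thread manages to take it without blocking.
    static boolean otherThreadCanLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = {false};
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock();
                if (acquired[0]) {
                    lock.unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }
}
```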

(4) How to block and wake up a thread

The park method, provided by the sun.misc.Unsafe class, blocks a thread, and unpark wakes it up. AQS reaches both through LockSupport. A thread blocked by park also exits the block in response to interrupt().
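Application code normally reaches these primitives through java.util.concurrent.locks.LockSupport. The sketch below (ParkDemo is a hypothetical name) shows a thread leaving park() once after unpark() and once after interrupt(). Because park() is allowed to return spuriously, both waiters re-check a condition in a loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical example class: two ways a parked thread resumes.
class ParkDemo {
    // The waiter parks until the flag is set and unpark() is called.
    static boolean parkUntilUnparked() {
        AtomicBoolean go = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!go.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();
        go.set(true);
        // Wakes the waiter, or grants the permit in advance if the
        // waiter has not parked yet.
        LockSupport.unpark(waiter);
        joinQuietly(waiter);
        return resumed.get();
    }

    // The waiter parks until it has been interrupted.
    static boolean parkUntilInterrupted() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(); // returns when the thread is interrupted
            }
            sawInterrupt.set(true);
        });
        waiter.start();
        waiter.interrupt();
        joinQuietly(waiter);
        return sawInterrupt.get();
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Unlike wait() or sleep(), park() does not throw InterruptedException; it just returns, which is why the caller has to inspect the interrupt flag itself.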

2 Basic Design

Core design idea: AQS provides a framework for implementing blocking locks and related synchronizers that rely on a CLH queue. Subclasses implement protected methods that decide whether a resource can be acquired or released, and AQS implements the thread scheduling strategy on top of these protected methods.

AQS also provides an int variable that supports thread-safe atomic updates and serves as the synchronization state value. Subclasses are free to define what the variable means and how it is updated according to their needs.

The protected methods that subclasses override are as follows:

  • boolean tryAcquire(int): exclusive mode, tries to acquire the resource; returns true on success, false on failure
  • boolean tryRelease(int): exclusive mode, tries to release the resource; returns true on success, false on failure
  • int tryAcquireShared(int): share mode, tries to acquire the resource. A negative number indicates failure; 0 indicates success with no resources remaining; a positive number indicates success with resources still available
  • boolean tryReleaseShared(int): share mode, tries to release the resource; returns true if subsequent waiting nodes may be woken up, false otherwise

These methods are always called by the threads that need to be coordinated, and subclasses must implement them in a non-blocking manner.

AQS provides the following methods to obtain/release resources based on the tryXXX method described above:

  • void acquire(int): acquires the resource in exclusive mode. If the acquisition succeeds, the thread returns directly; otherwise the thread is queued until it acquires the resource, ignoring interrupts while it waits
  • boolean release(int): releases the specified amount of the resource in exclusive mode. If the resource is fully released, it wakes up a thread in the waiting queue so that thread can acquire the resource
  • void acquireShared(int): acquires the resource in shared mode
  • boolean releaseShared(int): releases the resource in shared mode

Taking exclusive mode as an example, the core logic for acquiring and releasing a resource is as follows:

 Acquire:
     while (!tryAcquire(arg)) {
         enqueue the thread if it is not already queued;
     }

 Release:
     if (tryRelease(arg))
         wake up the first queued thread in the CLH queue;
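To make the division of labour concrete, here is a minimal sketch of a non-reentrant mutex built on AQS, in the spirit of the example in the AbstractQueuedSynchronizer documentation. SimpleMutex and its inner Sync class are hypothetical names; the subclass only decides whether state can move between 0 and 1, while acquire(int) and release(int) in AQS do the queuing, blocking, and waking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex on top of AQS.
class SimpleMutex {
    // state == 0 means unlocked, state == 1 means locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }             // queues and blocks if needed
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }           // wakes the first queued thread
    boolean isLocked() { return sync.isLocked(); }

    // Several threads increment a plain int that only the mutex protects.
    static int runDemo(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

Because tryAcquire here never checks the owner thread before the CAS, a second tryLock() by the same thread fails, which is exactly what ReentrantLock's version adds on top.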

This is a little convoluted, so the following picture restates the design ideas above:

Feature implementation

The following describes how the functions and features of AQS-based object locks and concurrency tools are implemented.

1 Explicit acquisition

This feature is explained using ReentrantLock as an example. ReentrantLock is a reentrant object lock: each time a thread successfully acquires the lock, the synchronization state value increases by 1, and each release decreases it by 1.

ReentrantLock supports both fair and unfair modes. The explicit-acquisition walkthrough below uses the fair lock as its example.

(1) Block waiting for acquisition

The basic implementation is as follows:

  • ReentrantLock implements the tryAcquire(int) method of AQS, which decides whether the current thread can acquire the lock
  • The acquire(int) method of AQS uses tryAcquire(int) to determine whether the current node is first in line (its predecessor is head) and whether the resource can be obtained. If not, the thread joins the CLH queue and blocks
  • ReentrantLock's lock() method relies on the acquire(int) method of AQS to block and wait until the lock is acquired

The tryAcquire(int) implementation in ReentrantLock:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // No thread holds the lock
    if (c == 0) {
        // No other thread has been waiting longer than the current one,
        // and state was set successfully via CAS (expected old value is 0)
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // Record the current thread as the thread holding the lock
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The thread holding the lock is the current thread
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // Another thread already holds the lock
    return false;
}

The acquire(int) implementation in AQS:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // acquireQueued returned true, which means the thread was
        // interrupted while waiting for the resource, so set the
        // thread's interrupt flag back to true
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // Records whether the thread was interrupted while waiting
        boolean interrupted = false;
        // Loop until the resource is acquired
        for (;;) {
            // Predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is head, this node is first in line
            // and is eligible to try to acquire the resource
            if (p == head && tryAcquire(arg)) {
                // The resource was acquired successfully
                setHead(node);
                // help GC
                p.next = null;
                failed = false;
                return interrupted;
            }
            // If the thread is interrupted while waiting, it does not
            // respond to the interrupt; it keeps queuing for the resource
            // and records the fact by setting interrupted to true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

(2) Non-blocking attempt to acquire

tryLock() in ReentrantLock is always an unfair acquisition, even on a fair lock. Its logic is basically the same as that of tryAcquire, except that it does not call hasQueuedPredecessors() to check whether other threads are already waiting in the CLH queue. As a result, a thread that requests the lock just as it is released can jump the queue.

The tryLock() implementation in ReentrantLock:

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // No thread holds the lock
    if (c == 0) {
        // Set state via CAS (expected old value is 0)
        // without checking whether threads are waiting in the CLH queue
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The thread holding the lock is the current thread
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // int overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // Another thread already holds the lock
    return false;
}

(3) Block waiting for acquisition within a specified time

The basic implementation is as follows:

  • ReentrantLock's tryLock(long, TimeUnit) calls the tryAcquireNanos(int, long) method of AQS
  • tryAcquireNanos(int, long) first calls tryAcquire(int) to try to acquire the lock; if that fails, it calls doAcquireNanos(int, long)
  • doAcquireNanos in AQS uses tryAcquire(int) to determine whether the current node is first in line (its predecessor is head) and whether the resource can be obtained. If not, and the remaining timeout is greater than 1 microsecond, the thread sleeps for the remaining time and then tries again

The implementation in ReentrantLock, together with the AQS tryAcquireNanos method it calls, is as follows:

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // If the thread has already been interrupted via interrupt(), fail fast
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try tryAcquire first, then fall back to the timed queueing path
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

The implementation in AQS is as follows:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // Predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is head, this node is first in line
            // and is eligible to try to acquire the resource
            if (p == head && tryAcquire(arg)) {
                // The resource was acquired successfully
                setHead(node);
                // help GC
                p.next = null;
                failed = false;
                return true;
            }
            // Update the remaining timeout
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // If the remaining timeout is greater than 1 microsecond,
            // the thread sleeps until the timeout expires
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // If the thread has been interrupted via interrupt(),
            // it stops queuing and exits
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
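From the caller's side, the timed path looks like this. In the sketch below (TimedLockDemo is a hypothetical name) a helper thread holds the lock for the whole test, so tryLock with a timeout has to go through the queue, wait, and finally give up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: timed acquisition against a held lock.
class TimedLockDemo {
    // Returns the result of tryLock(timeout) while another thread holds the lock.
    static boolean tryWhileHeld(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown(); // tell the caller the lock is taken
                done.await();     // keep holding until the caller is finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            return lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            done.countDown();
        }
    }
}
```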

(4) Acquisition that responds to interrupts

ReentrantLock responds to interrupts during acquisition as follows: when Thread.interrupt() is called on a thread that is sleeping in the park method, the thread wakes up, finds that its interrupt flag is true, and actively throws an exception. The core implementation is in the doAcquireInterruptibly(int) method of AQS.

The basic implementation is similar to the blocking acquisition, except that the doAcquireInterruptibly(int) method of AQS is used instead of acquire(int).

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // Predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is head, this node is first in line
            // and is eligible to try to acquire the resource
            if (p == head && tryAcquire(arg)) {
                // The resource was acquired successfully
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // Otherwise queue up and block. After being woken from the
            // block, parkAndCheckInterrupt() reports whether the interrupt
            // flag is true; if so, throw the interrupt exception
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
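Seen from application code, the behaviour is as follows. In this sketch (InterruptibleLockDemo is a hypothetical name) the calling thread keeps the lock, a second thread waits in lockInterruptibly(), and an interrupt makes that second thread give up with InterruptedException.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: interrupting a thread that waits for a lock.
class InterruptibleLockDemo {
    // Returns true if the waiting thread left lockInterruptibly()
    // with an InterruptedException.
    static boolean interruptWaiter() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = {false};
        lock.lock(); // the caller holds the lock for the whole demo
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not reached while the caller holds the lock
                } catch (InterruptedException e) {
                    interrupted[0] = true;
                }
            });
            waiter.start();
            waiter.interrupt();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }
}
```

The result is the same whether the interrupt arrives before the waiter starts acquiring or while it is already parked in the queue.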

2 Explicit release

AQS resource sharing is divided into exclusive and shared modes. This section uses ReentrantLock as an example to describe the explicit release of an exclusive resource; the shared mode is described later.

The basic implementation of explicit release in ReentrantLock is as follows:

  • ReentrantLock implements the tryRelease(int) method of AQS, which decreases the state variable by 1. If state becomes 0, no thread holds the lock any longer and the method returns true; otherwise it returns false
  • The release(int) method of AQS uses tryRelease(int) to check whether any thread still holds the resource. If not, it wakes up the thread of the first waiting node (the successor of head) in the CLH queue
  • The woken thread continues the for(;;) loop in acquireQueued(Node, int), doAcquireNanos(int, long), or doAcquireInterruptibly(int) and tries again to acquire the resource

The tryRelease(int) implementation in ReentrantLock:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // Only the thread holding the lock is eligible to release it
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // Whether no thread holds the lock any longer
    boolean free = false;
    // state reaching 0 means no thread holds the lock;
    // every reentrant lock() needs a matching unlock()
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

The release(int) method in AQS is implemented as follows:

public final boolean release(int arg) {
    // Try to release the resource
    if (tryRelease(arg)) {
        Node h = head;
        // The head node is not null and its status is not 0.
        // A successor sets its predecessor's status to SIGNAL(-1) before
        // it goes to sleep after joining the queue, so a head status of 0
        // means there is no successor waiting to be woken up
        if (h != null && h.waitStatus != 0)
            // The head node represents the thread holding the resource;
            // the second node is the first waiting thread, so wake it up
            unparkSuccessor(h);
        return true;
    }
    return false;
}

3 Reentrant

The implementation of reentrancy is relatively simple. Taking ReentrantLock as an example, it is mainly implemented in tryAcquire(int): the method checks whether the thread holding the lock is the current thread, and if so, it updates the synchronization state value and returns true, indicating that the lock was acquired.

4 Shareable

Shareable resources are illustrated below with ReentrantReadWriteLock. Unlike ReentrantLock, its read lock can be shared by multiple threads: when the write lock is released, multiple threads that are blocked waiting for the read lock can acquire it at the same time.

The ReentrantReadWriteLock class defines the state synchronization status of AQS as follows: the high 16 bits are the number of read locks held, and the low 16 bits are the number of write locks held
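The split of one int into two 16-bit counters comes down to a shift and a mask. The following sketch shows the arithmetic; the class ReadWriteState and its constant and method names are chosen for this illustration and are not taken from the text above.

```java
// Hypothetical helper class: decodes a 32-bit state into two 16-bit counts.
class ReadWriteState {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adds one read hold
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // low 16 bits

    // High 16 bits: number of read locks held.
    static int readCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    // Low 16 bits: number of write locks held.
    static int writeCount(int state) {
        return state & EXCLUSIVE_MASK;
    }
}
```

For example, a state of 3 * SHARED_UNIT + 2 decodes to three read holds and two write holds, and adding SHARED_UNIT to the state registers one more reader without touching the write count.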

The tryAcquireShared(int) and tryReleaseShared(int) implementations in ReentrantReadWriteLock are fairly long. They mainly deal with mutual exclusion between reads and writes, reentrancy checks, and read locks yielding to the write lock.

Acquiring the read lock (readLock.lock()) works as follows:

  • ReentrantReadWriteLock implements the tryAcquireShared(int) method of AQS to determine whether the current thread can obtain a read lock
  • The acquireShared(int) method of AQS calls tryAcquireShared(int); if the acquisition fails, the thread joins the CLH queue and blocks
  • ReentrantReadWriteLock's readLock.lock() relies on the acquireShared(int) method of AQS to block and wait until the lock is acquired

The implementation of shared-mode acquisition in AQS is as follows:

public final void acquireShared(int arg) {
    // A negative return value from tryAcquireShared means
    // the resource could not be obtained
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// Once the resource is acquired, the acquisition is propagated
// to the nodes waiting for the resource on the CLH queue
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // The resource was acquired successfully
                if (r >= 0) {
                    // Propagate to the nodes on the CLH queue
                    // waiting for the resource
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // If the thread is interrupted while waiting, it does not
            // respond to the interrupt; it keeps queuing for the resource
            // and records the fact by setting interrupted to true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// Propagate to the nodes on the CLH queue waiting for the resource
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // Release the shared resource
            doReleaseShared();
    }
}

Releasing the read lock (readLock.unlock()) works as follows:

  • ReentrantReadWriteLock implements the tryReleaseShared(int) method of AQS to determine whether any thread still holds a read lock after the read lock is released
  • The releaseShared(int) method of AQS calls tryReleaseShared(int) to decide whether waiting threads may be woken up; if so, it calls doReleaseShared()
  • ReentrantReadWriteLock's readLock.unlock() releases the lock through the releaseShared(int) method of AQS

The implementation of shared-mode release in AQS is as follows:

public final boolean releaseShared(int arg) {
    // tryReleaseShared returns true if sleeping threads
    // in the CLH queue may be woken up
    if (tryReleaseShared(arg)) {
        // Perform the release
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // The successor of head is waiting to be woken up
            if (ws == Node.SIGNAL) {
                // If the CAS fails, another thread is already handling
                // this node, so re-read head and try again
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            // This branch is entered when the current node has just become
            // the head node and the tail node has just joined the CLH queue
            // but has not yet set its predecessor's status to SIGNAL before
            // sleeping. The CAS fails if the tail node has meanwhile set
            // its predecessor's status to SIGNAL
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // Each woken successor enters doAcquireShared and then updates
        // head. If head has not changed during this iteration,
        // there is nothing more to wake up, so exit the loop
        if (h == head)
            break;
    }
}

5 Fair and unfair

The fair lock of ReentrantLock calls the acquire(int) method of AQS directly. The unfair lock first makes up to two CAS attempts, each expecting the state synchronization variable to be 0 (no thread holds the lock) and updating it to 1, and joins the queue only if every CAS update fails.

// Fair lock implementation
final void lock() {
    acquire(1);
}

// Unfair lock implementation
final void lock() {
    // First CAS attempt
    // A state value of 0 means that no thread holds the lock
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // Second CAS attempt to obtain the lock
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The current thread already holds the lock
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

Conclusion

The state variable of AQS does not necessarily represent a resource. Different AQS subclasses can define the meaning of the state value differently.

For example, in the CountDownLatch class, the state value represents the latch count that still has to be released (think of it as the number of latches that must be opened). Only when every latch has been opened does the door open, and all waiting threads start executing. Each countDown() decrements the state by one, and when the state reaches 0, the sleeping threads in the CLH queue are woken up.
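The countdown can be watched from the outside through CountDownLatch.getCount(). A small sketch (LatchDemo is a hypothetical name): the first method records the count after each countDown(), and the second checks that a thread blocked in await() is released once the count reaches zero.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical example class: observing a latch count down to zero.
class LatchDemo {
    // Returns the count seen initially and after each countDown().
    static long[] countsWhileCountingDown(int n) {
        CountDownLatch latch = new CountDownLatch(n);
        long[] seen = new long[n + 1];
        seen[0] = latch.getCount();
        for (int i = 1; i <= n; i++) {
            latch.countDown();
            seen[i] = latch.getCount();
        }
        return seen;
    }

    // Returns true if a thread blocked in await() proceeds after
    // the latch has been counted down n times.
    static boolean waiterReleased(int n) {
        CountDownLatch latch = new CountDownLatch(n);
        boolean[] released = {false};
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
                released[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        for (int i = 0; i < n; i++) {
            latch.countDown();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return released[0];
    }
}
```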

When studying low-level source code like this, it helps to set yourself a few questions first and read with those questions in mind. Before diving in, make sure you understand the overall design and principles (for example by reading the relevant documentation first), and only then study the source details. Starting straight from the source code makes it easy to get lost and give up.