Part of this article is excerpted from The Art of Concurrent Programming in Java
Overview
The queue synchronizer AbstractQueuedSynchronizer (hereinafter the synchronizer) is the basic framework for building locks and other synchronization components in the java.util.concurrent (JUC) package. It uses an int member variable to represent the synchronization state and a built-in FIFO queue to manage the threads queuing to acquire the resource.
The synchronizer is used mainly through inheritance. A subclass extends the synchronizer and overrides its designated methods to manage the synchronization state. The subclass is best defined as a static inner class of the custom synchronization component. The synchronizer itself does not implement any synchronization interface; it only defines a set of methods for acquiring and releasing the synchronization state, for custom components to use.
In a nutshell, the synchronizer is a way to implement locks (or any other synchronization component) that hides the lower-level mechanisms and makes such components easier to understand and write.
Interface to the queue synchronizer
The synchronizer is designed around the template method pattern. The user extends the queue synchronizer and overrides the designated methods, then composes the synchronizer into the implementation of a custom synchronization component and calls the template methods it provides. Those template methods in turn call the methods the user has overridden.
1. Accessing or modifying the synchronization state
When overriding the methods designated by the synchronizer, use the following three methods it provides to access or modify the synchronization state:
- getState(): gets the current synchronization state
- setState(int newState): sets the current synchronization state
- compareAndSetState(int expect, int update): sets the state with CAS, which guarantees that the update is atomic
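As a minimal sketch of how these three methods are typically combined, the hypothetical `PermitCounter` class below (not part of the JDK or of the book) keeps a permit count in the synchronization state and takes one permit with a read, compute, `compareAndSetState` retry loop. Only `getState`, `setState` and `compareAndSetState` are synchronizer API; every other name is an assumption made for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class; only getState, setState and compareAndSetState are AQS API.
class PermitCounter extends AbstractQueuedSynchronizer {
    PermitCounter(int permits) {
        // No other thread can see the object yet, so a plain write is enough here.
        setState(permits);
    }

    /** Takes one permit without blocking; returns false when none are left. */
    boolean tryTake() {
        for (;;) {
            int current = getState();
            if (current == 0) {
                return false;
            }
            // The CAS fails if another thread changed the state after the read; then retry.
            if (compareAndSetState(current, current - 1)) {
                return true;
            }
        }
    }

    int remaining() {
        return getState();
    }

    public static void main(String[] args) {
        PermitCounter counter = new PermitCounter(2);
        System.out.println(counter.tryTake());   // true
        System.out.println(counter.tryTake());   // true
        System.out.println(counter.tryTake());   // false
        System.out.println(counter.remaining()); // 0
    }
}
```

The retry loop is needed because a plain `setState(getState() - 1)` is a separate read and write, so two threads could both read the same value and lose an update.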
2. Methods that subclasses can override
Method | Description |
---|---|
protected boolean tryAcquire(int arg) | Acquires the synchronization state exclusively. An implementation must query the current state, check whether it is as expected, and then set the state with CAS |
protected boolean tryRelease(int arg) | Releases the synchronization state exclusively; threads waiting to acquire the state then get a chance to acquire it |
protected int tryAcquireShared(int arg) | Acquires the synchronization state in shared mode. A return value greater than or equal to 0 means the acquisition succeeded; a negative value means it failed |
protected boolean tryReleaseShared(int arg) | Releases the synchronization state in shared mode |
protected boolean isHeldExclusively() | Whether the synchronizer is currently held in exclusive mode. This method usually indicates whether it is held exclusively by the current thread |
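A subclass only needs to override the methods for the mode it supports. In the JDK, the default implementations of these methods throw UnsupportedOperationException, so calling a template method of the other mode fails fast. The sketch below illustrates this with a hypothetical `ExclusiveOnlySync` that overrides only the exclusive pair; the class and method names are assumptions for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical synchronizer that supports exclusive mode only.
class ExclusiveOnlySync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }

    /** Returns true if the shared template method rejects this exclusive-only synchronizer. */
    static boolean sharedAcquireIsUnsupported() {
        ExclusiveOnlySync sync = new ExclusiveOnlySync();
        try {
            sync.acquireShared(1); // calls tryAcquireShared, which is not overridden
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ExclusiveOnlySync sync = new ExclusiveOnlySync();
        sync.acquire(1);  // exclusive mode works
        sync.release(1);
        System.out.println(sharedAcquireIsUnsupported()); // true
    }
}
```

This is worth keeping in mind when wiring a Lock implementation to a synchronizer: every Lock method has to call a template method of the mode the synchronizer actually implements.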
3. Template methods provided by the synchronizer
Method | Description |
---|---|
void acquire(int arg) | Acquires the synchronization state exclusively. If the current thread acquires it successfully, the method returns; otherwise the thread enters the synchronization queue and waits. This method calls the overridden tryAcquire(int arg) |
void acquireInterruptibly(int arg) | Same as acquire(int arg), but responds to interrupts. A thread that fails to acquire the state enters the synchronization queue; if it is interrupted, the method throws InterruptedException and returns |
boolean tryAcquireNanos(int arg, long nanos) | Adds a timeout to acquireInterruptibly(int arg). Returns true if the current thread acquires the state within the timeout, and false otherwise |
void acquireShared(int arg) | Acquires the synchronization state in shared mode; a thread that fails enters the synchronization queue and waits. The main difference from exclusive acquisition is that multiple threads can hold the state at the same time |
void acquireSharedInterruptibly(int arg) | Same as acquireShared(int arg), but responds to interrupts |
boolean tryAcquireSharedNanos(int arg, long nanos) | Adds a timeout to acquireSharedInterruptibly(int arg) |
boolean release(int arg) | Releases the synchronization state exclusively, then wakes up the thread in the first node of the synchronization queue |
boolean releaseShared(int arg) | Releases the synchronization state in shared mode |
Collection<Thread> getQueuedThreads() | Gets the collection of threads waiting in the synchronization queue |
4. Example
Let’s look more closely at how the synchronizer works, using an exclusive lock as the example. As the name implies, an exclusive lock can be held by only one thread at a time. Other threads that try to acquire it must wait in the synchronization queue, and only after the holder releases the lock can a waiting thread acquire it.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
    /** Custom synchronizer: state 0 means unlocked, state 1 means locked. */
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean isHeldExclusively() {
            // Whether the lock is currently held
            return getState() == 1;
        }

        @Override
        public boolean tryAcquire(int acquires) {
            // Acquire the lock only if the state is 0
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            // Release the lock by resetting the state to 0
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /** Returns a Condition; each Condition has its own condition queue. */
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // All Lock operations are delegated to Sync
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
Mutex defines a static inner class that inherits the synchronizer and implements the exclusive acquisition and release of synchronization state. Users using Mutex do not directly interact with the internal synchronizer implementation, but instead call the methods provided by Mutex, significantly lowering the threshold for implementing a reliable custom component
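To show what using such a lock looks like from the caller’s side, here is a small sketch in which several threads increment a shared counter under a mutex. To keep the block self-contained it uses a compact stand-in, `SimpleMutex`, with the same state convention as Mutex (0 = free, 1 = held) and only `lock()` and `unlock()`; `SimpleMutex`, `MutexCounterDemo` and `run` are names invented for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MutexCounterDemo {
    // Compact stand-in for Mutex: same state convention, only lock() and unlock().
    static final class SimpleMutex {
        private static final class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }

            @Override
            protected boolean tryRelease(int arg) {
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
        }

        private final Sync sync = new Sync();

        void lock() { sync.acquire(1); }

        void unlock() { sync.release(1); }
    }

    private static int counter; // only touched while the mutex is held

    /** Starts the given number of threads, each incrementing the counter under the mutex. */
    static int run(int threads, int incrementsPerThread) throws InterruptedException {
        counter = 0;
        SimpleMutex mutex = new SimpleMutex();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    mutex.lock();
                    try {
                        counter++;
                    } finally {
                        mutex.unlock(); // always release, even if the body throws
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 10_000)); // 40000
    }
}
```

The caller never touches the synchronizer directly; it only follows the usual lock, try, finally, unlock idiom.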
Implementation of queue synchronizer
1. Synchronization queue
The synchronizer relies on an internal doubly linked FIFO synchronization queue to manage the synchronization state. When the current thread fails to acquire the state, the synchronizer wraps the thread and its wait status in a node, appends the node to the queue, and blocks the thread. When the state is released, the thread in the first node is woken up to try to acquire it again.
The node is the basic building block of the synchronization queue. The synchronizer holds references to the head node and the tail node, and a thread that fails to acquire the synchronization state becomes a node appended at the tail of the queue.
The basic structure of a synchronization queue is as follows:
The synchronizer adds the node to the synchronization queue as shown in the figure below:
The head node is the node that has successfully acquired the synchronization state. When the thread of the head node releases the state, it wakes up its successor, and the successor sets itself as the head node once it acquires the state successfully. The process is as follows:
The head node is set by the thread that has successfully acquired the synchronization state. Because only one thread can acquire it successfully, setting the head node does not need CAS: the synchronizer simply sets the head to the successor of the original head and clears the original head’s next reference.
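The two queue operations described in this section can be sketched with a deliberately simplified model. This is not the JDK source: `SyncQueueModel`, its `Node` class and all method names are assumptions for illustration, and a thread name stands in for the waiting thread. The point is the asymmetry: appending at the tail may be contended and therefore uses CAS, while advancing the head is done by the single thread that just acquired the state.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of the synchronization queue; not the JDK source.
class SyncQueueModel {
    static final class Node {
        final String threadName; // stands in for the waiting thread
        volatile Node prev;
        volatile Node next;

        Node(String threadName) {
            this.threadName = threadName;
        }
    }

    private volatile Node head;
    private final AtomicReference<Node> tail = new AtomicReference<>();

    SyncQueueModel() {
        Node initial = new Node("head"); // represents the node that currently holds the state
        head = initial;
        tail.set(initial);
    }

    /** Appends a node; several threads may race here, so the tail is swapped with CAS. */
    void enqueue(Node node) {
        for (;;) {
            Node currentTail = tail.get();
            node.prev = currentTail;
            if (tail.compareAndSet(currentTail, node)) {
                currentTail.next = node;
                return;
            }
        }
    }

    /** Called only by the one thread that just acquired the state, so no CAS is needed. */
    void advanceHead() {
        Node oldHead = head;
        Node successor = oldHead.next;
        if (successor == null) {
            return; // nothing is waiting
        }
        head = successor;
        successor.prev = null;
        oldHead.next = null; // unlink the old head so it can be garbage collected
    }

    String headName() { return head.threadName; }

    String tailName() { return tail.get().threadName; }

    public static void main(String[] args) {
        SyncQueueModel queue = new SyncQueueModel();
        queue.enqueue(new Node("thread-A"));
        queue.enqueue(new Node("thread-B"));
        queue.advanceHead();
        System.out.println(queue.headName() + " " + queue.tailName()); // thread-A thread-B
    }
}
```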
2. Exclusive acquisition and release of the synchronization state
The synchronization state is acquired by calling the synchronizer’s acquire(int arg) method. This method does not respond to interrupts: once a thread has failed to acquire the state and entered the synchronization queue, a later interrupt does not remove it from the queue.
The exclusive acquisition process, that is, the call flow of acquire(int arg), is shown in the figure below:
If the current thread fails to acquire the synchronization state, a node is created (in Node.EXCLUSIVE mode, where only one thread can hold the state at a time) and appended to the tail of the queue. The queue may contain many nodes, and only a node whose predecessor is the head node may attempt to acquire the synchronization state, for two reasons:
- The head node is the node that has successfully acquired the synchronization state. After the thread of the head node releases the state, it wakes up its successor, and the woken thread needs to check whether its predecessor is the head node
- This maintains the FIFO principle of the synchronization queue
A thread whose node is not first in line can also return from the wait state, either because its predecessor left the queue or because the thread was interrupted. In that case it checks whether its predecessor is the head node and tries to acquire the synchronization state only if it is.
Once the current thread has acquired the synchronization state and run its logic, it must release the state so that the following nodes can acquire it. The state is released by calling the synchronizer’s release(int arg) method, which wakes up the thread in the head node’s successor.
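The claim that acquire(int arg) keeps an interrupted thread in the queue can be observed with a short experiment. The sketch below uses a minimal exclusive synchronizer; `AcquireInterruptDemo`, its `Sync` class and `run` are names made up for illustration, while `acquire`, `release` and `isQueued(Thread)` are synchronizer API. The second observation, that the interrupt status is set again once the thread finally acquires, reflects how the JDK implementation re-asserts the interrupt.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class AcquireInterruptDemo {
    // Minimal exclusive synchronizer: 0 = free, 1 = held.
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    /** Returns {still queued after the interrupt, interrupt status set after acquiring}. */
    static boolean[] run() throws InterruptedException {
        Sync sync = new Sync();
        sync.acquire(1); // the calling thread holds the state
        boolean[] result = new boolean[2];

        Thread waiter = new Thread(() -> {
            sync.acquire(1); // fails at first, so the thread waits in the synchronization queue
            result[1] = Thread.currentThread().isInterrupted();
            sync.release(1);
        });
        waiter.start();

        while (!sync.isQueued(waiter)) {
            Thread.sleep(1); // wait until the waiter has been enqueued
        }
        waiter.interrupt();
        Thread.sleep(50); // give the waiter time to react to the interrupt

        result[0] = sync.isQueued(waiter); // acquire() did not give up
        sync.release(1);                   // now the waiter can acquire
        waiter.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = run();
        System.out.println("still queued after interrupt: " + result[0]);
        System.out.println("interrupt status after acquire: " + result[1]);
    }
}
```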
3. Shared acquisition and release of the synchronization state
The main difference between shared and exclusive acquisition is whether more than one thread can hold the synchronization state at the same time. Take reading and writing a file as an example. While a program is reading a file, all writes to that file are blocked, but other reads can proceed at the same time. Writes require exclusive access to the resource, whereas reads can share it. How the two access modes treat a file or resource at the same moment is shown in the following figure:
The synchronizer’s acquireShared(int arg) method acquires the synchronization state in shared mode. Its core logic is similar to that of acquire(): it also checks whether the predecessor of the current node is the head node and, if so, tries to acquire the synchronization state. After the head node releases the state, it likewise wakes up the following nodes that are waiting.
The key question is how multiple threads get to hold the synchronization state at once, because the procedure described so far differs little from exclusive mode. The one implementation difference lies in what happens when a node acquires the state after the head node has released it: exclusive mode only sets the successor as the new head node, whereas shared mode additionally propagates the wake-up to the nodes that follow.
As in exclusive mode, a shared acquisition must be followed by a release. Calling releaseShared(int arg) releases the synchronization state and wakes up the waiting nodes that follow.
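The propagation step is easiest to see with a latch: several threads wait in shared mode, and a single releaseShared call lets all of them through. The sketch below follows the latch pattern shown in the AbstractQueuedSynchronizer class documentation; `SharedLatchDemo`, `Latch` and `run` are names chosen for this illustration, while `acquireShared`, `releaseShared` and `getQueueLength()` are synchronizer API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SharedLatchDemo {
    // State 0 = closed, 1 = open.
    static final class Latch extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1; // negative means failure, so the caller queues
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true tells the synchronizer to wake up waiting nodes
        }
    }

    /** Parks the given number of threads on the latch, opens it once, and counts who passed. */
    static int run(int waiters) throws InterruptedException {
        Latch latch = new Latch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.acquireShared(1);
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        while (latch.getQueueLength() < waiters) {
            Thread.sleep(1); // wait until every thread is in the synchronization queue
        }
        latch.releaseShared(1); // a single release; the wake-up propagates down the queue
        for (Thread thread : threads) {
            thread.join();
        }
        return passed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3)); // 3
    }
}
```

With an exclusive synchronizer, one release would let exactly one waiter proceed; here all waiters proceed because each successful shared acquisition passes the wake-up on.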
4. Exclusive acquisition with a timeout
The synchronization state can be acquired with a timeout through the synchronizer’s doAcquireNanos(int arg, long nanosTimeout) method, which the public tryAcquireNanos(int arg, long nanos) template method delegates to in the JDK version the book describes. The thread tries to acquire the state within the specified period and gives up when the period elapses.
Before introducing this method, it helps to look at interruptible acquisition. Since Java 5, the synchronizer has provided the acquireInterruptibly(int arg) method: if the current thread is interrupted while waiting to acquire the synchronization state, the method returns immediately by throwing InterruptedException.
Acquisition with a timeout can be seen as an enhanced version of interruptible acquisition. The flow of an exclusive acquisition with a timeout is very similar to that of a plain exclusive acquisition; the main difference is what happens when the synchronization state is not acquired. doAcquireNanos(int arg, long nanosTimeout) makes the current thread wait for at most nanosTimeout nanoseconds. If the thread has not acquired the synchronization state within that time, it returns from the waiting logic automatically.
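Both behaviors can be observed through the public template methods. The sketch below is an illustration under stated assumptions: `TimedAcquireDemo`, its `Sync` class and the two helper methods are invented names, and the synchronizer is a minimal non-reentrant exclusive one, so a second acquisition by the same thread cannot succeed while the state is held.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TimedAcquireDemo {
    // Minimal non-reentrant exclusive synchronizer: 0 = free, 1 = held.
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    /** Attempts a timed acquisition while the state is already held; returns its result. */
    static boolean timedAcquireWhileHeld(long timeoutMillis) throws InterruptedException {
        Sync sync = new Sync();
        sync.acquire(1);
        try {
            // Waits in the queue for up to the timeout, then gives up.
            return sync.tryAcquireNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
        } finally {
            sync.release(1);
        }
    }

    /** Returns true if a waiting thread leaves acquireInterruptibly with InterruptedException. */
    static boolean interruptedWhileWaiting() throws InterruptedException {
        Sync sync = new Sync();
        sync.acquire(1); // held by the caller, so the waiter cannot succeed
        boolean[] threw = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                sync.acquireInterruptibly(1);
                sync.release(1);
            } catch (InterruptedException e) {
                threw[0] = true;
            }
        });
        waiter.start();
        waiter.interrupt();
        waiter.join();
        sync.release(1);
        return threw[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(timedAcquireWhileHeld(100)); // false
        System.out.println(interruptedWhileWaiting());  // true
    }
}
```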
Custom synchronization components
Design a synchronization tool that lets at most two threads in at a time and blocks any further threads. This is clearly shared access, and the main design points are as follows:
- Override the tryAcquireShared(int arg) and tryReleaseShared(int arg) methods
- Set the initial state to 2. Each acquisition decreases the state by 1 and each release increases it by 1. When the state is 0, any further thread that tries to acquire is blocked
The example code is as follows:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must be larger than zero");
            }
            setState(count);
        }

        @Override
        public int tryAcquireShared(int reduceCount) {
            while (true) {
                int current = getState();
                int newCount = current - reduceCount;
                // A negative result means failure; otherwise the CAS must succeed
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            while (true) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        // ConditionObject is intended for exclusive-mode synchronizers. This shared-mode
        // Sync does not override isHeldExclusively(), so the Condition is not usable as-is.
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // Must use the shared variant: tryAcquire is not overridden in Sync
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        // 0 is also a success: it means the last permit was taken
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
Write another test to verify that TwinsLock works as expected
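import java.util.concurrent.locks.Lock;

public class TwinsLockTest {
    public static void main(String[] args) {
        final Lock lock = new TwinsLock();

        // SleepUtils is a helper class from the book (not shown in this article);
        // it is assumed to sleep for the given number of seconds.
        class Worker extends Thread {
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepUtils.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepUtils.second(1);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        // Start 10 daemon workers that compete for the lock
        for (int i = 0; i < 10; i++) {
            Worker worker = new Worker();
            worker.setDaemon(true);
            worker.start();
        }
        // Print a blank line every second to separate the output
        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1);
            System.out.println();
        }
    }
}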
Running the test shows that the thread names are printed in pairs, meaning that at most two threads hold the lock at any one time.
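Reading pairs off the console is a manual check. As a hedged alternative, the sketch below measures the highest number of threads that were inside the critical section at the same time. It reuses the same two-permit state logic as TwinsLock in a compact synchronizer so that the block is self-contained; `TwoPermitCheck`, its `Sync` class and `maxConcurrentHolders` are names invented for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TwoPermitCheck {
    // Same state logic as the Sync in TwinsLock: the state is the number of free permits.
    static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + returnCount)) {
                    return true;
                }
            }
        }
    }

    /** Runs the given number of threads and returns the peak number of simultaneous holders. */
    static int maxConcurrentHolders(int threads) throws InterruptedException {
        Sync sync = new Sync(2);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                sync.acquireShared(1);
                int now = inside.incrementAndGet();
                try {
                    max.accumulateAndGet(now, Math::max); // remember the peak
                    Thread.sleep(20);                     // hold the permit for a moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    sync.releaseShared(1);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The peak should never exceed 2, the initial number of permits.
        System.out.println("peak holders: " + maxConcurrentHolders(10));
    }
}
```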