Hashtag: “We are all little frogs” public account article
To make it easy for us to build custom synchronization tools, the designers of Java handed us a real heavyweight: the AbstractQueuedSynchronizer class. It is an abstract class, which we will refer to as AQS from here on; the name means, literally, an abstract queued synchronizer. It is useful because it encapsulates all the low-level synchronization details, so programmers who want to build their own synchronization tools only need to subclass it and override a few of the methods it provides. The explicit lock ReentrantLock that we used earlier is implemented with the help of AQS. Now let’s take a look at how this class is implemented and how it can be used to build a custom synchronization tool.
Synchronization state
AQS maintains a field called state, declared volatile, which is known as the synchronization state:
private volatile int state;
It also provides several methods to access this field:
Method | Description |
---|---|
protected final int getState() | Gets the value of state |
protected final void setState(int newState) | Sets the value of state |
protected final boolean compareAndSetState(int expect, int update) | Updates the value of state using CAS |
You can see that these methods are final, indicating that they cannot be overridden in subclasses. In addition, they are protected, which means that you can only use these methods in subclasses.
In some thread coordination scenarios, while one thread is performing an operation no other thread may perform it. Holding a lock is an example: only one thread can hold the lock at a time. We call this exclusive mode. In other scenarios, several threads are allowed to perform an operation at the same time, which we call shared mode.
By modifying the synchronization state represented by the state field, we can implement exclusive mode or shared mode across multiple threads.
For example, in exclusive mode we can set the initial value of state to 0. Whenever a thread wants to perform the exclusive operation, it first checks whether state is 0. If it is not 0, another thread has already entered the operation, and this thread must block and wait. If it is 0, the thread sets state to 1 and enters the operation itself. This check-then-set sequence can be made atomic with a CAS operation, and we call it trying to acquire the synchronization state. Once one thread has acquired the synchronization state, any other thread that tries to acquire it will find state equal to 1 and block. It stays blocked until the first thread finishes the operation that needs synchronization and releases the synchronization state, that is, sets state back to 0 and notifies the waiting threads that follow.
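The check-then-set step described above can be sketched outside AQS, with an AtomicInteger standing in for the state field. This is only an illustration under stated assumptions: the class name ExclusiveFlag and its methods are hypothetical, and unlike a real AQS-based tool it neither blocks nor queues a thread whose attempt fails.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of AQS.
final class ExclusiveFlag {

    // 0 = free, 1 = taken; plays the role of the AQS state field
    private final AtomicInteger state = new AtomicInteger(0);

    // Check and set in one atomic CAS step: succeeds only if state was 0.
    boolean tryAcquire() {
        return state.compareAndSet(0, 1);
    }

    // Release the synchronization state by resetting it to 0.
    void release() {
        state.set(0);
    }

    public static void main(String[] args) {
        ExclusiveFlag flag = new ExclusiveFlag();
        System.out.println(flag.tryAcquire()); // true: state went from 0 to 1
        System.out.println(flag.tryAcquire()); // false: state is already 1
        flag.release();
        System.out.println(flag.tryAcquire()); // true again after the release
    }
}
```

A failed attempt here simply returns false; deciding what the losing thread does next (wait, retry, give up) is exactly the part AQS takes over.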
The same is true in shared mode. For example, if we allow 10 threads to perform an operation at the same time, more threads will block and wait. So we can set the initial value of state to 10. The meaning of a thread trying to obtain synchronization state is to determine whether the value of state is greater than 0. If the value is not greater than 0, it means that there are 10 threads performing this operation at the same time, and this thread needs to block and wait. If the value of state is greater than 0, the operation can be started by reducing the value of state by 1. Each time a thread completes the operation, it needs to release the synchronization state, which is to increase the value of state by 1 and notify the following waiting threads.
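The shared-mode counting just described can be sketched the same way. The class name SharedPermits and its methods are made up for this illustration, the permit count is a constructor argument rather than the fixed 10 of the example, and a failed attempt just returns false instead of blocking.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of AQS.
final class SharedPermits {

    // Number of threads that may still enter; plays the role of state
    private final AtomicInteger state;

    SharedPermits(int permits) {
        state = new AtomicInteger(permits);
    }

    // Take one permit if any is left; retry when another thread wins the CAS.
    boolean tryAcquire() {
        for (;;) {
            int cur = state.get();
            if (cur <= 0) {
                return false; // all permits are in use
            }
            if (state.compareAndSet(cur, cur - 1)) {
                return true;
            }
        }
    }

    // Hand one permit back.
    void release() {
        state.incrementAndGet();
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        SharedPermits permits = new SharedPermits(2);
        System.out.println(permits.tryAcquire()); // true
        System.out.println(permits.tryAcquire()); // true
        System.out.println(permits.tryAcquire()); // false: both permits are taken
        permits.release();
        System.out.println(permits.available());  // 1
    }
}
```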
So for our custom synchronization tool, we need to customize the way to get and release the synchronization state, and several methods in AQS are used to do this:
Method | Description |
---|---|
protected boolean tryAcquire(int arg) | Acquires the synchronization state exclusively; returns true on success, false otherwise |
protected boolean tryRelease(int arg) | Releases the synchronization state exclusively; returns true on success, false otherwise |
protected int tryAcquireShared(int arg) | Acquires the synchronization state in shared mode; returns a value greater than or equal to 0 on success, a negative value on failure |
protected boolean tryReleaseShared(int arg) | Releases the synchronization state in shared mode; returns true on success, false otherwise |
protected boolean isHeldExclusively() | In exclusive mode, returns true if the current thread holds the synchronization state, false otherwise |
We say that AQS is an abstract class. Let’s take tryAcquire as an example to see how it is implemented in AQS:
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
Oh my god 😯, it just throws an exception, which hardly seems reasonable. But it is true: AQS does not implement this method. Different synchronization tools target different concurrency scenarios, so how the synchronization state is acquired and released has to be defined in our own AQS subclass. If our custom tool works in exclusive mode, we override tryAcquire, tryRelease, and isHeldExclusively; if it works in shared mode, we override tryAcquireShared and tryReleaseShared. For example, in exclusive mode we can define an AQS subclass like this:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class Sync extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        // CAS state from 0 to 1; succeeds only if nobody holds it
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
}
tryAcquire attempts to acquire the synchronization state. We use a very simple approach here: set state to 1 with CAS, returning true on success and false on failure. tryRelease attempts to release the synchronization state, again with a very simple algorithm: just set state to 0. isHeldExclusively indicates whether a thread has already acquired the synchronization state. For more complex scenarios, you can override these methods with more sophisticated acquire and release algorithms.
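To see what these three overrides do on their own, here is a small sketch that calls them directly. The wrapper class TryAcquireDemo and its attempts method are hypothetical names; the nested Sync simply repeats the subclass defined above. Note that nothing here blocks: each call returns immediately.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class; Sync repeats the subclass defined above.
final class TryAcquireDemo {

    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    // Calls the overridden methods directly and records what each one returns.
    static boolean[] attempts() {
        Sync sync = new Sync();
        boolean first = sync.tryAcquire(1);      // state 0 -> 1
        boolean second = sync.tryAcquire(1);     // state is already 1, so the CAS fails
        boolean held = sync.isHeldExclusively(); // state == 1
        sync.tryRelease(1);                      // state back to 0
        boolean third = sync.tryAcquire(1);      // succeeds again
        return new boolean[] {first, second, held, third};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(attempts())); // [true, false, true, true]
    }
}
```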
So far we have only covered what the synchronization state is, and how to customize acquiring and releasing it in exclusive and shared mode by subclassing AQS. You will notice, though, that this is still not much use on its own. The effect we want is this: a thread that succeeds in acquiring the synchronization state gets true back immediately, goes on to perform the operation that needs synchronization, and releases the synchronization state when it is done; a thread that fails gets false back immediately and then blocks and waits. So how does a thread block and wait? Don’t go away, that comes next.
Synchronization queue
AQS also maintains a so-called synchronization queue. The node class of this queue is defined as a static inner class, with the following main fields:
static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
}
AQS defines a head node reference and a tail node reference:
private transient volatile Node head;
private transient volatile Node tail;
These two references control the queue, which means nodes can be inserted into and removed from it. Notice that the Node class has a field of type Thread, so each node represents a thread. The effect we want is this: when a thread fails to acquire the synchronization state, it is wrapped in a Node, inserted into the synchronization queue, and blocked; when the thread that holds the synchronization state releases it, it also notifies the next node in the queue that has not yet acquired the synchronization state, so that this node’s thread can try to acquire it again.
The other fields of this node class are discussed in more detail later. First we will look at when nodes are added to and removed from the synchronization queue in exclusive and shared mode, along with the implementation details of blocking and resuming threads.
Exclusive synchronization state acquisition and release
In exclusive mode, only one thread can hold the synchronization state at a time. Other threads that try to acquire it in the meantime are wrapped in Node objects and placed in the synchronization queue, where they wait until the thread holding the synchronization state releases it. The initial synchronization queue is an empty queue with no nodes in it.
Let’s take a closer look at how threads that fail to get synchronization status are wrapped as Node nodes and inserted into the queue while blocking the wait.
As mentioned earlier, the methods for obtaining and releasing synchronous states are customized. In exclusive mode, we need to subclass AQS and override the following methods:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected boolean isHeldExclusively()
Once these methods are defined, who calls them? AQS defines a number of methods that call them, all of which are modified by public final:
Method | Description |
---|---|
void acquire(int arg) | Acquires the synchronization state exclusively. Returns if it succeeds; on failure, wraps the current thread in a Node and inserts it into the synchronization queue. |
void acquireInterruptibly(int arg) | Same as the previous method, except that if the thread is interrupted by another thread while executing this method, InterruptedException is thrown. |
boolean tryAcquireNanos(int arg, long nanos) | Adds a timeout to the previous method: returns false if the synchronization state is not acquired within the given time, true otherwise. |
boolean release(int arg) | Releases the synchronization state exclusively. |
Let’s take a look at the source of the acquire method:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
The code shows that acquire actually obtains the synchronization state through tryAcquire: if tryAcquire returns true the method ends, and if it returns false execution continues. tryAcquire is the acquisition logic we defined ourselves. Suppose thread t1 calls acquire and its tryAcquire call fails; the addWaiter method is then executed first. Let’s look at the addWaiter method:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // Construct a new node
    Node pred = tail;
    if (pred != null) { // Tail is not null: try to append directly to the end of the queue
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // CAS tail to the new node
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Tail is null: the queue must be initialized first
            if (compareAndSetHead(new Node())) // Install an empty head node
                tail = head;
        } else { // Tail is not null: this is where the node is actually inserted
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
As you can see, addWaiter is the method that inserts a node into the queue. It first constructs a Node. Let’s call this node node 1; its thread field holds the current thread, t1.
Next, look at the insertion itself. If the tail node is not null, the new node is appended directly to the end of the queue. If the tail node is null, enq is called to initialize the head and tail nodes first, and then the new node is appended. These initialization lines in the enq method deserve special attention:
if (t == null) { // Tail is null: the queue must be initialized first
    if (compareAndSetHead(new Node())) // Install an empty head node
        tail = head;
} else {
    // The process of actually inserting the node
}
When the queue is empty, the head and tail references are both set to the same node, which is created with a plain new Node(). We will call this node node 0. So after the first insertion, the queue actually contains two nodes: node 0 at the head, followed by node 1 at the tail.
Node 1 is the node we actually inserted, representing the thread that failed to get the synchronization state. Node 0 is created during initialization, but we’ll see what it does later.
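The role of node 0 during initialization can be modelled in isolation. The sketch below is a simplified stand-in, not the AQS source: the class DummyHeadQueue is hypothetical, AtomicReference replaces the internal compareAndSetHead and compareAndSetTail helpers, and a String owner replaces the Thread field so the result is easy to inspect.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the enq logic; not the AQS implementation.
final class DummyHeadQueue {

    static final class Node {
        final String owner;   // null for the dummy node (node 0)
        volatile Node prev;
        volatile Node next;

        Node(String owner) {
            this.owner = owner;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    void enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy node, then loop again to insert for real
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        DummyHeadQueue queue = new DummyHeadQueue();
        queue.enq(new Node("t1"));
        Node first = queue.head.get();
        System.out.println(first.owner);       // null: the head is the dummy node
        System.out.println(first.next.owner);  // t1: the node that was actually inserted
    }
}
```

The first pass through the loop only creates the dummy head; the real node is linked in on the second pass, which is why one insertion leaves two nodes in the queue.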
The addWaiter method will return the newly inserted node, node 1, and acquire will then call the acquireQueued method:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // Get the previous node
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
As you can see, if the predecessor of the newly inserted node is the head node, tryAcquire is called once more to try to acquire the synchronization state. The thinking is that the thread holding the synchronization state might release it very soon, so before the current thread blocks it tries its luck one more time. If it does get lucky, it calls setHead to replace the head node with its own node:
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
The node’s thread field is also set to null, which means this node becomes the new node 0.
If the predecessor of the current node is not the head node, or the thread holding the synchronization state has not released it yet, execution dutifully moves on to the shouldParkAfterFailedAcquire method:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // The state of the previous node
    if (ws == Node.SIGNAL) // Node.SIGNAL is -1
        return true;
    if (ws > 0) { // The previous node was cancelled: skip over all cancelled nodes
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // Set the state of the previous node to -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
This method revolves around the waitStatus field of Node. If the previous node’s waitStatus is Node.SIGNAL (-1), the current node may block. If it is greater than 0, the thread represented by the previous node has been cancelled, so the consecutive cancelled nodes directly ahead of the current node (those whose waitStatus is greater than 0) are unlinked from the queue. If it is neither -1 nor greater than 0, the previous node’s waitStatus is set to Node.SIGNAL. We know that the Node class defines some static variables for waitStatus values, so let’s see what each one means:
Static variable | Value | Description |
---|---|---|
Node.CANCELLED | 1 | The thread corresponding to the node has been cancelled (we will talk more about how threads are cancelled later). |
Node.SIGNAL | -1 | Indicates that the thread corresponding to the following node is in the wait state |
Node.CONDITION | -2 | Indicates that the node is in a wait queue (more on wait queues later) |
Node.PROPAGATE | -3 | Indicates that the next shared synchronization state acquisition will be propagated unconditionally. |
(none) | 0 | The initial state |
Now let’s focus on the cases where waitStatus is 0 or -1. Our current node is node 1, which corresponds to the current thread, and its predecessor is node 0. Every node starts with a waitStatus of 0, so the first call to shouldParkAfterFailedAcquire sets the waitStatus of node 0, the predecessor of the current node, to Node.SIGNAL and immediately returns false. This status means that the node after node 0 is waiting. The queue now consists of node 0 with waitStatus -1, followed by node 1 with waitStatus 0.
Because acquireQueued runs in a loop, shouldParkAfterFailedAcquire is executed a second time. By then node 0’s waitStatus is already Node.SIGNAL, so shouldParkAfterFailedAcquire returns true, and execution continues with the parkAndCheckInterrupt method:
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
The LockSupport.park(this) method looks like this:
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L); // Call the underlying method to block the thread
    setBlocker(t, null);
}
UNSAFE.park(false, 0L) is declared as follows:
public native void park(boolean var1, long var2);
This call blocks the thread immediately. It is a low-level native method, so we programmers don’t need to worry about how the operating system actually blocks the thread. At this point we have walked through the whole process, in exclusive mode, of how a thread that cannot acquire the synchronization state is inserted into the synchronization queue and blocked. It is worth going over this process a few times; after all, it is fairly involved.
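Blocking and waking a thread with LockSupport can be tried on its own, away from AQS. The following is a minimal sketch; the class ParkDemo and its run method are hypothetical names. Because park is allowed to return for no reason, the worker re-checks a flag in a loop, much as acquireQueued re-checks tryAcquire after waking up.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class showing park and unpark outside of AQS.
final class ParkDemo {

    // Returns true if the worker resumed after the main thread allowed it to.
    static boolean run() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop
            while (!permitted.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        worker.start();

        permitted.set(true);        // publish the condition first...
        LockSupport.unpark(worker); // ...then wake the worker (or pre-grant its permit)

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("worker resumed: " + run());
    }
}
```

If unpark happens before the worker reaches park, the permit is remembered and park returns at once, so the wake-up cannot be lost.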
If another thread, t2, now calls acquire to acquire the synchronization state, it too is wrapped in a Node and inserted into the synchronization queue, which then holds node 0, node 1, and node 2, in that order.
Note that the waitStatus of node 1 has changed to -1. Remember that a waitStatus of -1, that is Node.SIGNAL, means that the node’s successor is waiting. Since node 0 and node 1 both have a waitStatus of -1, their successors, node 1 and node 2, are both in the wait state.
The acquireInterruptibly method is basically the same as acquire, except that it can be interrupted: if a thread has called acquireInterruptibly and is blocked because it could not acquire the synchronization state, and another thread interrupts it, acquireInterruptibly throws InterruptedException and returns. tryAcquireNanos also supports interruption, and additionally takes a timeout: if the synchronization state has still not been acquired when the timeout expires, it returns false.
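The behaviour of these two methods can be observed with a small exclusive synchronizer. This is a sketch under stated assumptions: the class InterruptibleAcquireDemo and its helper methods are hypothetical, the nested Sync mirrors the simple exclusive subclass from earlier, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class for tryAcquireNanos and acquireInterruptibly.
final class InterruptibleAcquireDemo {

    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    // The state is already taken, so the timed acquire gives up and returns false.
    static boolean timedAcquireWhileHeld() {
        Sync sync = new Sync();
        sync.acquire(1); // take the state; this Sync is not reentrant
        try {
            return sync.tryAcquireNanos(1, TimeUnit.MILLISECONDS.toNanos(50));
        } catch (InterruptedException e) {
            throw new IllegalStateException("unexpected interrupt", e);
        }
    }

    // A thread waiting in acquireInterruptibly gets InterruptedException when interrupted.
    static boolean interruptedWhileWaiting() {
        Sync sync = new Sync();
        sync.acquire(1); // never released, so the waiter cannot succeed
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                sync.acquireInterruptibly(1);
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("timed acquire result: " + timedAcquireWhileHeld());
        System.out.println("waiter saw interrupt: " + interruptedWhileWaiting());
    }
}
```

The interrupt is noticed whether it arrives before the waiter calls acquireInterruptibly or while it is already parked, since the method checks the interrupt status on entry as well.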
If a thread fails to acquire the synchronization state in any of these acquire methods, it is wrapped in a Node and placed in the synchronization queue, which can be seen as an insertion. When a thread finishes its exclusive operation, it needs to release the synchronization state and wake up the thread represented by the first node in the synchronization queue other than node 0 (node 1 in our example) so that it can continue. The release method is used to release the synchronization state:
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
You can see that this method calls the tryRelease method we overrode in our AQS subclass. If the synchronization state is released successfully, execution continues: if the head node is not null and its waitStatus is not 0, the unparkSuccessor method is executed:
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus; // Wait status of the node
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // The successor is missing or has been cancelled
        s = null;
        // Walk backwards from the tail to find the non-cancelled node nearest to node
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // Wake up the thread corresponding to this node
}
Our head currently points to node 0, whose status is -1, so its waitStatus is first set to 0. Then LockSupport.unpark(s.thread) is called on its successor, node 1, which wakes up thread t1. Once t1 is awake, it resumes inside acquireQueued, acquires the synchronization state, sets node 1 as the head node, and sets the thread field of node 1 to null.
So now t2 is the only thread still blocked in the synchronization queue. That is the whole process of releasing the synchronization state.
Example of an exclusive synchronization tool
Now that we have seen how the exclusive synchronization state is acquired and released, we can write a simple lock of our own:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class PlainLock {

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
}
We define an AQS subclass, Sync, inside PlainLock and override a few methods to customize how the synchronization state is acquired and released in exclusive mode. A static nested class is the most common way to define the AQS subclass in a custom synchronization tool. PlainLock then exposes lock and unlock methods built on top of it. Let’s see how PlainLock is used:
public class Increment {

    private int i;

    private PlainLock lock = new PlainLock();

    public void increase() {
        lock.lock();
        try {
            i++;
        } finally {
            lock.unlock(); // Always release, even if the body throws
        }
    }

    public int getI() {
        return i;
    }

    public static void test(int threadNum, int loopTimes) {
        Increment increment = new Increment();
        Thread[] threads = new Thread[threadNum];
        for (int i = 0; i < threads.length; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < loopTimes; i++) {
                        increment.increase();
                    }
                }
            });
            threads[i] = t;
            t.start();
        }
        for (Thread t : threads) { // The main thread waits for all other threads to complete
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        System.out.println(threadNum + " threads, loop " + loopTimes + " times, result: " + increment.getI());
    }

    public static void main(String[] args) {
        test(20, 1);
        test(20, 10);
        test(20, 100);
        test(20, 1000);
        test(20, 10000);
        test(20, 100000);
        test(20, 1000000);
    }
}
Execution Result:
20 threads, loop 1 times, result: 20
20 threads, loop 10 times, result: 200
20 threads, loop 100 times, result: 2000
20 threads, loop 1000 times, result: 20000
20 threads, loop 10000 times, result: 200000
20 threads, loop 100000 times, result: 2000000
20 threads, loop 1000000 times, result: 20000000
Clearly this lock, which took only a few lines of code, already works. That is the power of AQS: with very little code we can build a synchronization tool, without having to deal with the underlying complexity of synchronization state management, thread queuing, waiting and wake-up, and so on.
Shared synchronization state acquisition and release
The main difference between shared and exclusive acquisition is whether more than one thread can hold the synchronization state at the same time. Threads that cannot acquire the synchronization state still need to be wrapped in Node objects and blocked. The following methods work with the synchronization queue in shared mode:
Method | Description |
---|---|
void acquireShared(int arg) | Acquires the synchronization state in shared mode; on failure, wraps the current thread in a Node and inserts it into the synchronization queue. |
void acquireSharedInterruptibly(int arg) | Same as the previous method, except that if the thread is interrupted by another thread while executing this method, InterruptedException is thrown. |
boolean tryAcquireSharedNanos(int arg, long nanos) | Adds a timeout to the previous method: returns false if the synchronization state is not acquired within the given time, true otherwise. |
boolean releaseShared(int arg) | Releases the synchronization state in shared mode. |
Ha, it looks very similar to the methods in exclusive mode, except that each method has a Shared word added. The acquireShared method is used as an example:
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
This method calls the tryAcquireShared method of our custom AQS subclass to acquire the synchronization state. Note that tryAcquireShared returns an int: if the value is not less than 0, the synchronization state was acquired successfully and nothing more needs to be done; if the value is less than 0, acquisition failed, and the thread is wrapped in a Node and inserted into the synchronization queue. The insertion process is similar to exclusive mode.
The other two acquire methods are just as straightforward: one is interruptible and the other supports a timeout.
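The return-value convention of tryAcquireShared can be checked by calling an implementation directly. In this sketch the class SharedReturnDemo and the nested PermitSync are hypothetical; PermitSync returns the number of permits left after the attempt, which is one common way (not the only one) to satisfy the convention.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class for the tryAcquireShared return-value convention.
final class SharedReturnDemo {

    static final class PermitSync extends AbstractQueuedSynchronizer {
        PermitSync(int permits) {
            setState(permits);
        }

        // Returns the permits left after this attempt; a negative value means failure.
        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int cur = getState();
                int next = cur - arg;
                // When nothing is left, report failure without modifying state
                if (next < 0 || compareAndSetState(cur, next)) {
                    return next;
                }
            }
        }
    }

    // Three direct attempts against two permits.
    static int[] returnValues() {
        PermitSync sync = new PermitSync(2);
        int first = sync.tryAcquireShared(1);
        int second = sync.tryAcquireShared(1);
        int third = sync.tryAcquireShared(1);
        return new int[] {first, second, third};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(returnValues())); // [1, 0, -1]
    }
}
```

Only the third attempt returns a negative value, so only that caller would be queued by acquireShared.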
The method of releasing synchronous state is similar to that of exclusive mode:
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
This method calls the tryReleaseShared method of our custom AQS subclass to release the synchronization state and, if that succeeds, removes a blocked node from the synchronization queue. Unlike exclusive mode, several threads may release the synchronization state at the same time, which means several threads may be removing blocked nodes from the synchronization queue concurrently. We won’t read through the source for how this is handled; let’s just try writing a tool instead.
Example of shared synchronization tool
Given that an operation can only be performed by two threads at the same time, and the other threads need to be in the wait state, we can define the lock as follows:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class DoubleLock {

    private static class Sync extends AbstractQueuedSynchronizer {

        public Sync() {
            super();
            setState(2); // Set the initial synchronization state
        }

        @Override
        protected int tryAcquireShared(int arg) {
            while (true) {
                int cur = getState();
                int next = cur - arg; // Derive next from the same value the CAS compares against
                // Fail without touching state when nothing is left to take
                if (next < 0 || compareAndSetState(cur, next)) {
                    return next;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            while (true) {
                int cur = getState();
                int next = cur + arg;
                if (compareAndSetState(cur, next)) {
                    return true;
                }
            }
        }
    }

    private Sync sync = new Sync();

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }
}
The initial value of state is 2. Every time a thread calls tryAcquireShared to get the synchronization state, the value of state is reduced by 1. When the value of state is 0, other threads cannot get the synchronization state and are wrapped as Node nodes to wait in the synchronization queue.
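One way to check the "at most two at a time" behaviour is to count how many threads are inside the guarded region at once. The sketch below is self-contained, so it carries its own copy of a two-permit synchronizer; the class TwoAtATimeDemo, the maxConcurrent method, and the 20 ms sleep are all assumptions made for the illustration. Its tryAcquireShared refuses to take state below 0.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class; Sync is a two-permit shared synchronizer.
final class TwoAtATimeDemo {

    static final class Sync extends AbstractQueuedSynchronizer {
        Sync() {
            setState(2);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int cur = getState();
                int next = cur - arg;
                if (next < 0 || compareAndSetState(cur, next)) {
                    return next; // negative means no permit was available
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int cur = getState();
                if (compareAndSetState(cur, cur + arg)) {
                    return true;
                }
            }
        }
    }

    // Runs threadCount threads through the guarded region and reports the peak occupancy.
    static int maxConcurrent(int threadCount) {
        Sync sync = new Sync();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                sync.acquireShared(1);
                try {
                    max.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(20); // stay inside long enough for others to pile up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    sync.releaseShared(1);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("peak threads inside: " + maxConcurrent(6));
    }
}
```

The reported peak should never exceed 2, however many threads are started.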
Other important methods for the synchronization queue in AQS
In addition to the acquire and release methods, AQS provides a number of methods for inspecting the queue directly, all of which are public final:
Method | Description |
---|---|
boolean hasQueuedThreads() | Whether any threads are waiting to acquire the synchronization state. |
boolean hasContended() | Whether any thread has ever blocked because it could not acquire the synchronization state. |
Thread getFirstQueuedThread() | Returns the first (longest-waiting) thread in the queue, or null if no threads are currently queued. |
boolean isQueued(Thread thread) | Returns true if the given thread is currently queued waiting to acquire the synchronization state. |
int getQueueLength() | Returns an estimate of the number of threads waiting to acquire the synchronization state. It is only an estimate because, in a multithreaded environment, the actual set of threads may change while the result is being computed. |
Collection<Thread> getQueuedThreads() | Returns a collection containing the threads that may be waiting to acquire the synchronization state. It is only a best-effort estimate, for the same reason. |
They can be used in our custom synchronization tool if necessary.
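As a rough illustration of these inspection methods, the sketch below holds the synchronization state in one thread, lets a second thread queue up behind it, and prints what the methods report during and after the wait. The class QueueInspectionDemo and its inspect method are hypothetical names, and the nested Sync is the same simple exclusive subclass used earlier.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class for the queue inspection methods.
final class QueueInspectionDemo {

    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    static String inspect() {
        Sync sync = new Sync();
        sync.acquire(1); // the calling thread holds the state

        Thread waiter = new Thread(() -> {
            sync.acquire(1); // queues up and blocks until the holder releases
            sync.release(1);
        });
        waiter.start();
        while (!sync.isQueued(waiter)) { // spin until the waiter is in the queue
            Thread.yield();
        }

        String during = "hasQueuedThreads=" + sync.hasQueuedThreads()
                + " length=" + sync.getQueueLength()
                + " firstIsWaiter=" + (sync.getFirstQueuedThread() == waiter);

        sync.release(1); // let the waiter through
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        String after = " | afterRelease: hasQueuedThreads=" + sync.hasQueuedThreads()
                + " hasContended=" + sync.hasContended();
        return during + after;
    }

    public static void main(String[] args) {
        System.out.println(inspect());
    }
}
```

While the waiter is queued, one thread is reported as waiting; after it has acquired and released, the queue is empty again, but hasContended still reports that contention happened at some point.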
Digression
Writing articles is tiring. An article that reads smoothly is often the result of countless revisions behind the scenes. If you found this one useful, please share it, many thanks! My public account is “We are all little frogs”, where you will find more technical material and the occasional bit of banter. You are welcome to follow it.
Booklet
In addition, the author has also written a booklet on MySQL: How MySQL Works: Understanding MySQL from the Root. It is written mainly from a beginner’s point of view and explains, in plain language, some of the core concepts of advanced MySQL, such as records, indexes, pages, tablespaces, query optimization, transactions, and locks. It runs to roughly 300,000 to 400,000 words, with hundreds of original illustrations.