In the last blog, simply said the basic concept of AQS, the core source code analysis, but there is a part of the content did not involve, is AQS support for conditional variables, this blog will focus on the introduction of this aspect of the content.
Condition variables,
The basic application
We first simulate a consumer/producer model to see the basic application of conditional variables:
- When data is available, producers stop producing data and inform consumers to consume it;
- When no data is available, consumers stop consuming data and inform producers to produce data;
public class CommonResource {
private boolean isHaveData = false;
Lock lock = new ReentrantLock();
Condition producer_con = lock.newCondition();
Condition consumer_con = lock.newCondition();
public void product() {
lock.lock();
try {
while (isHaveData) {
try {
System.out.println("There's still data, waiting for consumption data.");
producer_con.await();
} catch (InterruptedException e) {
}
}
System.out.println("The producer has produced the data.");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
isHaveData = true;
consumer_con.signal();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
while(! isHaveData) { try { System.out.println("No more data. Wait for producer consumption data.");
consumer_con.await();
} catch (InterruptedException e) {
}
}
System.out.println("Consumer Consumption data");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
isHaveData = false; producer_con.signal(); } finally { lock.unlock(); }}}Copy the code
public class Main {
public static void main(String[] args) {
CommonResource resource = new CommonResource();
new Thread(() -> {
while (true) {
resource.product();
}
}).start();
new Thread(() -> {
while (true) { resource.consume(); } }).start(); }}Copy the code
Running results:
“Wait /nofity” (object) works with synchronized, and “await/signal” (condition variable) works with AQS. Of course, it also depends on whether the lock implemented with AQS supports conditional variables. Synchronized can only work with one shared variable, whereas locks implemented by AQS support multiple condition variables.
Let’s try analyzing the code above:
We first create two condition variables, one to block/wake up the consumer thread and one to block/wake up the producer thread.
The producer first obtains the exclusive lock and determines whether there is data:
- If there is data, it blocks the current thread by calling the await method of the condition variable producer_con, and wakes it up when the consumer thread calls the signal method of the condition variable producer_con again.
- If there is no data, the data is produced and the signal method of the condition variable Consumer_con is called to wake up the consumer thread that was blocked because of calling the await method of consumer_con.
Finally release the lock.
The consumer, first obtaining the exclusive lock, determines whether there is data:
- If there is no data, the call to the await method of the condition variable Consumer_con blocks the current thread, and the producer thread wakes up when it calls the condition variable’s signal method again.
- If there is data, the data is consumed and the signal method of the condition variable producer_con is called, waking up the producer thread that was blocked because of calling producer_con’s await method.
Finally release the lock.
Here’s a special note:
- Release locks, which should be placed in finally in case an exception occurs and the lock is not released.
To deepen our understanding of conditional variables, let’s look at another example where two threads alternately print odd and even numbers:
public class Test {
private int num = 0;
private Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void add() {
while(num<100) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + ":"+ num++); condition.signal(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }}}}Copy the code
public class Main {
public static void main(String[] args) {
Test test=new Test(); new Thread(() -> { test.add(); }).start(); new Thread(() -> { test.add(); }).start(); }}Copy the code
Running results:
Through online most of the cases are in two alternate print thread method, at the same time open two condition variables, one of the conditions of variable responsible for blocking/wake to print an odd number of threads, a variable is responsible for blocking/wake print even number of threads, but people feel no need, two threads share a thread method, can also share a condition variables. Do not know everybody see officer what think?
The source code parsing
When we click on Lock. newCondition, we see that it has several implementation classes:
public Condition newCondition() {
return sync.newCondition();
}
Copy the code
Further down:
final ConditionObject newCondition() {
return new ConditionObject();
}
Copy the code
As you can see, when we call lock newnewCondition, eventually the new out a ConditionObject object, and ConditionObject class is AbstractQueuedSynchronizer inner classes, Let’s start with ConditionObject’s UML diagram:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while(! isOnSyncQueue(node)) { LockSupport.park(this);if((interruptMode = checkInterruptWhileWaiting(node)) ! = 0)break;
}
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! = null) // clean upif cancelled
unlinkCancelledWaiters();
if(interruptMode ! = 0) reportInterruptAfterWait(interruptMode); }Copy the code
Here, we can make clear three questions:
- Where and how is the complete conditional queue kept?
- The await method, how does it release the lock?
- How does the await method block a thread?
The first question can be answered in the addConditionWaiter method:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if(t ! = null && t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
Copy the code
The first step is to check whether the tail node in the conditional queue is canceled. If it is, run the unlinkCancelledWaiters method. We definitely haven’t been cancelled here, in fact, if we call the await method for the first time, lastWaiter is empty, so we definitely won’t enter the first if. Then, create a new Node, the Node type is introduced, a blog post on a large number of AbstractQueuedSynchronizer inner classes, namely has built a Node Node, which saved the current thread and the type of Node, this Node is the type of CONDITION, If t==null, the newly created Node is the first Node, so give firstWaiter. Otherwise, the nextWaiter of the last Node is set to the new Node, creating a one-way linked list. Where is the nextWaiter? That is, it is a field of the Node class:
With the first question solved, let’s look at the second question, the answer to the second question is in the second method of await:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code
I’m going to call getState, what’s the state, if you remember, for ReentrantLock, state is the number of reentrantlocks, and then I’m going to call Release, passing in state. So no matter how many times I reenter, I’m going to release the lock all at once.
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if(h ! = null && h.waitStatus ! = 0) unparkSuccessor(h);return true;
}
return false;
}
Copy the code
You can see that releasing the lock still calls the tryRelease method, which is exactly what needs to be overridden.
So when we’re done calling the first two methods, we do a judgment isOnSyncQueue, and generally we go into this if, park thread, waiting to wake up, and that solves the third problem.
Let’s look at the signal method again. Again, we need to solve several problems:
- The AQS conditional queue and the blocking queue are not the same queue, so the await thread will never enter the blocking queue?
- How does the signal method wake up a thread?
public final void signal() {
if(! isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter;if(first ! = null)doSignal(first);
}
Copy the code
The key is the transferForSignal method in doSignal:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if(! compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if(ws > 0 || ! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return true;
Copy the code
In this method, we call the ENQ method, put the conditional queue thread into the blocking queue, and then call the unpark method to wake up the thread.
That’s the end of this blog.
After the next two blogs, I believe you must have a relatively simple understanding of AQS. If you are smart, you can see that there are a lot of things in these two blogs that are not fully explained, even a little ambiguous, just “scratching the surface”, so it also fits my title: Difficult to understand AQS. Indeed, AQS are no simpler to explore than thread pools. See, I’m making excuses for myself again. I hope after future precipitation, I can rewrite these two blogs, and then change the title of “Understand AQS thoroughly”, heh heh.