Analyzing the AQS condition queue through Condition
Preface
The previous article looked at the synchronization queue in AQS; this one looks at the condition queue.
The two most common locking mechanisms in Java are synchronized and ReentrantLock. ReentrantLock is usually described as more flexible than synchronized, and much of that flexibility comes from its use of condition queues.
The Condition interface
Condition is an interface introduced in Java 1.5, mainly as a replacement for the wait and notify methods of the Object class. It solves communication between threads in a more flexible way:
public interface Condition {
    // Causes the current thread to wait until it is signalled or interrupted
    void await() throws InterruptedException;
    // Causes the current thread to wait until it is signalled; does not respond to interrupts
    void awaitUninterruptibly();
    // Causes the current thread to wait until it is signalled, interrupted, or the timeout elapses
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // Same as awaitNanos, but the time unit can be specified
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // Causes the current thread to wait until it is signalled, interrupted, or the deadline passes.
    // Returns true if it is signalled before the deadline, false otherwise
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // Wakes up one thread waiting on this Condition; that thread must re-acquire
    // the lock associated with the Condition before it returns from the wait method
    void signal();
    // Wakes up all threads waiting on this Condition; the counterpart of notifyAll()
    void signalAll();
}
The most important methods are await, which makes the current thread wait, and signal, which wakes a waiting thread. Next, we analyze them with a practical example.
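Before the larger example, here is a minimal sketch of the basic await/signal pattern, including a timed wait. The class TimedAwaitDemo and its members are hypothetical names chosen for illustration; only the Condition and ReentrantLock calls come from the JDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative and not from the article.
class TimedAwaitDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    // Waits up to timeoutMillis for the flag; returns true if it was set in time.
    boolean awaitFlag(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Re-check the predicate in a loop to guard against spurious wakeups.
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // timed out
                }
                nanos = ready.awaitNanos(nanos); // returns the remaining wait time
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Sets the flag and wakes every thread waiting on the condition.
    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TimedAwaitDemo demo = new TimedAwaitDemo();
        System.out.println("before setFlag: " + demo.awaitFlag(50));
        new Thread(demo::setFlag).start();
        System.out.println("after setFlag: " + demo.awaitFlag(5000));
    }
}
```

The wait always sits inside a loop that re-checks the guarded state, and both await and signal are called while holding the lock that created the Condition.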
What problems can Condition solve
Imagine a producer-consumer scenario:
1. The producer has two threads that produce roast chicken; the consumer has two threads that consume roast chicken.
2. All four threads run together, but at any moment only one producer thread can produce a chicken and only one consumer thread can consume one.
3. Only after a roast chicken has been produced can a consumer thread be notified to consume; otherwise it must wait.
4. Only after the roast chicken has been consumed can a producer thread be notified to produce; otherwise it must wait.
So we use ReentrantLock to control concurrency and create two Condition objects from it, productCondition and consumeCondition: the former controls the producer threads and the latter controls the consumer threads. When isHaveChicken is true, a producer thread must wait, and a consumer thread is woken to consume. When the consumer thread finishes, it sets the flag to false to indicate that the roast chicken has been consumed and wakes a producer thread to produce the next one; a consumer that finds no chicken waits in turn, and so on.
package com.springsingleton.demo.Chicken;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ChikenStore {
    ReentrantLock reentrantLock = new ReentrantLock();
    // Producers wait on productCondition; consumers wait on consumeCondition
    Condition productCondition = reentrantLock.newCondition();
    Condition consumeCondition = reentrantLock.newCondition();
    private int count = 0;
    private volatile boolean isHaveChicken = false;

    // Produce
    public void ProductChicken() {
        reentrantLock.lock();
        try {
            // Wait in a loop so the flag is re-checked after every wake-up
            while (isHaveChicken) {
                try {
                    System.out.println("There is already a roast chicken; " + Thread.currentThread().getName() + " stops producing.");
                    productCondition.await();
                } catch (InterruptedException e) {
                    System.out.println("error" + e.getMessage());
                }
            }
            count++;
            System.out.println(Thread.currentThread().getName() + " produced roast chicken no. " + count + "; start selling.");
            isHaveChicken = true;
            // Wake one waiting consumer only
            consumeCondition.signal();
        } finally {
            // Always release the lock, even if an exception is thrown
            reentrantLock.unlock();
        }
    }

    // Consume
    public void SellChicken() {
        reentrantLock.lock();
        try {
            while (!isHaveChicken) {
                try {
                    System.out.println("There is no roast chicken; " + Thread.currentThread().getName() + " stops selling.");
                    consumeCondition.await();
                } catch (InterruptedException e) {
                    System.out.println("error" + e.getMessage());
                }
            }
            count--;
            isHaveChicken = false;
            // Parentheses force numeric addition instead of string concatenation
            System.out.println(Thread.currentThread().getName() + " sold roast chicken no. " + (count + 1) + "; start producing.");
            // Wake one waiting producer only
            productCondition.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) {
        ChikenStore chikenStore = new ChikenStore();
        new Thread(() -> {
            Thread.currentThread().setName("Producer 1");
            while (true) {
                chikenStore.ProductChicken();
            }
        }).start();
        new Thread(() -> {
            Thread.currentThread().setName("Producer 2");
            while (true) {
                chikenStore.ProductChicken();
            }
        }).start();
        new Thread(() -> {
            Thread.currentThread().setName("Consumer 1");
            while (true) {
                chikenStore.SellChicken();
            }
        }).start();
        new Thread(() -> {
            Thread.currentThread().setName("Consumer 2");
            while (true) {
                chikenStore.SellChicken();
            }
        }).start();
    }
}
Output (summarized; the exact interleaving of the threads varies from run to run):
A producer produces roast chicken no. 1 and selling starts. Any producer that then finds a chicken already present stops producing and waits. A consumer sells the chicken and production starts again. Any consumer that then finds no chicken stops selling and waits. The cycle repeats, so production and sale always alternate.
The same scenario implemented with synchronized:
package com.springsingleton.demo.Chicken;

public class ChickenStoreSync {
    private int count = 0;
    private volatile boolean isHaveChicken = false;

    // Produce
    public synchronized void ProductChicken() {
        // Wait in a loop so the flag is re-checked after every wake-up
        while (isHaveChicken) {
            try {
                System.out.println("There is already a roast chicken; " + Thread.currentThread().getName() + " stops producing.");
                this.wait();
            } catch (InterruptedException e) {
                System.out.println("error" + e.getMessage());
            }
        }
        count++;
        System.out.println(Thread.currentThread().getName() + " produced roast chicken no. " + count + "; start selling.");
        isHaveChicken = true;
        // Wakes every thread waiting on this monitor, producers and consumers alike
        notifyAll();
    }

    // Consume
    public synchronized void SellChicken() {
        while (!isHaveChicken) {
            try {
                System.out.println("There is no roast chicken; " + Thread.currentThread().getName() + " stops selling.");
                this.wait();
            } catch (InterruptedException e) {
                System.out.println("error" + e.getMessage());
            }
        }
        count--;
        isHaveChicken = false;
        // Parentheses force numeric addition instead of string concatenation
        System.out.println(Thread.currentThread().getName() + " sold roast chicken no. " + (count + 1) + "; start producing.");
        notifyAll();
    }

    public static void main(String[] args) {
        ChickenStoreSync chikenStore = new ChickenStoreSync();
        new Thread(() -> {
            Thread.currentThread().setName("Producer 1");
            while (true) {
                chikenStore.ProductChicken();
            }
        }).start();
        new Thread(() -> {
            Thread.currentThread().setName("Producer 2");
            while (true) {
                chikenStore.ProductChicken();
            }
        }).start();
        new Thread(() -> {
            Thread.currentThread().setName("Consumer 1");
            while (true) {
                chikenStore.SellChicken();
            }
        }).start();
        new Thread(() -> {
            Thread.currentThread().setName("Consumer 2");
            while (true) {
                chikenStore.SellChicken();
            }
        }).start();
    }
}
In the code above, producer and consumer threads all wait on the same monitor when synchronized is used, so when notify() or notifyAll() is called we cannot control whether a consumer thread or a producer thread is woken. Condition avoids this: each group of threads waits on its own Condition and can be signalled separately.
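The difference can be observed directly. The following sketch, with the hypothetical class TargetedSignalDemo and its illustrative field names, parks one thread on each of two conditions of the same lock, signals only one condition, and then counts the remaining waiters with ReentrantLock.getWaitQueueLength.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative and not from the article.
class TargetedSignalDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condA = lock.newCondition();
    private final Condition condB = lock.newCondition();
    private boolean releaseA = false; // guarded by lock
    private boolean releaseB = false; // guarded by lock (never set in this demo)

    // Starts a daemon thread that waits on condA or condB until released.
    private Thread startWaiter(boolean onA) {
        Thread t = new Thread(() -> {
            lock.lock();
            try {
                while (onA ? !releaseA : !releaseB) {
                    (onA ? condA : condB).awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        t.setDaemon(true); // the waiter on condB is never released
        t.start();
        return t;
    }

    // Number of threads currently waiting on each condition: {A, B}.
    private int[] waiters() {
        lock.lock();
        try {
            return new int[] {lock.getWaitQueueLength(condA), lock.getWaitQueueLength(condB)};
        } finally {
            lock.unlock();
        }
    }

    // Signals only condA and reports the waiter counts afterwards.
    int[] signalOnlyA() {
        Thread a = startWaiter(true);
        startWaiter(false);
        int[] w = waiters();
        while (w[0] != 1 || w[1] != 1) { // wait until both threads are in their queues
            Thread.yield();
            w = waiters();
        }
        lock.lock();
        try {
            releaseA = true;
            condA.signal(); // affects the waiter on condA only
        } finally {
            lock.unlock();
        }
        try {
            a.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return waiters();
    }

    public static void main(String[] args) {
        int[] w = new TargetedSignalDemo().signalOnlyA();
        System.out.println("waiting on A: " + w[0] + ", waiting on B: " + w[1]);
    }
}
```

After the signal, no thread is left waiting on condA while the thread on condB is still waiting, which is the targeting that a single monitor with notify() cannot provide.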
How Condition is implemented in AQS
In AQS, Condition is implemented by the inner class ConditionObject. As analyzed earlier, AQS has two kinds of queues, the synchronization queue and the condition queue, and the condition queue is built on Condition. Note that the lock must be held before a Condition is used (a Condition is normally created from a lock and depends on it). Nodes in the condition queue are also Node objects, with waitStatus set to CONDITION. ConditionObject has two fields, firstWaiter and lastWaiter: firstWaiter points to the first node in the queue, and lastWaiter points to the last.
public class ConditionObject implements Condition, java.io.Serializable {
    // First waiting node of the condition queue
    private transient Node firstWaiter;
    // Last waiting node of the condition queue
    private transient Node lastWaiter;
    // Remainder omitted...
}
Each Condition corresponds to one condition queue. Multiple Condition objects can be created on one lock, so one lock can have multiple condition queues. The condition queue is also a FIFO queue, and each node holds a reference to a thread that is waiting on that Condition object.
When a thread calls await(), it builds a Node that wraps the current thread, adds the node to the condition queue, releases the lock, and waits until it is signalled, interrupted, or timed out. The model of the condition queue is as follows:
As the figure shows, unlike the synchronization queue, the condition queue is singly linked: its nodes are linked through nextWaiter rather than next, as explained in the earlier analysis of the Node data structure. firstWaiter points to the head of the condition queue and lastWaiter points to its tail. A node in the condition queue has only two possible states, CANCELLED and CONDITION: the former means the thread has given up waiting (for example after a timeout or an interrupt) and the node must be removed from the queue, and the latter means the node is waiting to be signalled.
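The structure described above can be sketched as a small toy model. This is a simplified illustration, not the JDK source: ToyConditionQueue, add, unlinkCancelled, and names are hypothetical, and only the field names firstWaiter, lastWaiter, nextWaiter, and waitStatus and the two status values mirror AQS.

```java
// Toy model of a condition queue; a simplified sketch, not the JDK source.
class ToyConditionQueue {
    static final int CONDITION = -2; // same value as Node.CONDITION in AQS
    static final int CANCELLED = 1;  // same value as Node.CANCELLED in AQS

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter; // single forward link; there is no prev pointer
        Node(String name) { this.name = name; }
    }

    Node firstWaiter; // head of the queue
    Node lastWaiter;  // tail of the queue

    // Appends a new waiting node at the tail (FIFO order).
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    // Unlinks every node whose status is no longer CONDITION.
    void unlinkCancelled() {
        Node t = firstWaiter;
        Node trail = null; // last node seen that is still waiting
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) {
                    firstWaiter = next;
                } else {
                    trail.nextWaiter = next;
                }
                if (next == null) {
                    lastWaiter = trail;
                }
            } else {
                trail = t;
            }
            t = next;
        }
    }

    // Renders the queue from head to tail, e.g. "T1 -> T2".
    String names() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToyConditionQueue q = new ToyConditionQueue();
        q.add("T1");
        Node t2 = q.add("T2");
        q.add("T3");
        System.out.println(q.names()); // prints "T1 -> T2 -> T3"
        t2.waitStatus = CANCELLED;
        q.unlinkCancelled();
        System.out.println(q.names()); // prints "T1 -> T3"
    }
}
```

Because the list is singly linked, removing a cancelled node requires a scan from the head while remembering the last live node, which is why the model carries a trail pointer.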
Each Condition object has its own condition queue, so an AQS instance has only one synchronization queue but may have several condition queues (the roast chicken example above has two, one for each Condition created). Now let's look at the code to see how a thread that calls await() joins the condition queue and is later woken (the other await variants work similarly).
public final void await() throws InterruptedException {
    // Throw immediately if the thread has already been interrupted
    if (Thread.interrupted())
        throw new InterruptedException();
    // Wrap the current thread in a new node and add it to the condition queue
    Node node = addConditionWaiter();
    // Fully release the lock held by the current thread and remember the saved state
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Loop until the node is on the synchronization queue, i.e. until it has been signalled
    while (!isOnSyncQueue(node)) {
        // Park the thread
        LockSupport.park(this);
        // If the wake-up was caused by an interrupt, leave the loop
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // After waking, spin to re-acquire the lock and record whether the thread was interrupted
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // Clean up if cancelled: unlink nodes that are no longer in CONDITION state
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
Next, look at the addConditionWaiter method, which adds the node to the condition queue:
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If the tail node has been cancelled, unlink the cancelled nodes first
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Create a node for the current thread with waitStatus CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // Append the node to the queue
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
The await() method does three things:
First, it calls addConditionWaiter() to wrap the current thread in a node and add it to the condition queue.
Second, it calls fullyRelease(node) to release the synchronization state and wake the thread of the successor node in the synchronization queue.
Third, it calls isOnSyncQueue(node) in a while loop to check whether the node is on the synchronization queue; if it is not, the thread is parked. Once the thread has been signalled, its node has been moved from the condition queue to the synchronization queue, and the thread calls acquireQueued(node, savedState) to spin until it re-acquires the lock.
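The effect of fullyRelease and the saved state is visible with a reentrant hold. In this sketch, the hypothetical class FullReleaseDemo acquires the lock twice, waits, and checks the hold count again after waking. It uses awaitUninterruptibly() only to avoid checked-exception handling; that method releases and re-acquires the lock in the same way.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative and not from the article.
class FullReleaseDemo {
    // Returns {hold count before waiting, hold count after waking}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] holds = new int[2];

        lock.lock();
        lock.lock(); // reentrant acquisition: the hold count is now 2
        try {
            holds[0] = lock.getHoldCount();
            Thread helper = new Thread(() -> {
                lock.lock(); // succeeds only after the wait has released both holds
                try {
                    signalled[0] = true;
                    cond.signal();
                } finally {
                    lock.unlock();
                }
            });
            helper.start();
            while (!signalled[0]) {
                cond.awaitUninterruptibly(); // releases the full state, then parks
            }
            holds[1] = lock.getHoldCount(); // the saved state has been restored
        } finally {
            lock.unlock();
            lock.unlock();
        }
        return holds;
    }

    public static void main(String[] args) {
        int[] holds = run();
        System.out.println("before: " + holds[0] + ", after: " + holds[1]);
    }
}
```

If the wait released only one of the two holds, the helper thread could never acquire the lock and the program would hang; the fact that it completes shows that the whole state is released and later restored.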
Next, look at signal():
public final void signal() {
    // Throw an exception if the current thread does not hold the lock exclusively
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // Signal the first node of the condition queue
    if (first != null)
        doSignal(first);
}
The signal() method does two things:
First, it checks whether the current thread holds the lock exclusively and throws an exception if it does not.
Second, it wakes the first node of the condition queue by calling doSignal(first).
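The ownership check is easy to trigger. The following sketch, using the hypothetical class SignalWithoutLockDemo, calls signal() once without the lock and once with it.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative and not from the article.
class SignalWithoutLockDemo {
    // Returns true if signal() threw IllegalMonitorStateException.
    static boolean signalThrows(boolean holdLock) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        if (holdLock) {
            lock.lock();
        }
        try {
            cond.signal(); // fails the isHeldExclusively() check without the lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } finally {
            if (holdLock) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("without lock: " + signalThrows(false)); // prints "without lock: true"
        System.out.println("with lock: " + signalThrows(true));     // prints "with lock: false"
    }
}
```

Signalling with the lock held and an empty condition queue is harmless: first is null, so doSignal is never called.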
private void doSignal(Node first) {
    do {
        // Remove the first node from the condition queue.
        // If it has no successor, the queue is now empty, so set the tail to null
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // If the transfer fails and the condition queue still has nodes,
        // continue the loop and signal the next node
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

// transferForSignal method
final boolean transferForSignal(Node node) {
    // Try to change the node's waitStatus from CONDITION to 0 (the initial state).
    // If compareAndSetWaitStatus returns false, the node is no longer in
    // CONDITION state, so it must have been cancelled: return false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // Append the node to the synchronization queue; p is its predecessor
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor is CANCELLED (ws > 0), or setting the predecessor's
    // status to Node.SIGNAL fails, wake the thread of the signalled node directly
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
The doSignal(first) method does two things:
First, it removes the node being signalled from the condition queue and updates the firstWaiter and lastWaiter references of the condition queue.
Second, it adds the removed node to the synchronization queue (done in the transferForSignal() method). If the transfer fails because the node has been cancelled and the condition queue still has nodes, the loop moves on to signal the next node.
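The move from the condition queue to the synchronization queue can be observed through the public monitoring methods of ReentrantLock. In this sketch, the hypothetical class TransferDemo inspects hasWaiters(Condition) and hasQueuedThread(Thread) immediately before and after signal(), while the signalling thread still holds the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative and not from the article.
class TransferDemo {
    // Returns {inConditionQueueBefore, inSyncQueueBefore, inConditionQueueAfter, inSyncQueueAfter}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    cond.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean[] seen = new boolean[4];
        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) { // the waiter has joined the condition queue
                    seen[0] = true;
                    seen[1] = lock.hasQueuedThread(waiter);
                    go[0] = true;
                    cond.signal(); // transfers the node; the lock is still held here
                    seen[2] = lock.hasWaiters(cond);
                    seen[3] = lock.hasQueuedThread(waiter);
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.yield();
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] s = run();
        System.out.println("before signal: condition=" + s[0] + ", sync=" + s[1]);
        System.out.println("after signal:  condition=" + s[2] + ", sync=" + s[3]);
    }
}
```

Before the signal the waiter is only in the condition queue; after it, the waiter is only in the synchronization queue, still blocked until the lock is released.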
Conclusion:
signal() first checks whether the current thread holds the exclusive lock. If so, it takes the first node of the condition queue, removes it from the condition queue, and moves it to the synchronization queue. If the transfer fails (which happens only when the node has been cancelled), the loop continues with the next node in the condition queue. After a successful transfer, if the predecessor node in the synchronization queue has been cancelled, or setting the predecessor's state to Node.SIGNAL fails, LockSupport.unpark() is called to wake the thread of the signalled node directly. After signal() completes, the woken thread exits the while loop in the await() method above: its node is now on the synchronization queue, so while (!isOnSyncQueue(node)) no longer holds. It then calls the AQS acquireQueued() method to compete for the synchronization state. This is the whole implementation of the wait/wake mechanism. The flow is shown below (note that the synchronization queue and the condition queue use the same Node data structure but different fields of it).
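One consequence of this flow is that a signalled thread cannot return from its wait until the signalling thread releases the lock. The following sketch records the order of events to show this; WakeOrderDemo and the event strings are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are illustrative and not from the article.
class WakeOrderDemo {
    // Returns the events in the order in which they happened.
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<String> events = new ArrayList<>(); // guarded by lock
        boolean[] go = {false};                  // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                events.add("waiter: await");
                while (!go[0]) {
                    cond.awaitUninterruptibly();
                }
                events.add("waiter: resumed"); // runs only after re-acquiring the lock
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    go[0] = true;
                    cond.signal();
                    events.add("signaller: signalled");
                    // The waiter is now queued for the lock but cannot run yet.
                    events.add("signaller: unlocking");
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.yield();
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

The "waiter: resumed" event always comes last, because the waiter must win the lock through acquireQueued() before its wait returns.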