1. Introduction to Condition
Condition is a utility interface for communication between threads. Its await/signal methods let several threads wait on the same condition and be woken up only once that condition is met.
The Condition interface is part of Java's concurrency library (java.util.concurrent.locks). Its await/signal mechanism is designed to replace the wait/notify mechanism of monitor locks, and it allows more complex and fine-grained coordination between multiple threads.
The Condition interface code is as follows:
public interface Condition {
    /** Waits until signalled or interrupted */
    void await() throws InterruptedException;
    /** Waits until signalled, without responding to interrupts */
    void awaitUninterruptibly();
    /** Waits until signalled, interrupted, or the given number of nanoseconds elapses */
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    /** Waits until signalled, interrupted, or the given time elapses */
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    /** Waits until signalled, interrupted, or the given deadline passes */
    boolean awaitUntil(Date deadline) throws InterruptedException;
    /** Wakes up one waiting thread */
    void signal();
    /** Wakes up all waiting threads */
    void signalAll();
}
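As a minimal sketch of how these methods are typically used together (the class `ReadyFlag` and its method names are hypothetical and not part of the original article), one thread waits in a loop until a flag is set, and another thread sets the flag and signals the change:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: a one-shot "ready" flag guarded by a Lock and a Condition.
class ReadyFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition readyCondition = lock.newCondition();
    private boolean ready = false;

    // Blocks until markReady() has been called.
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {            // re-check the predicate after every wakeup
                readyCondition.await(); // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    // Sets the flag and wakes every waiting thread.
    void markReady() {
        lock.lock();
        try {
            ready = true;
            readyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    boolean isReady() {
        lock.lock();
        try {
            return ready;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadyFlag flag = new ReadyFlag();
        Thread waiter = new Thread(() -> {
            try {
                flag.awaitReady();
                System.out.println("waiter resumed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        flag.markReady();
        waiter.join();
    }
}
```

The wait is wrapped in a `while` loop rather than an `if` so that the predicate is checked again after every wakeup, which is the usage pattern the Condition Javadoc recommends.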
Comparison of the Object wait/notify methods and the Condition methods:
Object method | Condition method | Note |
---|---|---|
void wait() | void await() | Similar behavior |
void wait(long timeout) | long awaitNanos(long nanosTimeout) | Time unit and return value differ |
void wait(long timeout, int nanos) | boolean await(long time, TimeUnit unit) | Time unit, parameters, and return value differ |
void notify() | void signal() | Similar behavior |
void notifyAll() | void signalAll() | Similar behavior |
(none) | void awaitUninterruptibly() | Unique to the Condition interface |
(none) | boolean awaitUntil(Date deadline) | Unique to the Condition interface |
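The difference in return values can be seen in a small sketch. Unlike `Object.wait(long)`, which returns `void`, the timed Condition methods report whether the wait ran out of time. The class `TimedWaitDemo` and its method names are hypothetical; the condition used here is never signalled, so with a `ReentrantLock` both calls are expected to return `false` after the timeout:

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of the boolean results of the timed Condition waits.
class TimedWaitDemo {
    // Waits on a condition nobody signals and returns what await(long, TimeUnit) reported.
    static boolean awaitWithTimeout(long millis) {
        ReentrantLock lock = new ReentrantLock();
        Condition neverSignalled = lock.newCondition();
        lock.lock();
        try {
            return neverSignalled.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Same idea with an absolute deadline instead of a relative timeout.
    static boolean awaitUntilDeadline(long millisFromNow) {
        ReentrantLock lock = new ReentrantLock();
        Condition neverSignalled = lock.newCondition();
        lock.lock();
        try {
            Date deadline = new Date(System.currentTimeMillis() + millisFromNow);
            return neverSignalled.awaitUntil(deadline);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("await(50 ms) returned " + awaitWithTimeout(50));
        System.out.println("awaitUntil(now + 50 ms) returned " + awaitUntilDeadline(50));
    }
}
```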
Comparison of the Object monitor methods and the Condition interface:
Item | Object monitor methods | Condition |
---|---|---|
Precondition | Acquire the object's monitor lock | Call lock.lock() to acquire the Lock, then call lock.newCondition() to obtain the Condition object |
Invocation | Called directly, e.g. object.wait() | Called directly, e.g. condition.await() |
Number of wait queues | One | Multiple |
The current thread releases the lock and enters the waiting state | Supported | Supported |
The current thread releases the lock and enters the waiting state without responding to interrupts | Not supported | Supported |
The current thread releases the lock and enters the timed waiting state | Supported | Supported |
The current thread releases the lock and waits until some future point in time | Not supported | Supported |
Wake up one thread in the wait queue | Supported | Supported |
Wake up all threads in the wait queue | Supported | Supported |
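The precondition row can be checked with a short sketch: calling a Condition method without holding its lock fails in the same way as calling `Object.wait()` outside a `synchronized` block. The class `LockPreconditionDemo` and its method names are hypothetical, and the lock used is a `ReentrantLock`:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: both mechanisms require the lock to be held first.
class LockPreconditionDemo {
    // True if signal() without the lock throws IllegalMonitorStateException.
    static boolean signalWithoutLockFails() {
        Condition condition = new ReentrantLock().newCondition();
        try {
            condition.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // True if await() without the lock throws IllegalMonitorStateException.
    static boolean awaitWithoutLockFails() {
        Condition condition = new ReentrantLock().newCondition();
        try {
            condition.await();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // True if Object.wait() outside a synchronized block throws the same exception.
    static boolean objectWaitWithoutMonitorFails() {
        Object monitor = new Object();
        try {
            monitor.wait(1);
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockFails());
        System.out.println(awaitWithoutLockFails());
        System.out.println(objectWaitWithoutMonitorFails());
    }
}
```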
2. The implementation of Condition
ConditionObject inner class
ConditionObject, an inner class of AQS (AbstractQueuedSynchronizer), implements the Condition interface.
public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of the condition queue */
    private transient Node firstWaiter;
    /** Last node of the condition queue */
    private transient Node lastWaiter;
    /** Default constructor */
    public ConditionObject() {}
}
The condition queue is a unidirectional (singly linked) queue: firstWaiter points to its head and lastWaiter to its tail.
2. The await() method
await()
Calling await() is equivalent to moving the head node of the synchronization queue (the node that holds the lock) into the Condition's wait queue.
public final void await() throws InterruptedException {
    if (Thread.interrupted()) // if the current thread is already interrupted, throw immediately
        throw new InterruptedException();
    Node node = addConditionWaiter(); // add a new wait node to the condition queue
    int savedState = fullyRelease(node); // release everything the thread holds and remember the state value
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // loop while the node has not been moved to the synchronization queue
        LockSupport.park(this); // block the current thread until it is woken up
        /*
         * Reaching this point means the thread was woken up, either by signal()
         * or by an interrupt. checkInterruptWhileWaiting returns one of:
         *   0           the thread was not interrupted
         *   THROW_IE    the interrupt happened before signal
         *   REINTERRUPT the interrupt happened after signal
         */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break; // interrupted: stop waiting (a plain signal ends the loop through isOnSyncQueue instead)
    }
    /*
     * acquireQueued returns true if the thread was interrupted while reacquiring
     * the lock and false otherwise. If it returns true and interruptMode is not
     * THROW_IE, no exception needs to be thrown, so the mode becomes REINTERRUPT.
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /*
     * A non-null nextWaiter means the node is still linked in the condition queue.
     * That happens when interruptMode is THROW_IE (interrupt before signal),
     * because signal would have set the node's nextWaiter to null.
     */
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0) // 0 means no interrupt occurred
        reportInterruptAfterWait(interruptMode);
}
addConditionWaiter()
This method wraps the current thread in a Node and appends it to the Condition's wait queue. Unlike the synchronization queue, this queue is a singly linked list rather than a doubly linked one.
/**
 * Adds a new waiter to the condition queue.
 * @return the new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter; // last node of the condition queue
    // If lastWaiter is no longer in the CONDITION state, clean cancelled nodes out of the queue
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter; // unlinkCancelledWaiters may have reassigned lastWaiter
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION); // wrap the current thread in a node
    // If t is null the queue is empty and the new node becomes the head; otherwise link it after the tail
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node; // the new node is always the tail
    return node;
}
After addConditionWaiter() completes, the new node sits at the tail of the condition queue.
unlinkCancelledWaiters()
When a node is being added and lastWaiter is no longer in the CONDITION state, this method walks the condition queue from the head and unlinks every node whose state is not CONDITION.
/** Walks the condition queue from the head and removes every node that is not in the CONDITION state */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter; // start from the head node
    Node trail = null; // last node seen so far that is still in the CONDITION state
    while (t != null) {
        Node next = t.nextWaiter; // next is the node after t
        if (t.waitStatus != Node.CONDITION) { // t is no longer waiting, so it must be removed
            t.nextWaiter = null; // detach t from the list to help GC
            /*
             * If trail is null, no valid node precedes t, so next becomes the new head.
             * Otherwise link the last valid node (trail) directly to next, skipping t.
             */
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            // If next is null, t was the tail, so trail becomes the new tail
            if (next == null)
                lastWaiter = trail;
        } else
            trail = t; // t is valid: remember it as the last valid node
        t = next; // advance to the next node
    }
}
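The list manipulation in addConditionWaiter() and unlinkCancelledWaiters() can be tried out in isolation. The following is a toy model, not the JDK code: `ToyConditionQueue`, its `Node`, and the `names()` helper are hypothetical, there is no locking or thread handling, and only the two status constants reuse the values that AQS assigns to `Node.CONDITION` and `Node.CANCELLED`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded model of the singly linked condition queue.
class ToyConditionQueue {
    static final int CONDITION = -2; // same value as Node.CONDITION in AQS
    static final int CANCELLED = 1;  // same value as Node.CANCELLED in AQS

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;

    // Mirrors addConditionWaiter(): clean up if the tail is stale, then append.
    Node add(String name) {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(name);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Mirrors unlinkCancelledWaiters(): drop every node that is not CONDITION.
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else {
                trail = t;
            }
            t = next;
        }
    }

    // Names of the nodes currently linked, head first.
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter)
            result.add(n.name);
        return result;
    }

    public static void main(String[] args) {
        ToyConditionQueue queue = new ToyConditionQueue();
        queue.add("A");
        Node b = queue.add("B");
        Node c = queue.add("C");
        b.waitStatus = CANCELLED;
        c.waitStatus = CANCELLED; // the tail is now stale
        queue.add("D");           // triggers the cleanup before appending
        System.out.println(queue.names());
    }
}
```

Note that the cleanup only runs when the tail node is stale: a cancelled node in the middle of the list stays linked until a later add() or an explicit unlinkCancelledWaiters() call reaches it.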
fullyRelease()
This method releases the synchronization state and wakes up the thread referenced by the successor of the head node in the synchronization queue. It returns normally if the release succeeds; otherwise it throws an exception.
/** Releases everything held by the current thread and wakes up the successor of the head node in the synchronization queue */
final int fullyRelease(Node node) {
    boolean failed = true; // whether the release failed
    try {
        int savedState = getState(); // current synchronizer state value
        if (release(savedState)) { // release it all and wake up a waiting thread
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException(); // the release failed
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED; // mark the node cancelled so it is later removed from the condition queue
    }
}
At this point, the synchronization queue triggers lock release and re-contention. ThreadB acquired the lock.
isOnSyncQueue()
This method determines whether the current node is in the synchronization queue (SyncQueue). If it is, the while loop in await() exits and execution continues; if it is not, the loop body is entered.
/** Determines whether the node is in the synchronization queue */
final boolean isOnSyncQueue(Node node) {
    /*
     * A node still in the CONDITION state is not in the sync queue.
     * A node added to the sync queue by enq always has a non-null prev,
     * so a null prev also means the node is not in the sync queue.
     */
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // next is only used by the sync queue, so a successor means the node is in it
        return true;
    return findNodeFromTail(node); // otherwise search the sync queue from the tail
}
/** Searches backwards from the tail of the synchronization queue for the node */
private boolean findNodeFromTail(Node node) {
    Node t = tail; // tail node of the sync queue
    for (;;) {
        if (t == node) // found the node in the sync queue
            return true;
        if (t == null) // reached the front, or the queue is empty, without finding the node
            return false;
        t = t.prev;
    }
}
checkInterruptWhileWaiting()
This method checks, once the thread has been woken up, whether it was interrupted while waiting. If the current thread was interrupted, transferAfterCancelledWait() determines whether subsequent processing should throw InterruptedException or re-interrupt.
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
 * Checks whether an interrupt occurred. Returns 0 if the thread was not interrupted,
 * REINTERRUPT (1) if the interrupt occurred after signal,
 * THROW_IE (-1) if the interrupt occurred before signal.
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
/**
 * Transfers node, if necessary, to sync queue after a cancelled wait.
 * Returns true if thread was cancelled before being signalled.
 */
final boolean transferAfterCancelledWait(Node node) {
    /*
     * A waiting thread can be woken up by signal or by an interrupt.
     * This CAS tries to change the node state from CONDITION to 0.
     * Success means the state was still CONDITION, so no signal has happened yet.
     * Failure means signal has already changed the state and is moving the node.
     */
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node); // interrupted before signal: put the node into the sync queue ourselves
        return true;
    }
    /*
     * The CAS failed, so a signal is in progress, but the signalling thread may
     * still be inside enq. Yield until the node has actually reached the sync queue.
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
reportInterruptAfterWait()
This method reports the interrupt according to interruptMode: when interruptMode equals THROW_IE it throws InterruptedException, and when it equals REINTERRUPT it re-interrupts the current thread.
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // Interrupted before signal: throw the exception directly
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // Interrupted after signal: restore the interrupt flag and let the caller handle it
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
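The THROW_IE path can be observed from the outside with a small sketch. The class `InterruptDuringAwaitDemo` is hypothetical; it uses a `ReentrantLock`, lets one thread wait on a condition that is never signalled, and interrupts it once `hasWaiters` shows that it has joined the condition queue. Because no signal ever happens, the interrupt comes "before signal" and await() is expected to exit with InterruptedException:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupt that arrives before any signal ends await() with an exception.
class InterruptDuringAwaitDemo {
    // Returns true if the waiting thread left await() via InterruptedException.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        Condition neverSignalled = lock.newCondition();
        boolean[] sawInterruptedException = {false};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                neverSignalled.await();
            } catch (InterruptedException e) {
                sawInterruptedException[0] = true;
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter has added itself to the condition queue.
        boolean waiting = false;
        while (!waiting) {
            lock.lock();
            try {
                waiting = lock.hasWaiters(neverSignalled);
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }

        waiter.interrupt();
        try {
            waiter.join(); // join makes the waiter's write to the array visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterruptedException[0];
    }

    public static void main(String[] args) {
        System.out.println("await ended with InterruptedException: " + run());
    }
}
```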
In summary, await() does the following:
- Adds the current thread to the condition wait queue
- Releases the synchronization state held by the current thread and wakes up the successor node
- Suspends the current thread
- After being woken up, checks whether it was interrupted and reacquires the synchronization state
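The release and reacquire steps above can be made visible with a reentrant lock. In this sketch (the class `FullReleaseDemo` is hypothetical) the waiting thread holds a `ReentrantLock` twice. The signalling thread can only get the lock if the wait released both holds, and after the wait returns the hold count is expected to be back at 2:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: waiting releases every hold on the lock and restores them afterwards.
class FullReleaseDemo {
    // Returns {hold count before waiting, hold count after waiting}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] signalled = {false};
        int[] holdCounts = new int[2];

        lock.lock();
        lock.lock(); // reentrant acquisition: the hold count is now 2
        try {
            holdCounts[0] = lock.getHoldCount();
            Thread signaller = new Thread(() -> {
                lock.lock(); // succeeds only once the waiter has released both holds
                try {
                    signalled[0] = true;
                    condition.signal();
                } finally {
                    lock.unlock();
                }
            });
            signaller.start();
            while (!signalled[0]) {
                condition.awaitUninterruptibly(); // releases both holds, later reacquires both
            }
            holdCounts[1] = lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
        return holdCounts;
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before: " + counts[0] + ", after: " + counts[1]);
    }
}
```

awaitUninterruptibly() is used here only to keep the sketch free of checked exceptions; the release and reacquire behavior is the same as for await().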
signal()/signalAll() methods
signal()
This method takes the first valid node from the head of the Condition queue and transfers it to the SyncQueue.
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
isHeldExclusively()
This method must be implemented by subclasses. It determines whether the current thread is the one that holds the lock.
/** Determines whether the current thread holds the lock exclusively; subclasses must override this */
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
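To show what such a subclass can look like, here is a minimal non-reentrant mutex modeled on the example in the AbstractQueuedSynchronizer Javadoc. The class `SimpleMutex` and its method names are hypothetical; it is a sketch of one possible subclass, not how ReentrantLock itself is written.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

// Hypothetical minimal mutex: state 0 means unlocked, state 1 means locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Used by ConditionObject.signal()/signalAll() to check the caller.
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }

    void unlock() { sync.release(1); }

    Condition newCondition() { return sync.newCondition(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        Condition condition = mutex.newCondition();
        try {
            condition.signal(); // the lock is not held by this thread
        } catch (IllegalMonitorStateException e) {
            System.out.println("signal() rejected without the lock");
        }
        mutex.lock();
        try {
            condition.signal(); // allowed: isHeldExclusively() now returns true
        } finally {
            mutex.unlock();
        }
    }
}
```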
doSignal()
This method removes the head node from the wait queue and transfers it to the synchronization queue.
/** Removes nodes from the head of the condition queue until one is successfully transferred to the tail of the synchronization queue */
private void doSignal(Node first) {
    do {
        // Advance firstWaiter; if the queue becomes empty, also set lastWaiter to null
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null; // detach first from the condition queue before it is enqueued
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal()
This method adds a node to the synchronization queue.
/** Transfers a node from the condition queue to the synchronization queue. Returns true if the transfer succeeded */
final boolean transferForSignal(Node node) {
    // If this CAS fails, the node is no longer in the CONDITION state and has probably been cancelled
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node); // append the node to the sync queue; enq returns its predecessor
    int ws = p.waitStatus;
    /*
     * ws is the state of the predecessor p. ws > 0 means p has been cancelled, so the
     * node's thread is woken up directly. Otherwise the CAS tries to set p's state to
     * SIGNAL (see shouldParkAfterFailedAcquire). If it fails because another thread
     * changed p's state, the node's thread is also woken up so that it can acquire the lock.
     */
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
signalAll()
This method transfers all nodes in the ConditionQueue to the SyncQueue.
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
/** Transfers all nodes from the condition queue to the synchronization queue */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null; // the condition queue becomes empty because every node is moved
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null); // loop until there is no next waiter
}
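The difference between the two methods can be observed through `ReentrantLock.getWaitQueueLength(Condition)`, which reports how many threads are waiting on a condition. In this sketch (the class `SignalCountDemo` is hypothetical) three threads wait, and the counts are read while the signalling thread still holds the lock, so they are expected to go from 3 to 2 after signal() and to 0 after signalAll():

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: signal() moves one waiter, signalAll() moves all of them.
class SignalCountDemo {
    // Returns {waiters before, waiters after signal(), waiters after signalAll()}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] released = {false};
        int waiterCount = 3;

        for (int i = 0; i < waiterCount; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    while (!released[0]) {
                        condition.awaitUninterruptibly();
                    }
                } finally {
                    lock.unlock();
                }
            }).start();
        }

        int[] counts = new int[3];
        while (true) {
            lock.lock();
            try {
                // Proceed only once all three threads are in the condition queue.
                if (lock.getWaitQueueLength(condition) == waiterCount) {
                    counts[0] = lock.getWaitQueueLength(condition);
                    released[0] = true;
                    condition.signal();    // transfers the first waiter only
                    counts[1] = lock.getWaitQueueLength(condition);
                    condition.signalAll(); // transfers every remaining waiter
                    counts[2] = lock.getWaitQueueLength(condition);
                    return counts;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " -> " + counts[1] + " -> " + counts[2]);
    }
}
```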
awaitNanos()
awaitNanos() adds a timeout to await() and still responds to interrupts. A signal, an interrupt, or a timeout each moves the node from the Condition Queue to the Sync Queue.
/**
 * Implements timed condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled, interrupted, or timed out.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
/*
 * All awaitXX methods follow the same outline:
 * 0. Wrap the current thread in a Node and add it to the Condition Queue
 * 1. Release the exclusive lock held by the current thread
 * 2. Wait for another thread that holds the exclusive lock to wake this one up, move from the Condition Queue to the Sync Queue, and reacquire the exclusive lock
 * 3. Once the lock is reacquired, act according to how the thread was woken up (signal or interrupt)
 * 4. Finally, the caller still has to call lock.unlock() to release the lock
 */
@Override
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) { // 1. Check whether the thread has been interrupted
        throw new InterruptedException();
    }
    Node node = addConditionWaiter(); // 2. Wrap the thread in a Node and add it to the Condition Queue (this may also clean out cancelled nodes)
    int savedState = fullyRelease(node); // 3. Release all holds of the current thread (to call await, the thread must already hold the exclusive lock)
    final long deadline = System.nanoTime() + nanosTimeout; // 4. Compute the deadline of the wait
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 5. Loop until the node is in the Sync Queue (moved there by signal, or by an interrupt or timeout through transferAfterCancelledWait)
        if (nanosTimeout <= 0L) { // 6. Timed out (nanosTimeout may be negative here)
            transferAfterCancelledWait(node); // 7. Move the node from the Condition Queue to the Sync Queue
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold) { // 8. Park only if enough time remains; below spinForTimeoutThreshold, spinning is cheaper than LockSupport.parkNanos
            LockSupport.parkNanos(this, nanosTimeout); // 9. Block the thread
        }
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { // 10. Check whether the wakeup was caused by an interrupt; if so, checkInterruptWhileWaiting has already moved the node via transferAfterCancelledWait
            break; // woken up by an interrupt, and the node is now in the Sync Queue
        }
        nanosTimeout = deadline - System.nanoTime(); // 11. Compute the remaining time
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) { // 12. Reacquire the exclusive lock from the Sync Queue; the return value says whether the thread was interrupted while acquiring
        interruptMode = REINTERRUPT;
    }
    if (node.nextWaiter != null) { // 13. A non-null nextWaiter means the wait ended by interrupt or timeout rather than signal, so the node is still linked in the Condition Queue as well as the Sync Queue
        unlinkCancelledWaiters(); // 14. Remove the cancelled nodes
    }
    if (interruptMode != 0) { // 15. A non-zero interruptMode means the thread was interrupted
        reportInterruptAfterWait(interruptMode); // 16. Throw an exception or re-interrupt, depending on interruptMode
    }
    return deadline - System.nanoTime(); // 17. Estimate of the remaining time; a value <= 0 means the wait timed out
}
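Because awaitNanos() returns the remaining time, callers usually feed that value back into the next wait so that repeated wakeups never extend the total timeout. This is the usage pattern shown in the Condition Javadoc; the class `TimedFlag` and its method names below are hypothetical:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical flag that can be awaited with an overall time budget.
class TimedFlag {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean set;

    void set() {
        lock.lock();
        try {
            set = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits until set() is called; returns false if the time runs out first.
    boolean awaitSet(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!set) {
                if (nanos <= 0L)
                    return false;                  // the whole budget is used up
                nanos = changed.awaitNanos(nanos); // remaining time, <= 0 on timeout
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedFlag flag = new TimedFlag();
        System.out.println("before set(): " + flag.awaitSet(50, TimeUnit.MILLISECONDS));
        flag.set();
        System.out.println("after set(): " + flag.awaitSet(50, TimeUnit.MILLISECONDS));
    }
}
```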
4. Salient features of Condition
- 1. It relies on two wait queues: one is provided by AQS (the synchronization wait queue) and the other by ConditionObject (the condition wait queue).
- 2. await() releases the lock, which unblocks a node in the AQS synchronization wait queue, and adds the calling thread's node to the condition wait queue, where it blocks.
- 3. signal() or signalAll() moves nodes from the condition wait queue back into the AQS synchronization wait queue. In the normal case it does not unblock them directly.
- 4. The nodes that enter the AQS synchronization wait queue in step 3 compete again to become the head node. From there on, the behavior is the same as the AQS exclusive mode analyzed earlier.
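The two queues can be watched from the outside through the monitoring methods of `ReentrantLock`: `getWaitQueueLength(Condition)` for the condition queue and `getQueueLength()` for the synchronization queue. Both are documented as estimates, but in this sketch (the class `TwoQueuesDemo` is hypothetical) there is a single waiter and the values are read while the lock is held, so the waiter is expected to move from one queue to the other at the moment of signal():

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: signal() moves a node from the condition queue to the sync queue.
class TwoQueuesDemo {
    // Returns {condition queue before, sync queue before, condition queue after, sync queue after}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] go = {false};

        new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    condition.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        }).start();

        int[] snapshot = new int[4];
        while (true) {
            lock.lock();
            try {
                if (lock.hasWaiters(condition)) {
                    snapshot[0] = lock.getWaitQueueLength(condition); // waiter is in the condition queue
                    snapshot[1] = lock.getQueueLength();              // nobody is queued for the lock
                    go[0] = true;
                    condition.signal();
                    snapshot[2] = lock.getWaitQueueLength(condition); // condition queue is now empty
                    snapshot[3] = lock.getQueueLength();              // waiter now queues for the lock
                    return snapshot;
                }
            } finally {
                lock.unlock(); // only now can the signalled waiter reacquire the lock
            }
            Thread.yield();
        }
    }

    public static void main(String[] args) {
        int[] s = run();
        System.out.println("condition queue: " + s[0] + " -> " + s[2]);
        System.out.println("sync queue: " + s[1] + " -> " + s[3]);
    }
}
```

The signalled thread does not run at the moment of signal(). It resumes only after the signalling thread unlocks, which matches point 3 above.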
Using Condition to implement the producer-consumer pattern
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedQueue {
    private final LinkedList<Object> buffer; // container shared by producers and consumers
    private final int maxSize; // maximum capacity of the container
    private final Lock lock;
    private final Condition notEmptyCondition; // consumers wait here while the buffer is empty
    private final Condition notFullCondition; // producers wait here while the buffer is full
    BoundedQueue(int maxSize) {
        this.maxSize = maxSize;
        buffer = new LinkedList<Object>();
        lock = new ReentrantLock();
        notEmptyCondition = lock.newCondition();
        notFullCondition = lock.newCondition();
    }
    /**
     * Producer
     * @param obj the item to add
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(Object obj) throws InterruptedException {
        lock.lock(); // acquire the lock
        try {
            while (maxSize == buffer.size()) {
                notFullCondition.await(); // the buffer is full, so the producer waits
            }
            buffer.add(obj);
            notEmptyCondition.signal(); // notify a waiting consumer
        } finally {
            lock.unlock();
        }
    }
    /**
     * Consumer
     * @return the item removed from the head of the buffer
     * @throws InterruptedException if interrupted while waiting
     */
    public Object get() throws InterruptedException {
        Object obj;
        lock.lock();
        try {
            while (buffer.size() == 0) { // the buffer is empty, so the consumer waits
                notEmptyCondition.await();
            }
            obj = buffer.poll();
            notFullCondition.signal(); // notify a waiting producer
        } finally {
            lock.unlock();
        }
        return obj;
    }
}