Java concurrency: Condition
What is Condition?
Condition was introduced in Java 1.5 as a replacement for the traditional wait() and notify() methods of Object for coordinating threads. Coordinating threads with Condition's await() and signal() is generally considered safer and more efficient, so Condition is usually the recommended choice. The JDK's blocking queues, for example, use Condition to coordinate their producer and consumer threads.
In Java, both synchronized and Lock + Condition follow the monitor model, and a Condition represents a wait condition (condition variable) in that model. A Condition is always used together with a Lock. Because one Lock can have several Conditions, threads can wait on, and be woken for, distinct conditions, which makes coordination more targeted and more flexible.
A condition (also known as a condition queue or condition variable) gives one thread a way to suspend execution (to "wait") until another thread notifies it that some state condition may now be true. Because this shared state is accessed from different threads, it must be protected, so a lock of some form is associated with the condition. The key property of waiting on a condition is that it atomically releases the associated lock and suspends the current thread, just like Object.wait.
A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance, call its newCondition() method.
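As a minimal sketch of the points above, the following class creates a Condition from a Lock and waits on it until a flag changes. The class and method names (ReadyGate, awaitReady, markReady, demo) are illustrative assumptions and do not come from the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot gate used only for illustration.
class ReadyGate {
    private final Lock lock = new ReentrantLock();
    private final Condition readyCondition = lock.newCondition(); // bound to `lock`
    private boolean ready = false; // shared state, guarded by `lock`

    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {            // re-check in a loop: wakeups may be spurious
                readyCondition.await(); // atomically releases `lock` and suspends
            }
        } finally {
            lock.unlock();
        }
    }

    void markReady() {
        lock.lock();
        try {
            ready = true;
            readyCondition.signalAll(); // wake every thread waiting on this condition
        } finally {
            lock.unlock();
        }
    }

    // Starts a waiter, opens the gate, and reports whether the waiter got through.
    static boolean demo() {
        ReadyGate gate = new ReadyGate();
        boolean[] passed = {false};
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
                passed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.markReady();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed[0];
    }
}
```

The flag is checked under the lock, so the sketch works whether markReady() runs before or after the waiter starts waiting.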
A Condition implementation can provide behavior and semantics that differ from those of the Object monitor methods, such as a guaranteed ordering for notifications, or not requiring a lock to be held when performing notifications. If an implementation provides such specialized semantics, it must document them.
Note:
Condition instances are just ordinary objects, so they can themselves be used as the target of a synchronized statement, and their own monitor wait and notify methods can be invoked. Acquiring the monitor lock of a Condition instance, or using its monitor methods, has no specified relationship with acquiring the Lock associated with that Condition or with using its await and signal methods. To avoid confusion, it is recommended that Condition instances never be used in this way, except perhaps within their own implementation.
The three forms of condition waiting (interruptible, non-interruptible, and timed) may differ in their ease of implementation and in their performance characteristics on some platforms. In particular, it may be difficult to provide these features while maintaining specific semantics such as ordering guarantees. In addition, the ability to interrupt the actual suspension of a thread may not be feasible to implement on all platforms.
Consequently, an implementation is not required to define exactly the same guarantees or semantics for all three forms of waiting, nor is it required to support interruption of the actual suspension of a thread.
An implementation is required to clearly document the semantics and guarantees provided by each waiting method, and when an implementation does support interruption of thread suspension, it must obey the interruption semantics defined in this interface.
Because interruption generally implies cancellation, and checks for interruption are often infrequent, an implementation can favor responding to an interrupt over a normal method return. This is true even if it can be shown that the interrupt occurred after another action that may have unblocked the thread. An implementation should document this behavior.
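The interruptible form of waiting can be illustrated with a short sketch. It assumes a ReentrantLock, whose conditions do support interruption. The class name InterruptDemo and its method are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
class InterruptDemo {
    // Returns true if await() reported the interrupt as an InterruptedException.
    static boolean awaitIsInterruptible() {
        ReentrantLock lock = new ReentrantLock();
        Condition neverSignalled = lock.newCondition();
        CountDownLatch started = new CountDownLatch(1);
        boolean[] sawInterrupt = {false};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                started.countDown();
                while (true) {
                    neverSignalled.await(); // nobody signals; only an interrupt ends this
                }
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;     // the lock has been re-acquired at this point
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            started.await();    // make sure the waiter thread is running
            waiter.interrupt(); // cancels the wait
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt[0];
    }
}
```

Replacing await() with awaitUninterruptibly() in this sketch would leave the waiter blocked forever, because that form ignores interrupts while waiting.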
Condition is an interface whose basic methods are await() and signal().
Condition depends on the Lock interface; the basic code for creating a Condition is lock.newCondition().
Calls to Condition's await() and signal() must be made while holding the lock, that is, between lock.lock() and lock.unlock().
– await() in Condition corresponds to wait() of Object;
– signal() in Condition corresponds to notify() of Object;
– signalAll() in Condition corresponds to notifyAll() of Object.
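The requirement that the lock be held can be checked directly. The sketch below assumes a ReentrantLock, whose conditions throw IllegalMonitorStateException when signal() is called without the lock. Other Lock implementations may behave differently, and the class name LockRequiredDemo is hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration.
class LockRequiredDemo {
    // Returns true if signal() outside lock()/unlock() was rejected.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the lock is not held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if signal() between lock() and unlock() completes normally.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no waiters, so this is a no-op
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```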
What does Condition do?
– ReentrantLock is a more flexible alternative to synchronized. One common use of ReentrantLock is together with Condition, to wait for state conditions in a multi-threaded environment. Object.wait and Object.notify are used with synchronized, whereas the condition-variable methods Condition.await and Condition.signal are used with ReentrantLock.
– The ArrayBlockingQueue in the JDK uses Condition to coordinate on the empty/full state of the queue.
– Condition can be used to implement the producer-consumer pattern.
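The producer-consumer use case can be sketched with two conditions on one lock, one for "not full" and one for "not empty". This is a simplified illustration in the spirit of ArrayBlockingQueue, not its actual source. The names BoundedBuffer, put, take and demo are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer for illustration.
class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();
            }
            items.addLast(item);
            notEmpty.signal(); // wake one consumer only
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal(); // wake one producer only
            return item;
        } finally {
            lock.unlock();
        }
    }

    // One producer pushes 1..100 through a buffer of capacity 2; returns the consumer's sum.
    static int demo() {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 100; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 100; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

With Object.wait/notify, both kinds of thread would share a single wait set and notifyAll() would be needed. Separate conditions let each side wake only the other.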
Condition source code analysis
/**
 * @since 1.5
 */
public interface Condition {

    /** Causes the current thread to wait until it is signalled or interrupted. */
    void await() throws InterruptedException;

    /** Causes the current thread to wait until it is signalled. */
    void awaitUninterruptibly();

    /** Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /** Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /** Causes the current thread to wait until it is signalled or interrupted, or the specified deadline passes. */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /** Wakes up one waiting thread. */
    void signal();

    /** Wakes up all waiting threads. */
    void signalAll();
}
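The timed variants are typically used in a loop that tracks the remaining time. The sketch below uses awaitNanos, which returns an estimate of the nanoseconds still remaining (zero or negative once the timeout has elapsed). The class name TimedFlag and its methods are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical flag with a bounded wait, for illustration.
class TimedFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean flag = false; // guarded by `lock`

    void set() {
        lock.lock();
        try {
            flag = true;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits up to the given timeout; returns true if the flag was set in time.
    boolean awaitSet(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false;                  // timed out
                }
                nanos = changed.awaitNanos(nanos); // remaining time after this wait
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Nobody sets the flag, so the wait is expected to time out after about 50 ms.
    static boolean demoTimeout() {
        try {
            return new TimedFlag().awaitSet(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // The flag is already set, so the wait returns immediately.
    static boolean demoAlreadySet() {
        TimedFlag timedFlag = new TimedFlag();
        timedFlag.set();
        try {
            return timedFlag.awaitSet(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```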
public final void await() throws InterruptedException {
    // 1. If the current thread has been interrupted, throw InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 2. Add the current thread to the condition queue as a new node.
    //    If lastWaiter has been cancelled, cancelled nodes are unlinked from the queue first.
    Node node = addConditionWaiter();
    // 3. Fully release the lock held by the current thread (via tryRelease) and remember the saved state
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 4. Why check whether the node is on the AQS synchronization queue?
    //    Answer: signal() removes a node from the condition queue and places it on the AQS
    //    synchronization queue, so this check tells us whether signal() has been executed.
    //    If the node is not on the AQS queue, park the current thread; if it is, exit the loop.
    //    The loop also exits if the thread is interrupted.
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 5. The thread has been woken by signal() or signalAll() (or interrupted) and has left the loop in step 4.
    //    It now tries to acquire the lock again by calling acquireQueued.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// Add a new node for the current thread to the condition queue
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
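The effect of steps 3 and 5 in await() (release the lock fully, then re-acquire it with the saved state) can be observed through the public API. The sketch below assumes a ReentrantLock held twice by the waiting thread. The class name HoldCountDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration.
class HoldCountDemo {
    // Returns {hold count before await(), hold count after await()} as seen by the waiter.
    static int[] demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition go = lock.newCondition();
        CountDownLatch locked = new CountDownLatch(1);
        boolean[] signalled = {false}; // guarded by `lock`
        int[] holds = new int[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquisition: hold count is now 2
            try {
                holds[0] = lock.getHoldCount();
                locked.countDown();
                while (!signalled[0]) {
                    go.await(); // releases both holds, restores both on return
                }
                holds[1] = lock.getHoldCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        try {
            locked.await(); // the waiter holds the lock twice by now
            lock.lock();    // succeeds only once await() has released both holds
            try {
                signalled[0] = true;
                go.signal();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holds;
    }
}
```

The signalling thread can only acquire the lock because await() released every hold, not just one. The waiter sees its original hold count again after await() returns.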
public final void signal() {
    // The calling thread must hold the lock exclusively; otherwise throw an exception
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// Remove the head node of the condition queue and move it to the synchronization queue
private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    // CAS waitStatus from CONDITION to 0 (the initial state); failure means the node was cancelled
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    // Wake the thread if the predecessor is cancelled or cannot be marked SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

// This AQS method appends the node to the synchronization queue and returns its predecessor
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
// Wake up the longest-waiting thread:
// move the node for that thread from the condition queue to the synchronization queue
public final void signal() {
    // The thread calling signal() must hold the exclusive lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// Walk the condition queue from the head to find the first node that can be transferred
private void doSignal(Node first) {
    do {
        // Point firstWaiter at the node after first, because first is about to leave the queue.
        // If no node is waiting after first is removed, set lastWaiter to null as well.
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // first is about to be moved to the synchronization queue, so cut its link to the condition queue
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
    // In this loop, if transferring first fails, the node after it is tried next, and so on
}

// Move the node from the condition queue to the synchronization queue.
// Returns true if the transfer succeeded.
// Returns false if the node was cancelled before signal().
final boolean transferForSignal(Node node) {
    // If the CAS fails, the node's waitStatus is no longer Node.CONDITION, so the node has been cancelled.
    // A cancelled node does not need to be moved.
    // Otherwise, waitStatus is set to 0.
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // enq(node): spin until the node is appended to the tail of the synchronization queue.
    // Note that the return value p is node's predecessor in the synchronization queue.
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws > 0 means node's predecessor in the synchronization queue has cancelled waiting for the lock,
    // so the thread of node is woken directly.
    // If ws <= 0, compareAndSetWaitStatus is called to set the predecessor's status to Node.SIGNAL.
    // If the predecessor is cancelled or that CAS fails, the thread is woken here.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
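The transfer from the condition queue to the synchronization queue can be observed from outside through ReentrantLock's monitoring methods (hasWaiters, getWaitQueueLength, hasQueuedThread). The sketch below is only an external illustration under that assumption and does not touch AQS internals. The class name SignalTransferDemo is hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration.
class SignalTransferDemo {
    // Returns {condition waiters before signal, condition waiters after signal,
    //          1 if the waiter is queued for the lock after signal, else 0}.
    static int[] demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by `lock`
        int[] result = new int[3];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    cond.await();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            while (true) {
                lock.lock();
                try {
                    if (lock.hasWaiters(cond)) { // the waiter is in the condition queue
                        result[0] = lock.getWaitQueueLength(cond);
                        go[0] = true;
                        cond.signal();           // moves the waiter's node to the lock's queue
                        result[1] = lock.getWaitQueueLength(cond);
                        result[2] = lock.hasQueuedThread(waiter) ? 1 : 0;
                        break;
                    }
                } finally {
                    lock.unlock();
                }
                Thread.sleep(1); // the waiter has not reached await() yet; retry
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}
```

While the signalling thread still holds the lock, the signalled thread has left the condition queue but is queued for the lock. It returns from await() only after the lock is released.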
Condition in practice
Scenario: three threads (A, B, and C) take turns printing an increasing counter in strict A, B, C order, 10 values per thread.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Test {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.printA();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.printB();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.printC();
            }
        }, "C").start();
    }
}

class Data {
    private final Lock lock = new ReentrantLock();
    // One condition per thread, so each signal wakes exactly the thread whose turn is next
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();
    private final Condition condition3 = lock.newCondition();
    private int num = 1;  // whose turn it is: 1 = A, 2 = B, 3 = C
    private int size = 0; // counter printed by the threads

    public void printA() {
        lock.lock();
        try {
            while (num != 1) {
                condition1.await();
            }
            // Pre-increment so that the first printed value is 1
            System.out.println(Thread.currentThread().getName() + ":" + (++size));
            num = 2;
            condition2.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            lock.unlock();
        }
    }

    public void printB() {
        lock.lock();
        try {
            while (num != 2) {
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName() + ":" + (++size));
            num = 3;
            condition3.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public void printC() {
        lock.lock();
        try {
            while (num != 3) {
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName() + ":" + (++size));
            num = 1;
            condition1.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}
Execution Result:
A:1
B:2
C:3
A:4
B:5
C:6
A:7
B:8
C:9
A:10
B:11
C:12
A:13
B:14
C:15
A:16
B:17
C:18
A:19
B:20
C:21
A:22
B:23
C:24
A:25
B:26
C:27
A:28
B:29
C:30
References (with thanks to the authors):
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html
javadoop.com/post/Abstra…