Preface
This is a reading note on Chapter 5, Section 6 of The Art of Concurrent Programming in Java.
Every Java object has a set of monitor methods, defined on java.lang.Object and acting on the object’s monitor: wait(), wait(long timeout), notify(), and notifyAll(). Used together with the synchronized keyword, these methods implement the wait/notification pattern. The Condition interface provides similar monitor methods that work with a Lock to implement the same pattern, but the two differ in usage and functionality.
Object’s monitor methods compared with the Condition interface
Comparing Object’s monitor methods with the Condition interface gives a more detailed picture of Condition’s features. The comparison items and results are shown in the following table:
Condition interface with example
Condition defines two kinds of methods, waiting and notification. Before the current thread calls any of them, it must already hold the Lock associated with the Condition object (acquired with lock.lock()). A Condition object is created by a Lock object through its newCondition() method; in other words, a Condition depends on its Lock.
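The rule above can be sketched as follows. This is a minimal illustration, not code from the book: the class name ConditionLockRule and its two helper methods are invented here. It assumes a ReentrantLock, whose Condition rejects signal() with an IllegalMonitorStateException when the calling thread does not hold the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, introduced only for illustration.
class ConditionLockRule {
    private final Lock lock = new ReentrantLock();
    // A Condition is always created by, and bound to, a Lock.
    private final Condition condition = lock.newCondition();

    /** Calls signal() without holding the lock; returns true if the call was rejected. */
    boolean signalWithoutLock() {
        try {
            condition.signal();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Calls signal() while holding the lock; returns true if it completed normally. */
    boolean signalWithLock() {
        lock.lock();
        try {
            condition.signal(); // no thread is waiting, so this does nothing
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

Both helpers return true with ReentrantLock; other Lock implementations may report the missing lock differently.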
Methods of the Condition interface
The Condition interface declares the following methods:
- void await() throws InterruptedException: The current thread enters the wait state until it is notified or interrupted. When a thread returns from await(), it has re-acquired the lock associated with the Condition object. A thread returns from the wait state to the running state when:
  - another thread calls the Condition’s signal() or signalAll() method and the current thread is selected to be woken up, or
  - another thread calls interrupt() on the current thread.
- void awaitUninterruptibly(): The current thread enters the wait state until it is notified (signalled). As the method name indicates, this method does not respond to interrupts.
- long awaitNanos(long nanosTimeout) throws InterruptedException: The current thread enters the wait state until it is notified, interrupted, or timed out. The return value is the remaining time: if the thread is woken before nanosTimeout nanoseconds have elapsed, it is approximately (nanosTimeout - time actually waited). A return value of 0 or a negative number means the wait timed out.
- boolean await(long time, TimeUnit unit) throws InterruptedException: Equivalent to awaitNanos(unit.toNanos(time)) > 0.
- boolean awaitUntil(Date deadline) throws InterruptedException: The current thread enters the wait state until it is notified, interrupted, or the deadline is reached. The method returns true if the thread is notified before the deadline, and false if the deadline has passed.
- void signal(): Wakes up one thread waiting on the Condition. That thread must re-acquire the lock associated with the Condition before it returns from await().
- void signalAll(): Wakes up all threads waiting on the Condition. Each of them must re-acquire the lock associated with the Condition before it can return from await().
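To make the awaitNanos() return value concrete, here is a small sketch of a timed wait. The TimedFlag class and its method names are hypothetical and not part of the original notes. It feeds the remaining time returned by awaitNanos() back into the next call, so a spurious wakeup does not extend the total wait.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical one-shot flag, used only to illustrate awaitNanos().
class TimedFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition raised = lock.newCondition();
    private boolean flag; // guarded by lock

    void raise() {
        lock.lock();
        try {
            flag = true;
            raised.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to the timeout for the flag; returns false on timeout or interrupt. */
    boolean awaitRaised(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // 0 or negative remaining time: timed out
                }
                // awaitNanos() returns an estimate of the time still remaining.
                nanos = raised.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

Swallowing the interrupt and returning false is a simplification chosen for this sketch; real code would usually propagate InterruptedException to the caller.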
An example use of Condition
A Condition must be obtained through Lock’s newCondition() method. Let’s take a closer look at how Condition is used with the example of a bounded queue. A bounded queue is a special kind of queue:
- When the queue is empty, a remove operation blocks the removing thread until a new element is added to the queue.
- When the queue is full, an insert operation blocks the inserting thread until the queue has a free slot.
The following is the implementation code of the bounded queue based on Condition:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueue<T> {
    private Object[] items;
    // Index of the next add, index of the next remove, and current element count
    private int addIndex;
    private int removeIndex;
    private int count;
    private Lock lock = new ReentrantLock();
    /**
     * Removing threads await on notEmpty while the queue is empty;
     * add() signals notEmpty after inserting an element.
     */
    private Condition notEmpty = lock.newCondition();
    /**
     * Adding threads await on notFull while the queue is full;
     * remove() signals notFull after taking an element.
     */
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    /**
     * Adds an element. If the array is full, the adding thread waits
     * until there is a free slot.
     */
    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[addIndex] = t;
            addIndex += 1;
            if (addIndex == items.length) {
                addIndex = 0;
            }
            count += 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the head element. If the array is empty, the removing
     * thread waits until an element is added.
     */
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[removeIndex];
            removeIndex += 1;
            if (removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}
Here’s how the add(T) method works:
- The lock must be acquired first, to guarantee the visibility and exclusivity of changes to the array. In addition, calling await() requires the current thread to hold the lock.
- If the number of elements equals the length of the array, the array is full, so notFull.await() is called; the current thread releases the lock and enters the wait state.
- If the number of elements is not equal to the length of the array, the array is not full; the element is added to the array, and a thread waiting on notEmpty is notified that there is a new element to fetch.
The while loop in the add and remove methods guards against spurious wakeups: the condition is re-checked every time the thread returns from await(). This is very similar to the classic wait/notification paradigm for threads.
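For comparison, the classic wait/notification paradigm mentioned above could look like the following sketch. MonitorBoundedQueue is a hypothetical counterpart to BoundedQueue written for this note, not code from the book. Because an object’s monitor has only one wait set, producers and consumers wait in the same place, so the sketch uses notifyAll() rather than notify().

```java
// Hypothetical counterpart to BoundedQueue, using the built-in monitor.
class MonitorBoundedQueue<T> {
    private final Object[] items;
    private int addIndex;
    private int removeIndex;
    private int count;

    MonitorBoundedQueue(int size) {
        items = new Object[size];
    }

    synchronized void add(T t) throws InterruptedException {
        // Re-check the condition in a loop to guard against spurious wakeups.
        while (count == items.length) {
            wait();
        }
        items[addIndex] = t;
        addIndex = (addIndex + 1) % items.length;
        count++;
        // One shared wait set: wake everyone and let each thread re-check.
        notifyAll();
    }

    @SuppressWarnings("unchecked")
    synchronized T remove() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        T x = (T) items[removeIndex];
        items[removeIndex] = null; // drop the reference so it can be collected
        removeIndex = (removeIndex + 1) % items.length;
        count--;
        notifyAll();
        return x;
    }
}
```

With Condition, the two kinds of waiters are kept in separate wait queues (notFull and notEmpty), so a single signal() reaches a thread that can actually make progress.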
Here is a test case for the bounded queue; running it shows that the queue blocks as expected.
// Assumes JUnit 4; with JUnit 5, import org.junit.jupiter.api.Test instead.
import org.junit.Test;

public class BoundedQueueTest {
    @Test
    public void testCase01() throws InterruptedException {
        BoundedQueue<Integer> boundedQueue = new BoundedQueue<>(3);
        for (int i = 0; i < 3; i++) {
            boundedQueue.add(i);
        }
        Thread thread = new Thread(new RunnableRemove(boundedQueue));
        thread.start();
        // The queue is full, so this call blocks until the other thread removes an element.
        boundedQueue.add(3);
        System.out.println("Succeed add!");
    }

    @Test
    public void testCase02() throws InterruptedException {
        BoundedQueue<Integer> boundedQueue = new BoundedQueue<>(3);
        Thread thread = new Thread(new RunnableAdd(0, boundedQueue));
        thread.start();
        // The queue is empty, so this call blocks until the other thread adds an element.
        boundedQueue.remove();
        System.out.println("Succeed remove!");
    }

    static class RunnableAdd implements Runnable {
        int num;
        BoundedQueue<Integer> boundedQueue;

        public RunnableAdd(int num, BoundedQueue<Integer> boundedQueue) {
            this.num = num;
            this.boundedQueue = boundedQueue;
        }

        @Override
        public void run() {
            try {
                System.out.println("New thread add element!");
                Thread.sleep(1000 * 5);
                boundedQueue.add(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("add:" + num);
        }
    }

    static class RunnableRemove implements Runnable {
        BoundedQueue<Integer> boundedQueue;

        public RunnableRemove(BoundedQueue<Integer> boundedQueue) {
            this.boundedQueue = boundedQueue;
        }

        @Override
        public void run() {
            try {
                System.out.println("New thread remove element!");
                Thread.sleep(1000 * 5);
                int num = boundedQueue.remove();
                System.out.println("remove:" + num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Implementation analysis of Condition
ConditionObject is an inner class of the synchronizer AbstractQueuedSynchronizer (AQS) and implements the Condition interface. Each Condition object contains a wait queue, which is the key to how Condition implements waiting and notification.
The following sections use ConditionObject to analyze the implementation of the Condition interface, covering the wait queue, waiting, and notification.
Wait queue
- The wait queue is a FIFO queue. Each node in it holds a reference to a thread that is waiting on the Condition. When a thread calls Condition.await(), it is wrapped in a node that joins the wait queue, releases the lock, and enters the wait state.
- ConditionObject reuses the node definition from AQS. In other words, nodes in both the synchronization queue and the wait queue are instances of AbstractQueuedSynchronizer.Node, a static inner class of AQS.
- Each ConditionObject owns one wait queue and holds references to its first node (firstWaiter) and last node (lastWaiter). When the current thread calls Condition.await(), a node is constructed from the current thread and appended at the tail of the queue.
- Because ConditionObject holds references to the head and tail nodes, appending a node only requires pointing the old tail’s nextWaiter at the new node and updating lastWaiter. These reference updates do not use CAS, because the thread calling await() must already hold the lock; the lock itself makes the operation thread-safe.
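The queue structure described above can be modelled with a few lines of code. The sketch below is a deliberately simplified, hypothetical model (the SimpleWaitQueue class and its methods are invented here); the real logic lives in AbstractQueuedSynchronizer.ConditionObject and also deals with cancelled nodes. It keeps the same field names, firstWaiter, lastWaiter, and nextWaiter, and uses plain assignments instead of CAS on the assumption that the caller holds the lock.

```java
// Simplified, hypothetical model of a Condition's wait queue.
class SimpleWaitQueue {
    static final class Node {
        final Thread thread;
        Node nextWaiter;

        Node(Thread thread) {
            this.thread = thread;
        }
    }

    private Node firstWaiter;
    private Node lastWaiter;

    /** Appends a node for the thread at the tail. Assumes the caller holds the lock, so no CAS. */
    Node addWaiter(Thread thread) {
        Node node = new Node(thread);
        if (lastWaiter == null) {
            firstWaiter = node;           // queue was empty
        } else {
            lastWaiter.nextWaiter = node; // link behind the old tail
        }
        lastWaiter = node;
        return node;
    }

    /** Removes and returns the longest-waiting node, or null if the queue is empty. */
    Node pollFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        firstWaiter = first.nextWaiter;
        if (firstWaiter == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        return first;
    }
}
```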
In the monitor model of Object, each object has one entry set and one wait set, as described in Thread for Understanding Concurrent Programming in Java.
A Lock (more precisely, a synchronization component that is built on AQS and implements the Lock interface) has one synchronization queue and multiple wait queues. The synchronization queue is the one inside AQS, and the wait queues are the queues owned by the individual ConditionObject instances. The relationship between AQS, the synchronization queue, and the wait queues is shown in the figure below:
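The "one lock, several wait queues" relationship can be observed from outside with ReentrantLock’s monitoring methods hasWaiters(Condition) and getWaitQueueLength(Condition). The sketch below is only an illustration; the TwoWaitQueues class and its observe() method are hypothetical. One thread waits on condition a, and the main thread then reads the length of both queues while holding the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: two Conditions of one lock have independent wait queues.
class TwoWaitQueues {
    /** Returns {waiters on a, waiters on b}, observed while one thread waits on a. */
    static int[] observe() {
        ReentrantLock lock = new ReentrantLock();
        Condition a = lock.newCondition();
        Condition b = lock.newCondition();
        boolean[] done = {false}; // guarded by lock

        new Thread(() -> {
            lock.lock();
            try {
                while (!done[0]) {
                    a.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        }).start();

        int[] lengths = new int[2];
        boolean observed = false;
        while (!observed) {
            lock.lock(); // the monitoring methods require the lock to be held
            try {
                if (lock.hasWaiters(a)) {
                    lengths[0] = lock.getWaitQueueLength(a);
                    lengths[1] = lock.getWaitQueueLength(b);
                    done[0] = true;
                    a.signal(); // let the waiting thread finish
                    observed = true;
                }
            } finally {
                lock.unlock();
            }
            if (!observed) {
                LockSupport.parkNanos(1_000_000L); // back off briefly, then poll again
            }
        }
        return lengths;
    }
}
```

The Javadoc describes getWaitQueueLength() as an estimate intended for monitoring, so it should not be used for synchronization control in real code.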
Waiting: await()
Calling the await() method of Condition (or any method whose name begins with await) puts the current thread into the wait queue and releases the lock, and the thread’s state changes to waiting. When the thread returns from await(), it is guaranteed to hold the lock associated with the Condition again.
Viewing the await() method from the perspective of the AQS synchronization queue and wait queue:
- The thread that calls await() is the thread that successfully acquired the lock, that is, the head node of the synchronization queue. The method constructs a new node for the current thread and adds it to the wait queue, then releases the synchronization state and wakes up the successor node in the synchronization queue, after which the current thread enters the wait state.
The code for the await() method of ConditionObject is as follows:
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // The current thread joins the wait queue
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
- The head node of the synchronization queue (which wraps the current thread) is not moved into the wait queue directly. Instead, addConditionWaiter() constructs a new node for the current thread and appends that node to the wait queue.
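One visible consequence of fullyRelease(node) and acquireQueued(node, savedState) is that a reentrant lock is released completely while waiting and restored to the same hold count afterwards. The sketch below tries to show this with ReentrantLock; the FullReleaseDemo class is hypothetical, and it calls awaitUninterruptibly() only to avoid handling InterruptedException (that method releases and re-acquires the lock in the same way).

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: await releases every hold on the lock and later restores them.
class FullReleaseDemo {
    /** Returns {hold count before waiting, hold count after waiting}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock

        lock.lock();
        lock.lock(); // reentrant acquisition: the hold count is now 2
        try {
            int before = lock.getHoldCount();
            // The signaller can only acquire the lock once the wait
            // has released both holds.
            new Thread(() -> {
                lock.lock();
                try {
                    signalled[0] = true;
                    ready.signal();
                } finally {
                    lock.unlock();
                }
            }).start();
            while (!signalled[0]) {
                ready.awaitUninterruptibly();
            }
            // The saved state has been re-acquired in full.
            return new int[] {before, lock.getHoldCount()};
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }
}
```

If the wait released only one of the two holds, the signaller thread could never acquire the lock and the method would not return.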
Notification: signal()
Calling ConditionObject’s signal() method wakes up the node that has been waiting longest in the wait queue (the first node). Before the node is woken up, it is moved to the synchronization queue.
The signal() method of ConditionObject is as follows:
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
- The precondition for calling signal() is that the current thread holds the lock. As the code shows, signal() performs an isHeldExclusively() check to verify that the current thread is the one that acquired the lock.
- It then takes the first node of the wait queue, moves it to the synchronization queue, and wakes up the thread in that node using LockSupport.
- The move is done by calling the synchronizer’s enq(Node node) method, which safely transfers the first node of the wait queue to the synchronization queue. Once the node is in the synchronization queue, the current thread wakes up the node’s thread using LockSupport.
- The awakened thread leaves the while loop in await() (isOnSyncQueue(Node node) now returns true because the node is in the synchronization queue) and calls the synchronizer’s acquireQueued() method to compete for the synchronization state.
- After successfully acquiring the synchronization state (that is, the lock), the awakened thread returns from the await() call it made earlier; at that point it holds the lock again.
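The last two steps imply an ordering that is easy to check: a signalled thread cannot return from its wait until the signalling thread has released the lock. The following sketch illustrates this under that assumption; the SignalOrderDemo class is hypothetical, and awaitUninterruptibly() is used only to keep the example free of checked exceptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the waiter returns only after the signaller unlocks.
class SignalOrderDemo {
    /** Returns true if the waiter saw the work done between signal() and unlock(). */
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        // state[0]: signal sent; state[1]: signaller's later work done (both guarded by lock)
        boolean[] state = {false, false};

        lock.lock();
        try {
            new Thread(() -> {
                lock.lock();
                try {
                    state[0] = true;
                    ready.signal();  // the waiter's node moves to the synchronization queue
                    state[1] = true; // still holding the lock, so the waiter cannot return yet
                } finally {
                    lock.unlock();   // only now can the waiter re-acquire the lock
                }
            }).start();
            while (!state[0]) {
                ready.awaitUninterruptibly();
            }
            return state[1];
        } finally {
            lock.unlock();
        }
    }
}
```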
The process of the node moving from the wait queue to the synchronization queue is shown below:
ConditionObject’s **signalAll()** method is equivalent to executing signal() once for each node in the queue. The effect is to move all the nodes in the queue to the synchronization queue and wake up the thread on each node.
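As a rough illustration of signalAll(), the sketch below parks several threads on one Condition and releases them with a single call. The SignalAllDemo class is hypothetical, and the polling with getWaitQueueLength() is there only so that the demo calls signalAll() once every thread is actually in the wait queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: one signalAll() call releases every waiting thread.
class SignalAllDemo {
    /** Starts n waiters, calls signalAll() once, and returns true if all of them return. */
    static boolean run(int n) {
        ReentrantLock lock = new ReentrantLock();
        Condition go = lock.newCondition();
        boolean[] open = {false}; // guarded by lock
        CountDownLatch returned = new CountDownLatch(n);

        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    while (!open[0]) {
                        go.awaitUninterruptibly();
                    }
                } finally {
                    lock.unlock();
                }
                returned.countDown();
            }).start();
        }

        boolean released = false;
        while (!released) {
            lock.lock();
            try {
                // Proceed only once all n threads are in the wait queue.
                if (lock.getWaitQueueLength(go) == n) {
                    open[0] = true;
                    go.signalAll(); // moves every waiting node to the synchronization queue
                    released = true;
                }
            } finally {
                lock.unlock();
            }
            if (!released) {
                LockSupport.parkNanos(1_000_000L); // back off briefly, then poll again
            }
        }
        try {
            return returned.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The awakened threads still return one at a time, because each must re-acquire the lock before leaving its wait.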