preface
ConditionObject is one of the internal classes in AQS, but ConditionObject was not introduced at that time. When you read the source code later, you will find that Condition is used in many places. Read the Condition source code today to find out what Condition does. Of course, by the time you read this article, you should have read AQS, ReentrantLock and LockSupport.
[liuzhirichard] Record technology, development and source code notes in work and study. From time to time, share what you’ve seen and heard in your life. Welcome to guide!
introduce
Object monitor methods: Wait, notify, and notifyAll are familiar. In multi-threaded scenarios, synchronized must be used to obtain the lock before Object’s wait and notify can be called.
The use of Condition is equivalent to replacing synchronized with Lock, and then replacing Object’s monitor methods with Condition.
Conditions (also known as condition queues or condition variables) provide a way for one thread to pause execution (wait) until another thread informs the blocked thread that certain state Conditions may now be true.
Because access to this shared state information occurs in different threads, it must be protected, so some form of lock is used. The key attribute provided by the wait condition is that it releases the associated lock atomically and suspends the current thread, just like object.wait.
Condition instances are essentially bound to locks. To obtain a Condition instance, the newCondition() method of a Lock instance is typically used.
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();
Copy the code
The basic use
class BoundedBuffer {
final Lock lock = new ReentrantLock();
Condition instance depends on lock instance
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putPtr, takePtr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// put to check if it is full
// the thread queue blocks on the notFull condition
while (count == items.length) {
notFull.await();
}
items[putPtr] = x;
if (++putPtr == items.length) {
putPtr = 0;
}
++count;
// After the put succeeds, there are elements in the queue
// Wake up the queue blocking thread on the notEmpty condition
notEmpty.signal();
} finally{ lock.unlock(); }}public Object take(a) throws InterruptedException {
lock.lock();
try {
// take is found empty
// the thread queue blocks on the condition notEmpty
while (count == 0) {
notEmpty.await();
}
Object x = items[takePtr];
if (++takePtr == items.length) {
takePtr = 0;
}
--count;
// the queue cannot be full
// Wake up the thread queued to block on the notFull condition
notFull.signal();
return x;
} finally{ lock.unlock(); }}}Copy the code
Here’s an example of the official documentation that implements a simple BlockingQueue, and if you look at it, you’ll see that this logic is used in many places in synchronized queues. The necessary code instructions are commented out in the code.
Question question
- What does Condition have to do with AQS?
- What is the implementation principle of Condition?
- What are the differences and connections between the waiting queue of Condition and the synchronous queue of AQS?
Source code analysis
The basic structure
It can be seen from UML that Condition is only an abstract class, and its main implementation logic is realized in ConditionObject, an internal class of AQS. ConditionObject ConditionObject ConditionObject ConditionObject ConditionObject ConditionObject ConditionObject
Create a Condition
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();
Copy the code
Condition variables are typically created using lock.newcondition ().
public class ReentrantLock implements Lock.java.io.Serializable {
private final Sync sync;
public Condition newCondition(a) {
return sync.newCondition();
}
// Sync integrates AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition(a) {
return newConditionObject(); }}}Copy the code
Used here is already source, call the sync. NewCondition (), sync inheritance AQS, is actually created a ConditionObject AQS inner class instance.
Here it is important to note the lock every call lock, newCondition () there will be a new ConditionObject instance, is a lock can create multiple instances Condition.
Condition parameters
/** The first node of the conditional queue */
private transient Node firstWaiter;
/** The last node in the conditional queue */
private transient Node lastWaiter;
Copy the code
Await method
Await method, which causes the current thread to wait until it receives a signal or is interrupted.
The lock associated with this Condition is released atomically, and for thread scheduling purposes, the current thread is disabled and dormant until one of the following four conditions occurs:
- Some other thread calls the Condition’s signal method, and the current thread is chosen as the one to wake up;
- Several other threads call the Condition’s signalAll method;
- Other threads interrupt the current thread and support interrupt thread suspension;
- False wake up occurred.
In all cases, the current thread must reacquire the lock associated with this condition before this method can return. This lock is guaranteed to be held when the thread returns.
Now look at the implementation logic within AQS:
public final void await(a) throws InterruptedException {
// The response is interrupted
if (Thread.interrupted())
throw new InterruptedException();
// Add to end of conditional queue (wait queue)
// A Node of type Node.CONDITION is created internally
Node node = addConditionWaiter();
// Release the lock acquired by the current thread (by manipulating the value of state)
// The lock will be blocked and suspended
int savedState = fullyRelease(node);
int interruptMode = 0;
// If the node is no longer in the synchronization queue, park is called to make it hang in the wait queue
while(! isOnSyncQueue(node)) {// Call park to block and suspend the current thread
LockSupport.park(this);
Signal was called or the thread was interrupted
// If the terminal is woken up, the loop is broken
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
}
// When the while loop ends, the thread grabs the lock
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null) // clean up if cancelled
unlinkCancelledWaiters();
// Handle interrupts uniformly
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
}
Copy the code
The await method steps are as follows:
- CONDITION Node and add it to ConditionQueue. ConditionQueue
- Release the lock acquired by the current thread (by manipulating the value of state)
- Check whether the current thread is in the SyncQueue, otherwise park will be used to suspend.
- At the end of the loop, it indicates that it is already in the SyncQueue, and then waits for the lock to be obtained.
Make a clear distinction between conditional queues and synchronous queues.
Conditional queue/wait queue: queue of Condition synchronous queue: queue of AQS.
Read the important methods in await below:
- AddConditionWaiter () method
private Node addConditionWaiter(a) {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// Determine the state of the tail node, if cancelled, then clear all cancelled nodes
if(t ! =null&& t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; }// Create a new Node of type node.condition
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// Put the new node at the end of the wait queue
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
Copy the code
The addConditionWaiter method simply creates a Node of type Node.CONDITION and places it at the end of the CONDITION queue. Other conclusions can also be drawn from this code:
- The nodes inside the conditional queue use only the Thread, waitStatus, and nextWaiter attributes.
- Conditional queues are one-way queues.
For comparison, conditional queues are compared to synchronous queues:
The AQS synchronization queue is as follows:
Let’s look at the conditional queue of conditions
WaitStatus is introduced in AQS:
- The default value is 0.
- WaitStatus > 0 (CANCELLED 1) Indicates that the node times out or is interrupted and needs to be removed from the queue.
- WaitStatus = -1 SIGNAL If the status of the previous node of the current thread is SIGNAL, the current thread should block (unpark).
- WaitStatus = -2 condition-2: the node is currently in the conditional queue.
- WaitStatus = -3 PROPAGATE -3: releaseShared should be propagated to other nodes for use in shared lock mode.
- FullyRelease method (AQS)
final int fullyRelease(Node node) {
boolean failed = true;
try {
// Get the state of the current node
int savedState = getState();
/ / releases the lock
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw newIllegalMonitorStateException(); }}finally {
if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code
The fullyRelease method, provided by AQS, first gets the current state and then calls the Release method to release the lock.
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if(h ! =null&& h.waitStatus ! =0)
unparkSuccessor(h);
return true;
}
return false;
}
Copy the code
The release method is described in detail in AQS. Its main function is to release the lock, and note that:
- FullyRelease will release all the locks at once, so no matter how many times you reenter it, it will all be released here.
- Exceptions are thrown, mainly when the lock fails to be released, and the Node status is set to node.cancelled in finally.
- isOnSyncQueue(node)
Through the above flow, the node has been placed on the conditional queue and released the held lock, which will then hang and block until signal wakes up. However, only when the node is no longer in the SyncQueue can it be suspended.
final boolean isOnSyncQueue(Node node) {
// The current node is a conditional queue node, or the previous node is empty
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if(node.next ! =null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
// Start traversal from the tail
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false; t = t.prev; }}Copy the code
Returns true if a node (always a node originally placed on the conditional queue) is now waiting to be reacquired on the synchronous queue.
The main purpose of this code is to determine whether the node is in the synchronization queue. If not, park will be called later to block the current thread. There is a question: the AQS synchronization queue and the Condition queue should be independent, why do we need to ensure that the node is not behind the synchronization queue before blocking? After a node is woken up by signal or signalAll, it is placed in a synchronization queue.
The thread is blocked at this point and wakes up when another thread calls Signal or signalAll.
The current thread is then verified to wake up due to an interrupt, assuming that no interrupt has occurred. The isOnSyncQueue(Node Node) of the while loop must return true, indicating that the current Node is already in the synchronization queue.
AcquireQueued (Node, savedState) is then called to obtain the lock.
final boolean acquireQueued(final Node node, int arg) {
// Whether to get resources
boolean failed = true;
try {
// Interrupt status
boolean interrupted = false;
// Infinite loop
for (;;) {
// The node before the current node
final Node p = node.predecessor();
// Next is the first real data node (head is a virtual node)
// Then try to get the resource
if (p == head && tryAcquire(arg)) {
// After success, the head pointer points to the current node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// p is not the head node, or the head node does not obtain the resource (in unfair cases, other nodes preempted)
// Determine whether the node is to be blocked. It will remain blocked until the lock is acquired
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}Copy the code
Here is the logic of AQS, also can read the relevant introduction of AQS.
- Continuously get whether the last node of this node is head, because head is a virtual node. If the last node of the current node is head, then the current node is
First data node >
;- The first data node continuously obtains resources. If the resource is successfully obtained, head points to the current node.
- The current node is not a head node, or
tryAcquire(arg)
Failure (failure may be an unfair lock). In this case, the state of the previous node needs to be determinedWhether the current node is to be blocked
(Whether the state of the previous node is SIGNAL).
It is worth noting that when a node is placed in the synchronization queue of AQS, it also fights for resources and sets the value of savedState, which represents the number of reentrant times released when the lock is released.
The overall process is drawn as follows:
signal
public final void signal(a) {
// Whether the thread is currently held
if(! isHeldExclusively())throw new IllegalMonitorStateException();
Node first = firstWaiter;
if(first ! =null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// The firstWaiter header node points to the next node in the conditional queue header
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// Disconnect the original head node from the synchronization queue
first.nextWaiter = null;
} while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
final boolean transferForSignal(Node node) {
// Determine whether the node has been cancelled previously
if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
return false;
// Call enq to the end of the synchronization queue
Node p = enq(node);
int ws = p.waitStatus;
// The last node of the node is changed to SIGNAL so that it can wake up later
if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return true;
}
Copy the code
Enq can also read AQS codes
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// If the tail node is empty, initialize the head node
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// Do not assign to an empty loop
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
returnt; }}}}Copy the code
After a node is placed on the AQS synchronization queue using enQ, set the waitStatus of the preceding node to Node.signal. SignalAll’s code is similar.
conclusion
Q&A
Q: How does Condition relate to AQS?
ConditionObject (Condition) ConditionObject (Condition) ConditionObject (Condition) ConditionObject (Condition) ConditionObject (Condition) ConditionObject (Condition) ConditionObject (Condition)
Q: What is the implementation principle of Condition?
A: Condition maintains A conditional queue internally. In case of obtaining A lock, the thread calls await and is placed in the conditional queue and blocked. Until the thread is woken up by calling signal or signalAll, it will be put into the synchronization queue of AQS to compete for lock resources.
Q: What are the differences and connections between the waiting queue of Condition and the synchronous queue of AQS? A: Condition’s waiting queue is A one-way linked list, while AQS ‘is A bidirectional linked list. There is no clear link between the two. The node is moved from the wait queue to the synchronization queue only after it is woken up from the blocked state.
conclusion
This article focuses on reading the code for Condition, but omits logic such as thread interrupts. Interested partners. Can be more in-depth study of the relevant source code.