Preface
The java.util.concurrent package provides multithreading utility classes such as Semaphore and CountDownLatch. Their inner classes all extend one abstract class, AbstractQueuedSynchronizer, referred to as AQS. AQS can be used to build locks and other synchronizers. Since a single abstract class can support so many kinds of synchronizers, its internals are well worth exploring.
Main text
AQS
AQS is built on a doubly linked list and uses an int field as its state. Different implementation classes interpret this state differently, so each implementation class overrides the relevant methods to define what the state means.
The state is modified atomically through compareAndSetState.
Each Node holds a thread instance and a wait status. If the AQS implementation class is a lock, then each Node represents a thread that wants to acquire the lock.
static final class Node {
    // Marker indicating that a node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // Possible waitStatus values
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    Node nextWaiter;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
}
prev and next point to the predecessor and successor nodes, and thread is the thread that enqueued the node. The fields that deserve a closer look are waitStatus and nextWaiter.
waitStatus
Represents the status of the current node, which takes one of the following values:
- Initial status (0), which has no named constant. In practice it means the node is in the queue waiting to acquire the lock
- Cancelled status (1), defined as CANCELLED, indicates that the node/thread has been cancelled
- Signal status (-1), defined as SIGNAL, indicates that the successor of the current node needs to be woken up
- Condition status (-2), defined as CONDITION, indicates that the current node is in a condition queue
- Propagation status (-3), defined as PROPAGATE, indicates that a subsequent acquireShared should be allowed to proceed
From these definitions, a value greater than 0 means the node is cancelled, and a value less than 0 marks a special condition.
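The sign convention can be sketched in a few lines. This is only an illustration: the constants copy the values shown in Node, while the class WaitStatusDemo and the method describe are hypothetical names that do not exist in the JDK.

```java
// Hypothetical helper, not part of the JDK: names the waitStatus values described above.
class WaitStatusDemo {
    // Values copied from the Node class shown earlier
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    static String describe(int waitStatus) {
        if (waitStatus > 0)
            return "cancelled";              // any positive value means cancelled
        switch (waitStatus) {
            case 0:         return "initial";
            case SIGNAL:    return "signal";
            case CONDITION: return "condition";
            case PROPAGATE: return "propagate";
            default:        return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(CANCELLED));
        System.out.println(describe(SIGNAL));
        System.out.println(describe(0));
    }
}
```

This is why the AQS code can test `waitStatus > 0` to detect cancelled nodes without comparing against each constant.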
nextWaiter
nextWaiter links nodes into a singly linked list, which is how a condition queue is represented. It can also hold the special value SHARED, indicating that the current node is waiting in shared mode. If it is null (the value of EXCLUSIVE), the node is in exclusive mode.
// Computed once when the class is initialized
nextOffset = unsafe.objectFieldOffset
    (Node.class.getDeclaredField("next"));

private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
Changes to the linked list nodes must also be atomic. Here AQS uses unsafe to compute the memory offset of the next field, and compareAndSwapObject to update the next reference of a node.
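Application code normally cannot use Unsafe, but the same compare-and-set on a field can be tried with AtomicReferenceFieldUpdater. The sketch below swaps in that public API for the Unsafe call. LinkNode, casNext and CasNextDemo are names invented for this example and are not part of AQS.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical node type used only for this illustration
class LinkNode {
    final String name;
    volatile LinkNode next;   // the updater requires a volatile field

    private static final AtomicReferenceFieldUpdater<LinkNode, LinkNode> NEXT =
        AtomicReferenceFieldUpdater.newUpdater(LinkNode.class, LinkNode.class, "next");

    LinkNode(String name) {
        this.name = name;
    }

    // Sets next to update only if it currently equals expect
    boolean casNext(LinkNode expect, LinkNode update) {
        return NEXT.compareAndSet(this, expect, update);
    }
}

class CasNextDemo {
    public static void main(String[] args) {
        LinkNode a = new LinkNode("a");
        LinkNode b = new LinkNode("b");
        LinkNode c = new LinkNode("c");
        System.out.println(a.casNext(null, b)); // true: next was null
        System.out.println(a.casNext(null, c)); // false: next is already b
        System.out.println(a.next.name);        // b
    }
}
```

The second call fails because the expected value no longer matches, which is exactly how a thread detects that another thread changed the link first.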
API
AQS provides APIs for acquiring and releasing the state, consisting of the following methods:
Method | Description |
---|---|
acquire(int) | Acquire the state in exclusive mode |
acquireInterruptibly(int) | Acquire the state in exclusive mode; throws InterruptedException if the current thread is interrupted |
acquireShared(int) | Acquire the state in shared mode |
acquireSharedInterruptibly(int) | Acquire the state in shared mode; throws InterruptedException if the current thread is interrupted |
release(int) | Release the state in exclusive mode |
releaseShared(int) | Release the state in shared mode |
acquire and release correspond to acquiring and releasing a lock. AQS does not care how the lock is actually acquired; it delegates that decision to subclasses through the following methods:
- tryAcquire / tryAcquireShared: whether the state can be acquired right now. There are exclusive and shared variants, each implemented by the subclass.
- tryRelease / tryReleaseShared: whether the current state allows a release
- isHeldExclusively: whether the state is held exclusively by the current thread
Together these methods make up a synchronizer, whose behavior is implemented by the subclass. To implement a lock, lock and unlock map to acquire and release. We only need to implement tryAcquire and tryRelease to decide whether the lock can be acquired and how it is released.
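As a concrete illustration of that division of labor, here is a minimal sketch of a non-reentrant mutex built on AQS. The class names SimpleMutex and SimpleMutexDemo, and the convention that state 0 means unlocked and 1 means locked, are assumptions made for this example and are not taken from the article.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex on top of AQS
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Our convention: 0 = unlocked, 1 = locked
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }   // queues and parks if tryAcquire fails
    public void unlock() { sync.release(1); }   // wakes the next queued thread
}

class SimpleMutexDemo {
    // Increments a plain int from several threads under the mutex
    static int runCounter(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(runCounter(4, 1000)); // prints 4000
    }
}
```

All of the queueing, parking and waking is inherited from AQS. The subclass only answers the two questions "can the state be acquired now?" and "can it be released?".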
Let’s look at the implementation:
- acquire and release
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
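acquireQueued is a retry loop that alternates between attempting to acquire and parking the thread. It returns true if the thread was interrupted while waiting, in which case acquire calls selfInterrupt to restore the thread's interrupt status.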
// Add a waiter node for the current thread to the queue
private Node addWaiter(Node mode) {
    // Record the current thread in the node
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try to append directly after the current tail
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

// Enqueue operation, initializing the queue if necessary
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
Before acquireQueued runs, addWaiter wraps the current thread in a node and appends it to the wait queue. If the fast path fails, enq appends the node as the new tail, first initializing the queue with an empty head node if necessary. Because the tail is updated with an atomic CAS, when several threads enqueue at the same time only one of them succeeds per attempt, and the others retry in the loop.
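The CAS-and-retry pattern in enq can be tried out in isolation. The sketch below is a simplified stand-in, not the AQS code: it uses AtomicReference instead of Unsafe, creates the dummy head eagerly rather than lazily, and the names CasQueue, QNode and CasQueueDemo are made up for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified queue: append-only, tail updated with CAS
class CasQueue {
    static final class QNode {
        final int value;
        volatile QNode prev;
        volatile QNode next;
        QNode(int value) { this.value = value; }
    }

    private final QNode head = new QNode(-1);                 // dummy head, created eagerly
    private final AtomicReference<QNode> tail = new AtomicReference<>(head);

    void enqueue(int value) {
        QNode node = new QNode(value);
        for (;;) {
            QNode t = tail.get();
            node.prev = t;                                    // set prev before publishing the node
            if (tail.compareAndSet(t, node)) {                // only one thread wins per attempt
                t.next = node;                                // next is set after the CAS
                return;
            }
            // CAS failed: another thread appended first, so retry with the new tail
        }
    }

    // Counts nodes by walking prev links from the tail; call only when no thread is enqueueing
    int size() {
        int n = 0;
        for (QNode t = tail.get(); t != head; t = t.prev)
            n++;
        return n;
    }
}

class CasQueueDemo {
    static int fill(int threads, int perThread) {
        CasQueue queue = new CasQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    queue.enqueue(j);
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 1000)); // prints 4000: no node is lost
    }
}
```

Note that prev is assigned before the CAS and next after it, so at any moment the prev chain from the tail is complete while a next link may briefly be missing.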
After joining the queue, the thread starts spinning:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
failed records whether the acquisition failed, and interrupted records whether the thread was interrupted while waiting. Each iteration checks whether the predecessor is head and, if so, tries to acquire. If that does not succeed, the thread goes through shouldParkAfterFailedAcquire and parkAndCheckInterrupt:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
The method returns a boolean indicating whether the thread should be parked:
- If the predecessor's status is SIGNAL, it is safe to park: the predecessor has already committed to signalling this node when it releases.
- If the predecessor is cancelled, all cancelled predecessors are skipped and the node is relinked to the nearest non-cancelled one.
- Otherwise (status 0 or PROPAGATE), the predecessor's status is set to SIGNAL, and the caller retries the acquire once more before parking.
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
LockSupport.park is used here, which delegates to Unsafe's park method. park and unpark can be thought of as the thread-level counterparts of wait and notify.
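A minimal sketch of park and unpark outside AQS may help. ParkDemo and its flag names are hypothetical. The loop around park is there because park may return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: one thread parks until another thread sets a flag and unparks it
class ParkDemo {
    static boolean parkUntilReleased() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park can return spuriously, so always re-check the condition in a loop
            while (!ready.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        ready.set(true);              // publish the condition first
        LockSupport.unpark(waiter);   // then wake the waiter (or pre-grant its permit)

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(parkUntilReleased()); // prints true
    }
}
```

Unlike wait and notify, unpark targets a specific thread and needs no monitor, and an unpark issued before the park is not lost because it leaves a permit behind.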
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    Node pred = node.prev;
    // Skip cancelled predecessors
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    // If this node is the tail, simply remove it
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
When acquireQueued exits without having acquired the state, the node must be removed from the queue, and removing it takes some care. A successor relies on its predecessor being in SIGNAL status in order to be woken up, so while unlinking itself the node also makes sure that its nearest non-cancelled predecessor is set to SIGNAL. If that predecessor is not head and the successor is valid, the two nodes are linked directly. Otherwise the successor is woken up so that it can retry in its own loop.
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    // If the next node is null or cancelled, search backward from the tail
    // for the closest node that is not cancelled
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
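unparkSuccessor mainly unparks the next node. If next is null or cancelled, it searches backward from the tail for the closest valid node. The search runs from the tail because a node's prev link is set before the node becomes the tail, whereas the next link is only set afterwards and may be momentarily missing.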
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
release is simple: if the subclass's tryRelease succeeds, it wakes up the next node through unparkSuccessor.
- acquireShared and releaseShared
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
In shared mode the thread only joins the queue and spins when tryAcquireShared returns a value less than 0. Shared mode means the state can be held by several threads at once, so a thread blocks only when the state is not currently available to it.
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
doAcquireShared is largely the same as acquireQueued. The difference is this part:
if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
        setHeadAndPropagate(node, r);
        p.next = null; // help GC
        if (interrupted)
            selfInterrupt();
        failed = false;
        return;
    }
}
The key call is setHeadAndPropagate:
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
If the next node is also waiting in shared mode, or is not yet known, doReleaseShared is called:
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
If the head's status is SIGNAL, this method resets it to 0 and wakes up the successor. If the status is 0, it sets it to PROPAGATE so that the release keeps propagating. If the head was changed by another thread in the meantime, the loop runs again.
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
releaseShared is very simple: doReleaseShared is called when tryReleaseShared succeeds.
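To see the shared-mode methods from the subclass side, here is a minimal sketch of a one-shot latch in which any number of waiters pass once the latch is opened. OneShotLatch and OneShotLatchDemo are hypothetical names, and the choice of state 0 for closed and 1 for open is an assumption of this example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a latch that stays open once signalled
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Our convention: state 0 = closed, 1 = open.
            // A negative result makes acquireShared queue and park the caller.
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;   // true lets releaseShared wake queued waiters
        }
    }

    private final Sync sync = new Sync();

    void await()  { sync.acquireShared(1); }
    void signal() { sync.releaseShared(1); }
}

class OneShotLatchDemo {
    // Starts the given number of waiters, opens the latch, and counts how many got through
    static int releaseAll(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        latch.signal();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(releaseAll(3)); // prints 3: a single signal releases every waiter
    }
}
```

A single releaseShared is enough for all waiters because each woken node that acquires successfully propagates the wake-up to its shared successor.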
Conclusion
From the analysis of acquire and release we can see that our custom logic lives in the methods whose names start with try, while AQS handles the exclusive and shared modes differently. In essence, though, both are operations on a FIFO queue. The advantage of managing threads with a queue is that order is preserved and a single thread can be woken up, rather than waking every waiter as notifyAll does.
ConditionObject
AQS contains an inner class, ConditionObject, which extends what AQS can do. A lock built on AQS can create multiple condition variables, and each condition variable can block and wake up threads.
Like AQS itself, ConditionObject keeps an internal queue, called the condition queue, whose nodes have the status CONDITION:
private transient Node firstWaiter;
private transient Node lastWaiter;
ConditionObject has three main methods, await, signal and signalAll, which correspond to wait, notify and notifyAll.
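Before going through the implementation, a usage-level sketch shows how await and signal fit together. It uses ReentrantLock, whose newCondition returns a ConditionObject from its AQS-based synchronizer. ConditionHandoff and ConditionHandoffDemo are hypothetical names made up for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: hands a single item from a producer to a consumer
class ConditionHandoff {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private String item;

    void put(String value) {
        lock.lock();
        try {
            item = value;
            notEmpty.signal();        // moves one waiter from the condition queue to the sync queue
        } finally {
            lock.unlock();            // the waiter can only reacquire after this release
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (item == null)      // re-check the condition after every wakeup
                notEmpty.await();     // releases the lock while waiting, reacquires before returning
            String value = item;
            item = null;
            return value;
        } finally {
            lock.unlock();
        }
    }
}

class ConditionHandoffDemo {
    static String roundTrip(String value) {
        ConditionHandoff handoff = new ConditionHandoff();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = handoff.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        handoff.put(value);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello")); // prints hello
    }
}
```

The result is the same whether the consumer starts waiting before or after put runs, because take checks the item under the lock before calling await.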
- await
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
await first calls addConditionWaiter to create a new node and add it to the condition queue:
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If the last waiter is no longer in CONDITION status, clean up the queue first
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
It first unlinks any nodes that are no longer in CONDITION status from the condition queue, then appends a new CONDITION node to the end of the queue.
int savedState = fullyRelease(node);

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
fullyRelease saves the current state and then releases all of it. If the current thread did not acquire the state through acquire, the release fails and an exception is thrown.
The lock is released because the condition is not yet met, which allows another thread to acquire the lock and run its logic.
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
Once the lock is released, the thread waits in the condition queue. isOnSyncQueue checks whether the node is on the sync queue, that is, the AQS queue. When another thread calls signal, the condition node is moved to the sync queue. If the node is not on the sync queue, signal has not been called yet, so the thread parks.
Before reading the rest of await, take a look at signal:
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
Condition variables only work in exclusive mode, so the subclass must implement isHeldExclusively. If the current thread does not hold the lock, an exception is thrown. The real logic is in doSignal:
private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
The loop moves the first node of the condition queue to the sync queue and, if that transfer fails, moves on to the next node.
final boolean transferForSignal(Node node) {
    // If the CAS fails, the node has already been cancelled;
    // return false so that doSignal moves on to the next node
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // Append the node to the sync queue; enq returns its predecessor
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor is cancelled or cannot be set to SIGNAL,
    // unpark the thread so that it resynchronizes by spinning
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
At this point the node is on the sync queue, and its thread may be woken up by another thread. Take another look at this code:
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
Once the thread wakes up, it first calls checkInterruptWhileWaiting to check whether it was interrupted. If it was not, the while condition is evaluated again and no longer holds once the node is on the sync queue.
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
If the thread was interrupted, this decides between THROW_IE (throw an exception when await exits) and REINTERRUPT (re-assert the interrupt when await exits); see transferAfterCancelledWait for the details. Without an interrupt the result is 0.
final boolean transferAfterCancelledWait(Node node) {
    // If the CAS succeeds, the node was still in CONDITION status,
    // which means the interrupt happened before signal
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // waitStatus is no longer CONDITION, so the interrupt happened after signal.
    // signal moves the node to the sync queue, but it may not be done yet,
    // so spin until the transfer completes
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
This check is needed because the thread is not necessarily woken up from the sync queue by a release. It may also wake up because of an interrupt, a spurious wakeup, or because transferForSignal found the predecessor cancelled or failed to set it to SIGNAL.
Now assume that signal succeeded and the node was successfully added to the sync queue:
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
Passing the previously saved state to acquireQueued has the same effect as calling acquire directly:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
acquire ends with a check that may call selfInterrupt, and await does the equivalent in reportInterruptAfterWait:
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
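The THROW_IE path can be observed from outside with a small experiment: interrupt a thread that is waiting on a condition nobody ever signals. This is a sketch using ReentrantLock. AwaitInterruptDemo and its variable names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: an interrupt that arrives before any signal makes await throw
class AwaitInterruptDemo {
    static boolean awaitThrowsOnInterrupt() {
        ReentrantLock lock = new ReentrantLock();
        Condition neverSignalled = lock.newCondition();
        CountDownLatch holdingLock = new CountDownLatch(1);
        AtomicBoolean threw = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                holdingLock.countDown();
                // Loop so that a spurious wakeup cannot end the wait;
                // the only way out is the InterruptedException
                for (;;)
                    neverSignalled.await();
            } catch (InterruptedException e) {
                threw.set(true);
            } finally {
                lock.unlock();   // await reacquires the lock before throwing
            }
        });
        waiter.start();

        try {
            holdingLock.await();   // make sure the waiter is running and holds the lock
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return threw.get();
    }

    public static void main(String[] args) {
        System.out.println(awaitThrowsOnInterrupt()); // prints true
    }
}
```

Since no thread calls signal here, the interrupt always comes first, so await reports it by throwing rather than by re-asserting the interrupt status.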
Conclusion
Both AQS and ConditionObject use queues to keep track of waiting threads, which can be a bit convoluted. The tryAcquire method defines whether the state can be obtained. If it cannot, the thread spins and then blocks. When a thread releases the state, the next node is woken up, which is how multiple threads are synchronized.