AbstractQueuedSynchronizer is implemented in the JDK synchronization tool of a very important class.
- The above is its inheritance relationship
Semaphore release()
// Semaphore.release(int permits) method
public void release(int permits) {
if (permits < 0)
throw new IllegalArgumentException();
sync.releaseShared(permits);
}
/ / Sync. ReleaseShared (int arg) method, including the Sync extends AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
/ / AbstractQueuedSynchronizer tryReleasedShared (int arg) method is implemented in the Semaphore
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// Atomically update resources
if (compareAndSetState(current, next))
return true; }}. / / AbstractQueuedSynchronizer signalNext (head)
private static void signalNext(Node h) {
Node s;
// the h node should be the head node, and the S node is the first queue element after the head node, which is the node we should wake up, so s = h.next
if(h ! =null&& (s = h.next) ! =null&& s.status ! =0) {
s.getAndUnsetStatus(WAITING);
// The actual wake up operation is achieved by waking up the thread of the node binding. Corresponding to the thread that failed to get the resource while trying to sleepLockSupport.unpark(s.waiter); }}Copy the code
Before we specific depth, first to know about the queue AbstractQueuedSynchronizer structure:
This is a very typical queue, with each insertion adding from the tail and maintaining a head for control, and the tail being the last node.
Once you understand this, it’s easy to understand the key acquire() method.
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
// If the current node is inserted, it will not be the first node, so first=false
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/* * Repeatedly: * Check if node now first * if so, ensure head stable, else ensure valid predecessor * if node is first or not yet enqueued, try acquiring * else if node not yet created, create it * else if not yet enqueued, try once to enqueue * else if woken from park, retry (up to postSpins times) * else if WAITING status not set, set and retry * else park and clear WAITING status, and check cancellation */
for (;;) {
// Since node=null for the first loop, the first loop does not go here
if(! first && (pred = (node ==null)?null: node.prev) ! =null && !(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue; }}// Each time the loop looks to see if the current node is the first node, and if so it tries to apply for resources
if (first || pred == null) {
boolean acquired;
try {
if (shared)
// Try again to apply for resources, in fact, to compare the available resources are enough, especially simple implementation
acquired = (tryAcquireShared(arg) >= 0);
else
/ / same as above
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
// If the resource can run
if (acquired) {
// If the current node is still the first node, kick it out of the queue and let it run
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
// At this point, the first node exits the queue and is re-scheduled, and enters the Runnable state
// The next node becomes the new first node because the pred of the next node is null
return 1; }}// If node == null, a node is being added
if (node == null) { // allocate; retry before enqueue
// Create nodes by type
if (shared)
// The node is created and the next cycle begins
node = new SharedNode();
else
// The node is created and the next cycle begins
node = new ExclusiveNode();
// Check whether the precursor node is empty. If it is empty, this node is not linked to the previous node
// The next step is to link to the previous node
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
// see if the queue is initialized at the beginning and end
if (t == null)
tryInitializeHead();
CasTail (Node, Node) returns true on success
else if(! casTail(t, node))// If tail is updated by another thread, unset the precursor node
node.setPrevRelaxed(null); // back out
// Join the queue by associating the preceding nodes with nodes
else
t.next = node;
} else if(first && spins ! =0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
// The new node status defaults to 0, so set its state to WAITING
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
// Because we set times=false, we park the current thread, which is here, corresponding to unpark() of release().
if(! timed)// In this case, the thread is blocked because of insufficient resources, and when woken up, it continues from this position, and then loops again to see if the number of resources is sufficient
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break; }}return cancelAcquire(node, interrupted, interruptible);
}
Copy the code
The end!
Other synchronization classes call acquire() almost the same, and understanding AQS is a great help in understanding JDK synchronization tools.