Based on the analysis of Java and contract awarding Java. Util. Concurrent source, without need to know about AbstractQueuedSynchronizer (hereinafter abbreviated AQS) this abstract class, because it is the foundation of Java and contracting out the tools, Is the basis for implementing classes such as ReentrantLock, CountDownLatch, Semaphore, FutureTask, etc.
After watching many articles and videos, I didn’t have a deep understanding of them. Every time I recalled them, I forgot them and needed to learn them again, which was a waste of time. Therefore, I decided to write some articles to improve my memory and understanding.
I am learning ReentrantLock source code to start, understand how the SOURCE code of AQS work, I hope to give you some help.
Note:
Source code environment JDK1.7 (1.8 no changes), see do not understand or confused parts, it is best to open the source code to see. You can also watch the video, and then understand the source code step by step.
1. The framework
It maintains a volatile int state (representing shared resources) and a FIFO thread wait queue (which is entered when multithreaded contention for resources is blocked). Volatile is the key keyword here. The semantics of volatile are not covered here. There are three ways to access state:
- getState()
- setState()
- compareAndSetState()
AQS defines two resource sharing modes: Exclusive (which can be executed by only one thread, such as ReentrantLock) and Share (which can be executed by multiple threads at the same time, such as Semaphore/CountDownLatch).
Different custom synchronizers compete for shared resources in different ways. The implementation of custom synchronizer only needs to realize the acquisition and release of shared resource state. As for the maintenance of specific thread waiting queue (such as failure to acquire resources in queue/wake up queue, etc.), AQS has been implemented at the top level. The implementation of a custom synchronizer is mainly implemented in the following ways:
- IsHeldExclusively () : Whether the thread is monopolizing resources. You only need to implement it if you use condition.
- TryAcquire (int) : Exclusive mode. Attempts to obtain the resource return true on success and false on failure.
- TryRelease (int) : Exclusive mode. Attempts to free resources return true on success and false on failure.
- TryAcquireShared (int) : Share mode. Attempt to obtain resources. Negative numbers indicate failure; 0 indicates success, but no available resources are available. A positive number indicates success and free resources.
- TryReleaseShared (int) : share mode. Attempts to free the resource, returning true if subsequent wait nodes are allowed to wake up after release, false otherwise.
In the case of ReentrantLock, state is initialized to 0, indicating that the state is not locked. When thread A locks (), tryAcquire() is called to monopolize the lock and state+1. After that, another thread will fail to tryAcquire() until the unlock() of thread A reaches state=0. Of course, thread A can repeatedly acquire the lock itself before releasing it (state will accumulate), which is the concept of reentrant. But be careful how many times you get it and how many times you release it, so that state can go back to zero.
In the CountDownLatch example, the task is divided into N sub-threads to execute, and state is initialized to N (note that N must be consistent with the number of threads). The N child threads are executed in parallel, countDown() once for each child thread, and state is reduced by 1. After all child threads have finished executing (i.e., state=0), unpark() the calling thread, and then the calling thread returns from the await() function to continue the residual action.
In general, custom synchronizers are either exclusive or shared methods, and they only need to implement either Tryacquire-TryRelease or tryAcquireShared. However, AQS also supports both exclusive and shared custom synchronizers, such as ReentrantReadWriteLock.
2. The structure of AQS
Let’s take a look at the attributes of AQS:
Private TRANSIENT volatile Node head; private transient volatile Node head; Private transient volatile Node tail; private transient volatile Node tail; // This value can be greater than 1 because locks can be reentrant, and 1 is added each time they are reentrant. // represents the thread currently holding the exclusive lock. For the most important use example, since the lock can be reentrantLock.lock() can be nested multiple times, So every time use this to judge whether the current thread already has the lock / / if (currentThread = = getExclusiveOwnerThread ()) {state++} / / since the AbstractOwnableSynchronizer inheritance private transient Thread exclusiveOwnerThread;Copy the code
AbstractQueuedSynchronizer waiting queue beckoned as shown below, note, after analysis in the process of the queue, which is blocking queue does not contain the head
Each thread in the wait queue is wrapped as a Node instance and the data structure is a linked list
Static final class Node {// Static final Node SHARED = new Node(); Static final Node EXCLUSIVE = null; Static final int CANCELLED = 1; static final int CANCELLED = 1; Static final int SIGNAL = -1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; Volatile int waitStatus; volatile int waitStatus; volatile int waitStatus; volatile int waitStatus; // References to the precursor Node volatile Node prev; // References to subsequent nodes volatile Node next; // Volatile Thread Thread; }Copy the code
The data structure of Node is thread + waitStatus + Pre + Next
Let’s take a look at how ReentrantLock is used:
public class OrderService { private static ReentrantLock reentrantLock = new ReentrantLock(true); Public void createOrder() {// ReentrantLock. lock(); // Usually, the lock is followed by the try statement try {// only one thread can come in at a time (the thread that acquired the lock), // other threads block on the lock() method, waiting for the lock to come in, and then execute the code... // Execute code... // Execute code... } finally {// unlock reentrantlock. unlock(); }}} //ReentrantLock internally uses the internal Sync class to manage locks, so the actual lock acquisition and lock release are controlled by the Sync implementation class. The abstract static class Sync extends AbstractQueuedSynchronizer {} / / Sync has two implementations, Public ReentrantLock(Boolean fair) {sync = fair? new FairSync() : new NonfairSync(); }Copy the code
Thread locked
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 争锁
final void lock() {
acquire(1);
}
// 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
// 否则,acquireQueued方法会将线程压到队列中
public final void acquire(int arg) { // 此时 arg == 1
// 首先调用tryAcquire(1)一下
// 因为有可能直接就成功了呢,也就不需要进队列排队了,
// 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此时此刻没有线程持有锁
if (c == 0) {
// 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
// 看看有没有别人在队列中等了半天了
if (!hasQueuedPredecessors() &&
// 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
// 因为刚刚还没人的,我判断过了
compareAndSetState(0, acquires)) {
// 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
setExclusiveOwnerThread(current);
return true;
}
}
// 会进入这个else if分支,说明是重入了,需要操作:state=state+1
// 这里不存在并发问题
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
// 回到上面一个外层调用方法继续看:
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
return false;
}
// 假设tryAcquire(arg) 返回false,那么代码将执行:
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
// 这个方法,首先需要执行:addWaiter(Node.EXCLUSIVE)
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
// 此方法的作用是把线程包装成node,同时进入到队列中
// 参数mode此时是Node.EXCLUSIVE,代表独占模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
Node pred = tail;
// tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)
if (pred != null) {
// 将当前的队尾节点,设置为自己的前驱
node.prev = pred;
// 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
if (compareAndSetTail(pred, node)) {
// 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
// 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了
pred.next = node;
// 线程入队了,可以返回了
return node;
}
}
// 仔细看看上面的代码,如果会到这里,
// 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
// 读者一定要跟上思路,如果没有跟上,建议先不要往下读了,往回仔细看,否则会浪费时间的
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 采用自旋的方式入队
// 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
// 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 之前说过,队列为空也会进来这里
if (t == null) { // Must initialize
// 初始化head节点
// 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的
// 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
if (compareAndSetHead(new Node()))
// 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了
// 这个时候有了head,但是tail还是null,设置一下,
// 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
// 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
// 所以,设置完了以后,继续for循环,下次就到下面的else分支了
tail = head;
} else {
// 下面几行,和上一个方法 addWaiter 是一样的,
// 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 现在,又回到这段代码了
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
// 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
// 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
// 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
// 所以当前节点可以去试抢一下锁
// 这里我们说一下,为什么可以去试试:
// 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
// enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
// 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
// tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
// 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 什么时候 failed 会为 true???
// tryAcquire() 方法抛异常的情况
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
// 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
// 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
// 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
// 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 仔细想想,如果进入到这个分支意味着什么
// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
// 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
// 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
// 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 这个方法返回 false,那么会再走一次 for 循序,
// 然后再次进来此方法,此时会从第一个分支返回 true
return false;
}
// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 这个方法结束根据返回值我们简单分析下:
// 如果返回true, 说明前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒
// 我们也说过,以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了
// 如果返回false, 说明当前不需要被挂起,为什么呢?往后看
// 跳回到前面是这个方法
// if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt())
// interrupted = true;
// 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
// 那么需要执行parkAndCheckInterrupt():
// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// 2. 接下来说说如果shouldParkAfterFailedAcquire(p, node)返回false的情况
// 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的,原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。
// 解释下为什么shouldParkAfterFailedAcquire(p, node)返回false的时候不直接挂起线程:
// => 是为了应对在经过这个方法后,node已经是head的直接后继节点了。剩下的读者自己想想吧。
}
Copy the code
The process is summarized as follows: 1. Call tryAcquire() of the custom synchronizer to try to obtain resources directly, and return directly if it succeeds; 2. If the thread fails, addWaiter() adds the thread to the end of the wait queue and marks it as in exclusive mode. AcquireQueued () causes the thread to rest in the queue and attempt to acquire the resource when it has an opportunity (unpark() in its turn). Return after obtaining the resource. Returns true if it was interrupted during the entire wait, false otherwise. 4. If a thread has been interrupted while waiting, it does not respond. SelfInterrupt () is performed only after the resource has been retrieved, reclaiming the interrupt.
Unlock operation
Finally, we need to introduce the action of awakening. We know that normally, if the thread does not acquire the lock, the thread will be locksupport-park (this); Suspend to stop and wait to be awakened.
Public void unlock() {sync.release(1); public void unlock() {sync.release(1); } public final Boolean release(int arg) {if (tryRelease(arg)) {Node h = head; if (h ! = null && h.waitStatus ! = 0) unparkSuccessor(h); return true; } return false; } // Go back to ReentrantLock and look at the tryRelease method protected Final Boolean tryRelease(int releases) {int c = getState() -releases; if (Thread.currentThread() ! = getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); Boolean free = false; If (c ==0) {free = true; if (c ==0) {free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /** * Wakes up node's antecedents, if one exists. ** @param node the node */ / Know from the preceding call, Private void unparksucceeded (node node) {/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; If (ws <0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled nodes. */ / S = node.next; s= node.next; s= node.next; if (s == null || s.waitStatus > 0) { s = null; For (Node t = tail; waitStatus==1; waitStatus==1; t ! = null && t ! = node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s ! = null) // Wake up thread locksupport. unpark(s.read); } // After waking up the thread, the awakened thread will proceed from the following code: private final Boolean parkAndCheckInterrupt() {locksupport.park (this); Return thread.interrupted (); AcquireQueued (final Node Node, int arg) acquireQueued(final Node Node, int argCopy the code
In a concurrent environment, locking and unlocking require the coordination of the following three components:
-
The lock state. We need to know if the lock is being held by another thread, and that’s what state is for. When it’s 0, it means that no thread is holding the lock, so we can try to grab the lock. You add +1 to state, you subtract 1 to unlock, until state goes to 0 again, so lock() and unlock() have to pair. It then wakes up the first thread in the wait queue to claim the lock.
-
Blocking and unblocking of threads. AQS uses locksupport. park(Thread) to suspend threads and unpark to wake them up.
-
Block the queue. A queue is needed to manage these threads. AQS uses a FIFO queue, which is a linked list. Each node holds a reference to its successor node. AQS uses a variant of CLH locks to achieve this
Fair locks and unfair locks
ReentrantLock defaults to unfair locking unless you pass true in the constructor.
Public ReentrantLock() {sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }Copy the code
Lock method for fair locking:
static final class FairSync extends Sync { final void lock() { acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); If (c == 0) {// 1. If (c == 0) {// 1. hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }}Copy the code
The lock method for an unfair lock:
static final class NonfairSync extends Sync { final void lock() { // 2. If (compareAndSetState(0, 1)) setExclusiveOwnerThread(thread.currentThread ())); else acquire(1); } // AbstractQueuedSynchronizer.acquire(int arg) public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); If (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }Copy the code
Summary: There are only two differences between fair and unfair locks:
-
When an unfair lock is called, CAS will be called to grab the lock. If the lock is not occupied at this time, CAS will return the lock.
-
After a CAS failure, an unfair lock will enter the tryAcquire method just like a fair lock. In the tryAcquire method, if the lock is released (state == 0), the unjust lock will be directly seized by CAS. However, fair lock will determine whether there is a thread waiting in the queue, if there is, it will not grab the lock, obediently queue to the back.
There are only two differences between a fair lock and an unfair lock. If both CAS are unsuccessful, then the unfair lock is the same as the fair lock.
Relatively speaking, unfair locks have better performance because of their higher throughput. Of course, unfair locking makes the timing of lock acquisition more uncertain and can lead to chronically hungry threads in the blocking queue.
Condition
Condition can often be used in producer-consumer scenarios
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); // Condition depends on lock to produce final condition notFull = lock.newcondition (); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; Public void put(Object x) throws InterruptedException {lock.lock(); try { while (count == items.length) notFull.await(); // Queue is full, wait until not full to continue production items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally {lock.unlock(); Throw InterruptedException {lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); Return x; return x; } finally { lock.unlock(); }}}Copy the code
Condition are implemented based on already, and already is dependent on AbstractQueuedSynchronizer.
Ondition relies on a ReentrantLock. Either call await to wait or signal to wake up, the lock must be acquired.
Condition of the implementation class AbstractQueuedSynchronizer ConditionObject in class
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; Private transient Node firstWaiter; private transient Node firstWaiter; Private TRANSIENT Node lastWaiter; .Copy the code
Condition Queue
NextWaiter is used to implement a one-way linked list of conditional queues
-
Both conditional and blocking queue nodes are instances of Node, because the conditional queue nodes need to be moved to the blocking queue.
-
We know that an instance of ReentrantLock can generate multiple Condition instances by calling newCondition() multiple times, which corresponds to condition1 and condition2. Notice that ConditionObject has only two attributes: firstWaiter and lastWaiter;
-
Each condition has an associated condition queue. If thread 1 calls condition1.await(), it will wrap the current thread 1 as Node and join the condition queue. The condition queue is a one-way linked list.
-
Condition1.signal () triggers a wake up call and moves the “firstWaiter” in condition1 to the end of the blocking queue to await the lock and await the return of the await method.
Wait:
AwaitUninterruptibly () this method blocks until signal(signal() and signalAll() are called, Public final void await() throws InterruptedException { If (thread.interrupted ()) throw new InterruptedException(); // add to the condition queue Node Node = addConditionWaiter(); // await() before the current thread must hold the lock, we must release int savedState = fullyRelease(node); int interruptMode = 0; / / here to exit the loop there are two kinds of circumstances, after careful analysis / / 1. IsOnSyncQueue (node) returns true, the current node has been transferred to the blocking queue / / 2. CheckInterruptWhileWaiting (node)! = 0 to break, then exit the loop, representing the thread interrupt while (! isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) ! = 0) break; If (acquireQueued(node, savedState) && interruptMode! = THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter ! = null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode ! = 0) reportInterruptAfterWait(interruptMode); }Copy the code
1. Add the node to the conditional queue
AddConditionWaiter () adds the current node to the conditional queue, which is thread-safe.
Private Node addConditionWaiter() {Node t = lastWaiter; // If waitStatus is not equal to Node.CONDITION, the CONDITION is unqueued. if (t ! = null && t.waitStatus ! UnlinkCancelledWaiters () = node.condition) {// This method works through the CONDITION queue and clears the cancelled nodes from the queue; t = lastWaiter; CONDITION node node = new node (thread.currentThread (), node.condition); If (t == null) firstWaiter = node; if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }Copy the code
In the addWaiter method, there is an unlinkCancelledWaiters() method, which is used to clear nodes from the queue that have been unwaited for.
This method is called once if a cancel operation occurs while await (this will be said later), or if the last node is found to be cancelled while the node is being queued.
// Wait queue is a one-way linked list, traversing the list to remove the node that has canceled the wait. Private void unlinkCancelledWaiters() {Node t = firstWaiter; Node trail = null; while (t ! = null) { Node next = t.nextWaiter; // If (t.waitStatus!) the Node is cancelled if (t.waitStatus! = Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; }}Copy the code
2. Release the exclusive lock completely
After the node is queued, int savedState = fullyRelease(node) is called; Fully release: ReentrantLock fully release: ReentrantLock fully release: ReentrantLock fully release: ReentrantLock fully release
// For the simplest operation: lock. Lock (), condition1.await(). // savedState == n if the lock is reentered n times // If this method fails, the node is set to "cancel", And throw an exception IllegalMonitorStateException final int fullyRelease (Node Node) {Boolean failed = true; try { int savedState = getState(); If (release(savedState)) {failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; }} Consider that if a thread calls condition1.await() without holding a lock, it can enter the condition queue, but in this case, because it does not hold a lock, Release (savedState) must return false, go to the exception branch, and then go to the finally block and set Node.waitStatus = Node.cancelled so that the queued node will be "CANCELLED" by its successor.Copy the code
3. Wait to enter the blocking queue
So once you release the lock, then you have this, and then you spin, and if you find that you’re not in the blocking queue, then you hang up and wait to be transferred to the blocking queue.
int interruptMode = 0; // If not in the blocking queue, notice that the blocking queue is while (! IsOnSyncQueue (node)) {// Thread suspens locksupport. park(this); / / here you can without looking at the first, and wait to see what time it is unpark if ((interruptMode = checkInterruptWhileWaiting (node))! = 0) break; } isOnSyncQueue(Node Node) CONDITION = node. CONDITION; // Signal is queued from a CONDITION to a blocking queue. Final Boolean isOnSyncQueue(node node) {// The node waitStatus is set to 0, // If waitStatus is still node. CONDITION (-2), it must be in the CONDITION queue. // If the Node prev is still null, There is in a blocking queue (prev is used in the blocking queue list) if (node. WaitStatus = = node. CONDITION | | node. The prev = = null) return false. // If node has a successor node next, it must be blocking the queue. = null) return true; Node.prev (); node.prev(); node.prev(); = null to infer that node is blocking on the queue? The answer is: no. // Set node.prev to tail, and then CAS to set itself to the new tail, but this time the CAS may fail. return findNodeFromTail(node); } // Return true private Boolean findNodeFromTail(Node Node) {Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; }} Back to the previous loop, isOnSyncQueue(node) returns false, then locksupport. park(this); Here the thread hangs.Copy the code
4. Signal wakes up the thread and moves to the blocking queue
The wake up operation is usually performed by another thread, as in the producer-consumer pattern, if the thread is suspended for consumption, then when the producer produces something, it calls signal to wake up the waiting thread to consume it.
Public final void signal() {// The thread calling signal must hold the current exclusive lock if (! isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first ! = null) doSignal(first); } // Go back through the conditional queue to find the first node that needs to be transferred // Because we said earlier that some threads cancel the queue, Private void doSignal(Node first) {do {// set firstWaiter to first after the first Node, // If first is removed and there are no nodes waiting, If ((firstWaiter = first.nextwaiter) == null) lastWaiter = null; First.nextwaiter = null; first.nextwaiter = null; } while (! transferForSignal(first) && (first = firstWaiter) ! = null); If first fails, select the first node after first to do so, and so on} // Move the node from the conditional queue to the blocking queue // true for successful transfer // false for before signal, Final Boolean transferForSignal(Node Node) {// CAS fails if waitStatus of this Node is not Node.CONDITION, the Node has been cancelled. // If waitStatus is set to 0 if (! compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq(node): spins into the end of the blocking queue // Note that the return value p is node in the front of the blocking queue. int ws = p.waitStatus; // ws > 0 indicates that node unlocks the wait lock on the precursor node in the blocking queue, and directly wakes up the corresponding node thread. // If ws <= 0, compareAndSetWaitStatus will be called. Need to set the state of the precursor Node to Node. The SIGNAL (1) if (ws > 0 | |! CompareAndSetWaitStatus (p, ws, node.signal)) // Locksupport.unpark (node.thread); return true; } under normal circumstances, the ws > 0 | |! CompareAndSetWaitStatus (p, ws, Node.SIGNAL) ws <= 0 and compareAndSetWaitStatus(p, ws, Node.SIGNAL) returns true. So you don't wake up the node thread in the if block. This method then returns true, which means that the signal method ends and the node enters the blocking queue.Copy the code
5. Check the interruption status after waking up
After signal in the previous step, our thread is moved from the conditional queue to the blocking queue, and is ready to acquire the lock. Once the lock is retrieved, continue to execute.
int interruptMode = 0; while (! IsOnSyncQueue (node)) {// Thread suspens locksupport. park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) ! = 0) break; }Copy the code
InterruptMode is explained first. InterruptMode can be REINTERRUPT (1), THROW_IE (-1), or 0
- REINTERRUPT: When an await is returned, the interrupt status needs to be reset
- THROW_IE: To return an await, InterruptedException is thrown
- 0: indicates that no interrupt has occurred during await
There are three situations that will make locksupport.park (this); This return continues:
- Regular path. Signal -> Transfer node to blocking queue -> Lock acquired (unpark)
- The thread is interrupted. While in park, another thread interrupts this thread
- Signal when we said that the precursor node after the transfer was cancelled or the CAS operation to the precursor node failed
- Wake up. Object.wait() also has this problem
Thread wakes up after the first step is to call checkInterruptWhileWaiting (node) this method, this method is used to determine whether interruption happened during thread hanging, if interrupt comes up is interrupted before signal invocation, or signal after the interrupt.
If the signal has been interrupted before, return THROW_IE // 2. If interrupt is after signal, return REINTERRUPT // 3. There was no interruption, Return 0 private int checkInterruptWhileWaiting (Node Node) {return Thread. Interrupted ()? (transferAfterCancelledWait (Node) ? THROW_IE : REINTERRUPT) : 0; } // This method is called only if the thread is interrupted. // If necessary, the node that has been canceled is moved to the blocking queue. // Returns true: If this thread before the signal is cancelled, final Boolean transferAfterCancelledWait (Node to Node) {/ / the Node status is set to 0 with CAS / / step if the CAS is successful, The interrupt occurred before signal, because if signal had occurred first, If (compareAndSetWaitStatus(node, node.condition, 0)) {// Put the node on a blocking queue. The blocking queue enq(node) will still be moved; return true; } // This is because the CAS failed, it must be because the signal method has set waitStatus to 0. // The signal method will transfer the node to the blocking queue, but it is not finished yet, so the spin waits for it to complete. After the signal call, an interrupt occurs while (! isOnSyncQueue(node)) Thread.yield(); return false; }Copy the code
Even if an interrupt occurs, the node is still moved to the blocking queue.
Obtain an exclusive lock
After the while loop comes out, here’s the code:
if (acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;Copy the code
Since the while comes out, we determine that the node has entered the blocking queue and is ready to acquire the lock.
AcquireQueued (node, savedState) acquireQueued(node, savedState) acquireQueued(node, savedState) acquireQueued(node, savedState) This means that the current thread has acquired the lock, and state == savedState.
Note that the blocking queue is entered regardless of whether an interrupt has occurred, and the return value of the acquireQueued(Node, savedState) is to indicate whether the thread has been interrupted. If true is returned, it is interruptMode! = THROW_IE indicates that interrupts occurred before signal. Set interruptMode to REINTERRUPT for future interrupts.
Moving on:
if (node.nextWaiter ! = null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode ! = 0) reportInterruptAfterWait(interruptMode);Copy the code
In the event of an interrupt being determined, did it occur before or after signal? Node.nextwaiter = null If the signal has been interrupted before, the node will be transferred to the blocking queue.
7. Handle the interruption status
What does interruptMode do?
-
0: do nothing, not interrupted;
-
The THROW_IE: await method throws InterruptedException because it represents an interruption during await();
-
REINTERRUPT: Re-interrupts the current thread, since it represents an interrupt that has occurred after signal() and has not been interrupted during await()
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
The basic analysis of the above process is completed !!!!!!
Await with timeout mechanism
public final long awaitNanos(long nanosTimeout) throws InterruptedException public final boolean awaitUntil(Date deadline) throws InterruptedException public final boolean await(long time, TimeUnit unit) throws InterruptedException public final boolean await(long time, TimeUnit Unit) throws InterruptedException {// Long nanosTimeout = unit.tonanos (time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); Final Long deadline = system.nanotime () + nanosTimeout; Boolean timedOut = false; int interruptMode = 0; while (! IsOnSyncQueue (node)) {if (nanosTimeout <= 0L) { Cancel your waiting must call transferAfterCancelledWait (node) / / this method returns true if the method, in this method, moving nodes to blocking queue success / / returns false, the signal has occurred, The signal method transfers the node. That is no timeout, timedout = transferAfterCancelledWait (node); break; } // The value of spinForTimeoutThreshold is 1000 nanoseconds, that is, 1 millisecond // that is, if less than 1 millisecond, do not select parkNanos, If (nanosTimeout >= spinForTimeoutThreshold) locksupport. parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) ! = 0) break; NanosTimeout = deadline-system.nanotime (); } if (acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter ! = null) unlinkCancelledWaiters(); if (interruptMode ! = 0) reportInterruptAfterWait(interruptMode); return ! timedout; } // Await with no timeout parameter is park and wait for someone to wake up. Now it is time to call the parkNanos method to sleep for the specified time, wake up and determine if signal has been called, if it has not timed out, otherwise it has timed out. If you time out, do it yourself, move to the blocking queue, and grab the lock.Copy the code
The await of InterruptedException is not thrown
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (! isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }Copy the code
The cancellation of the AbstractQueuedSynchronizer exclusive lock queue
How to eliminate lock competition?
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); }Copy the code
First, the node must have been enqueued by this method.
If we want to unqueue one thread, we need to interrupt it in another thread. For example, if a thread calls lock() for a long time and does not return, I want to interrupt it. Once it is interrupted, the thread is removed from locksupport.park (this); Wake up, then thread.interrupted (); Returns true.
One problem we found was that even if the interrupt woke up the thread, it simply set interrupted = true and continued the next loop. Also, because thread.interrupted (); Clears the interrupt state and returns false the second time parkAndCheckInterrupt is entered.
So, see that in this method the interrupted is used only to record whether or not an interruption has occurred, and then used for the method to return the value, and does nothing else.
So let’s see how the outer method handles the acquireQueued return false.
The lock() method handles interrupts as if you interrupt and I grab the lock anyway. It doesn’t matter if I grab the lock, but after I grab the lock, I set the thread’s interrupt state without throwing any exceptions. Once the lock is acquired, the caller can either check to see if the interrupt occurred or ignore it.
Another lock method for ReentrantLock:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (! tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } the if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) / / is here, once the exception, immediately put an end to this method, an exception is thrown. LockInterruptibly throw new InterruptedException(); lockInterruptibly throw new InterruptedException(); If (failed) cancelAcquire(node); if (interrupted dexception) cancelAcquire(node); } } private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled toraise // Find a suitable predecessor. Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! = null) { Node next = node.next; if (next ! = null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }Copy the code
CountDownLatch
The CountDownLatch class is a typical use of the shared mode of AQS, which is a highly used class. “Latch” is the Chinese word for “door latch”. I don’t want to talk about it.
Assuming we have N (N > 0) tasks, we initialize a CountDownLatch with N and pass the latch reference to each thread. After each thread completes its task, latch.countdown () is called to represent the completion of a task.
The thread calling the latch.await() method blocks until all tasks are complete.
class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(N); Executor e = Executors.newFixedThreadPool(8); // Create N tasks and submit them to the thread pool to execute for (int I = 0; i < N; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); Donesignal.await () is returned after all tasks are completed; // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { try { doWork(i); Donesignal.countdown (); // When the task for this thread is complete, call the countDown method donesignal.countdown (); } catch (InterruptedException ex) { } // return; } void doWork() { ... }}Copy the code
It is common to break up a large task, start multiple threads to execute it, and wait for all threads to finish before moving on to other tasks. In this example, only the main thread calls the await method.
Two countdownlatches were used:
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); // Insert some code here to make sure that each line above is started before executing the code below. doSomethingElse(); CountDown (); countDown(); countDown(); countDown(); // let all threads proceed doSomethingElse(); Donesignal.await (); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() {try {// To get all threads to start the task at the same time, let all threads block here. doWork(); doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ... } } public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // Internal packaging a Sync class inherits from AQS private static final class Sync extends AbstractQueuedSynchronizer {Sync (int count) {/ / this is the state = = count the setState (count); }... } // the state in AQS is an integer value, the state in AQS is an integer value, the state in AQS is an integer value, the state in AQS is an integer value, the state in AQS is an integer value, the state in AQS is an integer value. When state is reduced to 0, the thread that has reduced state to 0 is responsible for waking up all threads that have called the await method. It's all tricks, except Doug Lea's tricks are deep and the code is clever, otherwise we wouldn't have to analyze the source code.Copy the code
CountDownLatch, we only need to care about two methods, the countDown() method and the await() method.
The countDown() method subtracts state by one each time it is called until the value of state is zero; Await is a blocking method and returns await method when state is reduced to 0. “Await” can be called by multiple threads. Readers should have a picture in their mind: all threads calling the “await” method are blocked in the AQS blocking queue, waiting for the condition (state == 0) to wake up the threads from the queue one by one.
T1 and T2 are responsible for calling the countDown() method, t3 and T4 block calling the await method:
public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException ignore) {} // Call countDown() latch.countdown () after resting for 5 seconds (the simulated thread worked for 5 seconds); } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException ignore) {} call countDown() latch.countdown () after 10 seconds of rest (the simulated thread worked for 10 seconds); } }, "t2"); t1.start(); t2.start(); Thread t3 = new Thread(new Runnable() {@override public void run() {try {latch.await(); System.out.println(" thread t3 returns from await "); } catch (InterruptedException e) {system.out.println (" thread T3 await interrupted "); Thread.currentThread().interrupt(); } } }, "t3"); Thread t4 = new Thread(new Runnable() {@override public void run() {try {latch.await(); System.out.println(" thread T4 returns from await "); } catch (InterruptedException e) {system.out.println (" thread T4 await interrupted "); Thread.currentThread().interrupt(); } } }, "t4"); t3.start(); t4.start(); } } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException {/ / this is old pattern, If (thread.interrupted ()) throws new InterruptedException(); if (thread.interrupted ()) throws new InterruptedException(); // state is greater than 0 when t3 and T4 call await (state is 2). / / that is to say, to see this if returns true, then the if (tryAcquireShared (arg) < 0) doAcquireSharedInterruptibly (arg); Protected int tryAcquireShared(int acquires) {return (getState() == 0)? 1:1; } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 1. Final Node Node = addWaiter(node.shared); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); Int r = tryAcquireShared(arg); if (p == head) {// If (p == head) { if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 2 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}Copy the code
After thread t3 joins the queue after step 1, addWaiter, we should get this:
Since tryAcquireShared returns -1, the if (r >= 0) branch will not go in. By shouldParkAfterFailedAcquire t3 will head waitStatus value is set to 1, as follows:
T3 then hangs when parkAndCheckInterrupt is entered.
When T4 joins the queue, it will set the waitStatus of the node where t3, the precursor node, is located as -1. After T4 joins the queue, it should look like this:
Then t4 also hangs. Next, T3 and T4 wait to be awakened.
Next, let’s look at the wake up process. To enrich the diagram below, let’s assume that CountDownLatch is initialized with 10.
Of course, in our example, we don’t actually have 10 threads, we just have 2 threads T1 and T2
CountDown () method:
public void countDown() { sync.releaseShared(1); } public final Boolean releaseShared(int arg) { TryReleaseShared returns true otherwise it's simply state = state-1 then countDown() ends If (tryReleaseShared(arg)) {// Wake up the await thread doReleaseShared(); return true; } return false; } // State minus 1 protected Boolean tryReleaseShared(int Releases) {for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; CountDown (countDown) {countDown (countDown) {countDown (countDown) {countDown (countDown) {countDown (countDown) {countDown (countDown) {countDown (countDown) {countDown (countDown) {countDown (countDown) {countDown (countDown); Private void doReleaseShared() {for (;); { Node h = head; if (h ! = null && h ! = tail) { int ws = h.waitStatus; If (ws == node.signal) {// Set head waitStatus to 0 if (! compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // Wake up the next node of the head, the first node in the blocking queue // Wake up the T3 unparksucceeded (h); } else if (ws == 0 && ! compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo continue; // loop on failed CAS } if (h == head) // loop if head changed break; }} Once T3 has been awaked, we go back to the code in await, parkAndCheckInterrupt returns, and we leave the interrupt case out of the way: private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); // 2. Here is the next step p.ext = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && // 1. Upon awakening, this method returns parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // If t4 is followed by T5, then t4 will wake up. T5 to immediately aroused the if (the propagate > 0 | | h = = null | | h.w. aitStatus < 0 | | (h = head) = = null | | h.w. aitStatus < 0) {Node s = node.next; If (s = = null | | s.i sShared ()) / / it is the method, the head just now is not the original blank nodes, is the node t3 doReleaseShared (); }} state == 0 private void doReleaseShared() {for (;); { Node h = head; H == null: the blocking queue is empty // 2. h == tail: If (h! H) {if (h! H) {if (h! H) {if (h! H) {if (h! = null && h ! = tail) { int ws = h.waitStatus; If (ws == node.signal) {// If (! compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // Wake up the successor of head, the first node in the blocking queue // Wake up the T4 unparksucceeded (h); } else if (ws == 0 && // CAS fails if (ws == 0 &&) compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS} // loop on failed CAS} // loop on failed CAS} If (h == head) // loop if head changed break; }}Copy the code
Let’s examine the last if statement to explain why the first CAS might fail:
- H == head: indicates that the head node has not yet been possessed by the threads (here understood as T4) just awakened with the unparksucceeded, at which point break exits the loop.
- h ! = head: the head node is occupied by the thread that just woke up (t4 in this case), so it re-enters the next loop to wake up the next node (T4 in this case). As we know, when T4 is awakened, it will actively awaken T5, T6, T7… So why do we have to go through the next loop here to wake up T5? I think it’s for throughput.
The CAS operation compareAndSetWaitStatus(h, node.signal, 0) failed.
Because by the time the current for loop reaches this point, it is possible that the newly awakened thread T4 has just reached this point, so it is possible that CAS has failed.
T4 will wake up on the first turn of the for loop, and t4 will set itself as the head node when it wakes up. If (h == head) the for loop reaches if (h == head) after t4 sets the head node, then false will be returned, and the for loop will proceed to the next turn. T4 will also wake up in this method, so it is possible for the second round of the for loop and T4 will meet in this CAS, and only one will succeed.
CyclicBarrier
Cyclicbarriers are simpler than countdownlatches, and their source code doesn’t have much to offer. It is a combination of ReentrantLock and Condition. CyclicBarrier and CountDownLatch are similar, except that CyclicBarrier can have more than one Barrier because its Barrier can be reused.
First, the source code implementation of CyclicBarrier is very different from CountDownLatch, which is based on the use of AQS sharing mode, and CyclicBarrier, which is based on Condition.
Because CyclicBarrier source code is relatively simple, once the reader is familiar with the previous Condition analysis, the source code here is no pressure, just a few special concepts.
Semaphore
With the basis of CountDownLatch, analyzing Semaphore is much easier. What is Semaphore? It is similar to a resource pool (the reader can use the analogy of a thread pool). Each thread needs to call acquire() to acquire the resource before it can execute, and when it has executed, it needs to release the resource for other threads to use.
As you can probably guess, Semaphore is also the use of shared locks in AQS, since each thread shares a pool.
Routine interpretation: With permits, Semaphore instance creation requires an permitting parameter, which can be confirmed to state of AQS. Then, when each thread calls acquire, execute state = state-1. Release state = state + 1, of course, acquire state, if state = 0, it means that there are no resources, need to wait for other thread release.
Here the source code is not introduced, later to add !!!!