1 Introduction
AbstractQueuedSynchronizer (AQS) is an abstract class that provides a synchronizer framework built on a FIFO wait queue. It is the core component used to implement locks and other synchronizers, such as ReentrantLock and CountDownLatch.
Among the more important concepts are:
- state: the shared resource
- FIFO wait queue
- CAS (compare-and-swap)
- park / unpark
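To make these concepts concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its methods are hypothetical, chosen for illustration; only the AbstractQueuedSynchronizer calls come from the JDK. The subclass only decides how state changes, while AQS itself handles the FIFO queue and park/unpark.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: state 0 = unlocked, state 1 = locked.
public class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            // CAS on the shared state: only one thread can move it from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }   // queues and parks on failure
    public void unlock()      { sync.release(1); }   // unparks the next waiter
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        mutex.unlock();
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

Unlike ReentrantLock, this sketch does not count reentrant acquisitions, so a second tryLock() by the owner fails.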
2 AQS thread queue
Node is the core of the whole queue. Each node holds a waitStatus (CANCELLED, SIGNAL, CONDITION, PROPAGATE, or the default 0), pointers to the previous and next nodes, and the thread that the node represents.
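The shape of a node can be sketched as follows. This is a simplified model, not the JDK source: the class name NodeSketch and the describe helper are hypothetical, and the constant values are the ones used by the JDK 8 implementation as far as I know.

```java
// Simplified, hypothetical model of an AQS queue node.
public class NodeSketch {
    static final int CANCELLED = 1;   // the waiting thread gave up
    static final int SIGNAL = -1;     // the successor needs to be unparked
    static final int CONDITION = -2;  // the node waits on a condition queue
    static final int PROPAGATE = -3;  // used for shared-mode release

    volatile int waitStatus;          // int field, so it defaults to 0
    volatile NodeSketch prev;         // pointer to the previous node
    volatile NodeSketch next;         // pointer to the next node
    volatile Thread thread;           // the thread stored in this node

    NodeSketch(Thread thread) {
        this.thread = thread;
    }

    // Hypothetical helper that names a waitStatus value.
    static String describe(int ws) {
        switch (ws) {
            case CANCELLED: return "CANCELLED";
            case SIGNAL:    return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            default:        return "0 (initial)";
        }
    }

    public static void main(String[] args) {
        NodeSketch node = new NodeSketch(Thread.currentThread());
        System.out.println(describe(node.waitStatus));
    }
}
```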
Both the fair and the unfair lock in ReentrantLock inherit from this abstract class.
ReentrantLock contains two lock implementations, FairSync and NonfairSync. Both extend the abstract class Sync, which in turn extends AQS.
If thread 1 currently holds the lock, threads 2, 3, and 4 are enqueued and then suspended by a park operation.
When thread 1, which owns the lock, is done, it calls unpark on the thread in the node after head, and that node then becomes the new head. Because ReentrantLock is reentrant, the number of reentries is recorded in AQS.state: the value increases by 1 on each reentry and decreases by 1 on each exit.
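The reentry count can be observed from the outside with ReentrantLock.getHoldCount(). The sketch below uses a hypothetical class HoldCountDemo and records the count after each lock and unlock call.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class HoldCountDemo {
    // Hypothetical helper: returns the hold count seen after each step.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[4];
        lock.lock();                      // first acquisition
        seen[0] = lock.getHoldCount();
        lock.lock();                      // reentry by the same thread
        seen[1] = lock.getHoldCount();
        lock.unlock();                    // first exit
        seen[2] = lock.getHoldCount();
        lock.unlock();                    // second exit, the lock is free again
        seen[3] = lock.getHoldCount();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```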
3 ReentrantLock instance construction
The ReentrantLock default constructor creates an unfair lock, and you need to pass true in the constructor if you want a fair lock.
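A short check of both constructors, using the public isFair() method. The class name FairnessDemo and its helper methods are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairnessDemo {
    // Hypothetical helpers wrapping the two constructors.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();      // no-argument constructor
    }

    static boolean explicitIsFair() {
        return new ReentrantLock(true).isFair();  // fair lock requested
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());   // prints false
        System.out.println(explicitIsFair());  // prints true
    }
}
```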
4 Multi-threaded lock acquisition illustrated (unfair lock)
The thread grabs the lock using CAS.
At the beginning, the wait queue contains only a head node whose thread field is null; it serves as a placeholder. Threads B and C are linked behind head as a doubly linked list, and each calls park to suspend itself.
The steps are walked through one by one below.
4.1 Thread A locks
When thread A takes the lock, it changes state from 0 to 1 with a CAS operation and sets the exclusiveOwnerThread field of AQS to A, marking A as the current owner of the lock.
4.2 Thread B locks
Thread B now tries to take the lock, finds that state is already 1, fails the CAS operation, and enters the queue. The waitStatus of its new node is 0.
Because this is the first node to be enqueued, a placeholder head node whose thread is null is created first, and the waitStatus of that head node is then set to SIGNAL.
4.3 Thread C locks
When thread C enters the queue, it sets the waitStatus of thread B's node, its predecessor, to SIGNAL.
A newly inserted node has waitStatus 0 because it is the last node in the queue: it has no successor, so no unpark is needed after it. When the wait queue holds no waiting thread, a node with a null thread serves as the placeholder head.
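The queued state described in 4.1 to 4.3 can be observed with the monitoring methods of ReentrantLock. In this sketch the calling thread plays thread A; the class name QueueDemo and its helpers are hypothetical. Note that the JDK documents getQueueLength() as an estimate, although here nothing changes while it is read.

```java
import java.util.concurrent.locks.ReentrantLock;

public class QueueDemo {
    // Hypothetical helper: returns the queue length seen while "A" holds the lock.
    static int queuedWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        Runnable task = () -> { lock.lock(); lock.unlock(); };
        Thread b = new Thread(task, "B");
        Thread c = new Thread(task, "C");
        int seen;
        lock.lock();                       // the calling thread acts as thread A
        try {
            b.start();
            c.start();
            // Spin until both B and C have been enqueued behind the head node.
            while (!(lock.hasQueuedThread(b) && lock.hasQueuedThread(c)))
                Thread.yield();
            seen = lock.getQueueLength();
        } finally {
            lock.unlock();                 // lets B and C proceed in turn
        }
        join(b);
        join(c);
        return seen;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + queuedWhileHeld());
    }
}
```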
4.4 Thread A unlocks
After thread A performs the unlocking operation, thread B can grab the lock. The process is as follows:
Unlock operation of thread A:
- state is set to 0
- exclusiveOwnerThread is set to null: the lock is released
- Thread B, the successor of the head node, is unparked; once it acquires the lock, its node becomes the new head
- The previous head node is unlinked so that it can be garbage collected
4.5 Thread B unlocks
4.6 Thread C unlocks
5 Source code analysis (taking unfair lock as an example)
5.1 Thread A preempts the lock first
Constructor:
    public ReentrantLock() {
        sync = new NonfairSync();
    }
The lock() method:
    public void lock() {
        sync.lock();
    }
Suppose we now have two threads, A and B, as follows:
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        // Thread A and thread B both enter here
        final void lock() {
            // Thread A: the CAS succeeds and state becomes 1, so the lock is taken
            // Thread B: the CAS fails because thread A has already set AQS.state to 1
            if (compareAndSetState(0, 1))
                // Thread A: set AbstractOwnableSynchronizer.exclusiveOwnerThread to the current thread
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // Thread B: takes this branch and tries to acquire the lock
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
acquire(1):
    // Thread B: arg = 1
    public final void acquire(int arg) {
        // Thread B: tryAcquire(arg) returns false because thread A already holds the lock
        // addWaiter builds a Node holding thread B, appends it to the tail of the queue, and returns it
        // acquireQueued keeps thread B waiting in the queue until it acquires the lock
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
            // Set the interrupt flag of the current thread
            selfInterrupt();
        }
    }
tryAcquire(arg) ultimately calls nonfairTryAcquire(acquires):
    /**
     * 1 Lock acquired: returns true
     *   1.1 The current thread acquires the lock for the first time:
     *       AQS.state changes from 0 to 1
     *       AQS.exclusiveOwnerThread = Thread.currentThread()
     *       returns true
     *   1.2 The current thread acquires the lock again (reentry):
     *       AQS.state++
     *       returns true
     * 2 Lock not acquired: returns false
     */
    // Thread B: acquires = 1
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Thread B: thread A has set state to 1, so c = 1
        int c = getState();
        if (c == 0) { // Thread B: condition not met, branch skipped
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) { // Thread B: condition not met, branch skipped
            /*
             * The owner is the current thread, so this is a reentry:
             * second acquisition: nextc = 1 + 1 = 2
             * third acquisition:  nextc = 2 + 1 = 3
             * fourth acquisition: nextc = 3 + 1 = 4
             */
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Thread B: returns false
        return false;
    }
Back to the addWaiter method called from acquire:
    // Thread B: mode = Node.EXCLUSIVE = null
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // Thread B: tail is null because thread A never entered the queue, so the if is skipped
        if (pred != null) {
            node.prev = pred; // (old tail node) <-- (new node).prev
            if (compareAndSetTail(pred, node)) {
                pred.next = node; // (old tail node).next --> (new node)
                return node;
            }
        }
        // Thread B: node = the node holding thread B
        enq(node);
        return node;
    }
The addWaiter method uses this Node constructor:
    // Thread B: thread = Thread.currentThread(), mode = null
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
nextWaiter is null. The Node constructor does not assign waitStatus, so it defaults to 0.
Back in addWaiter, the enq method is called next:
    /**
     * If the queue is empty, initialize a node with no content as the head,
     * then append the given node to the tail of the queue.
     * If the queue is not empty, append the given node to the tail directly.
     * @param node the node to insert
     * @return the predecessor of the inserted node
     */
    // Thread B: node = the node holding thread B
    private Node enq(final Node node) {
        for (;;) {
            // Thread B: first pass, tail = null
            // Thread B: second pass, tail = head = the empty placeholder node
            Node t = tail;
            // t == null means the queue has not been initialized yet
            if (t == null) { // Thread B: first pass, t == null, so this branch runs
                if (compareAndSetHead(new Node())) // Initialize an empty node as AQS.head
                    tail = head;
            } else { // Thread B: second pass, t != null: empty node <-> new node
                node.prev = t; // (old tail node) <-- (new node).prev
                if (compareAndSetTail(t, node)) { // CAS AQS.tail to the new node
                    t.next = node; // (old tail node).next --> (new node)
                    return t;
                }
            }
        }
    }
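The two-pass behaviour of enq can be reproduced in isolation. The sketch below is a simplified model that uses AtomicReference in place of the internal CAS helpers of AQS; the class EnqSketch, its Node type, and the name field (which stands in for the stored thread) are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of the enq loop.
public class EnqSketch {
    static final class Node {
        final String name;            // stands in for the thread; null for the placeholder
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node to the tail and returns its predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First pass on an empty queue: install the placeholder head.
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                // Later pass: link behind the current tail, then CAS the tail.
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    public static void main(String[] args) {
        EnqSketch queue = new EnqSketch();
        Node predOfB = queue.enq(new Node("B"));
        Node predOfC = queue.enq(new Node("C"));
        System.out.println("predecessor of B is placeholder: " + (predOfB.name == null));
        System.out.println("predecessor of C: " + predOfC.name);
    }
}
```

Setting prev before the CAS and next only after it is the reason unparkSuccessor can always find a waiter by walking backwards from the tail.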
After addWaiter returns, acquire passes the new node to acquireQueued:
    // Thread B: node = the node holding thread B, arg = 1
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Thread B: loops until the lock is acquired
            for (;;) {
                // Thread B: p = the empty placeholder node (head)
                final Node p = node.predecessor();
                // Thread B: p == head is true, but thread A took the lock first, so
                // tryAcquire(arg) returns false and the following if is skipped
                if (p == head && tryAcquire(arg)) { // tryAcquire(arg): try to take the lock
                    setHead(node); // make this node the new head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Thread B: shouldParkAfterFailedAcquire sets p.waitStatus to Node.SIGNAL
                // (it was 0) and returns false; if the second pass still fails to take
                // the lock, it returns true and the thread is suspended
                // Thread B: parkAndCheckInterrupt suspends thread B
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
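The interrupted flag returned by acquireQueued, together with selfInterrupt() in acquire, means that lock() does not abort when the waiting thread is interrupted; it only restores the interrupt status once the lock has been acquired. A sketch that demonstrates this, with a hypothetical class InterruptDemo in which the calling thread plays thread A:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptDemo {
    // Hypothetical helper: returns B's interrupt status right after lock() returns.
    static boolean flagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flag = new AtomicBoolean(false);
        Thread b = new Thread(() -> {
            lock.lock();                  // not interruptible: keeps waiting in the queue
            try {
                flag.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        }, "B");
        lock.lock();                      // the calling thread acts as thread A
        try {
            b.start();
            while (!lock.hasQueuedThread(b))
                Thread.yield();           // wait until B is in the queue
            b.interrupt();                // B stays queued despite the interrupt
        } finally {
            lock.unlock();
        }
        try {
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flag.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag restored: " + flagAfterLock());
    }
}
```

Code that needs to stop waiting on interruption would use lockInterruptibly() instead.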
5.2 Thread A Releases the lock
If A calls unlock:
    public void unlock() {
        sync.release(1);
    }
The lock can be released as follows:
    // Thread A: arg = 1
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // Thread A: AQS.state = 0, AQS.exclusiveOwnerThread = null
            Node h = head;
            // Thread A: the condition holds because h.waitStatus == SIGNAL (value -1)
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
The tryRelease method is called:
    // Thread A: releases = 1
    protected final boolean tryRelease(int releases) {
        // Thread A: c = 1 - 1 = 0
        int c = getState() - releases;
        // If the current thread does not hold the lock, throw an exception
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Thread A: c == 0 is true
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
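The ownership check in tryRelease is easy to trigger: calling unlock() on a lock that the current thread does not hold throws IllegalMonitorStateException. A minimal sketch, with a hypothetical class name and helper:

```java
import java.util.concurrent.locks.ReentrantLock;

public class UnlockOwnerDemo {
    // Hypothetical helper: returns true if unlock() by a non-owner is rejected.
    static boolean unlockWithoutOwning() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();               // the current thread never called lock()
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;                 // thrown by the owner check in tryRelease
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + unlockWithoutOwning()); // prints rejected: true
    }
}
```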
After tryRelease returns, control goes back to release, which runs the unparkSuccessor method:
    private void unparkSuccessor(Node node) {
        // Thread A: node.waitStatus == SIGNAL (-1)
        int ws = node.waitStatus;
        if (ws < 0) {
            // Thread A: reset waitStatus to 0
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        // Thread A: s = the node holding thread B (its waitStatus is 0, or SIGNAL
        // if another thread is queued behind it)
        if (s == null || s.waitStatus > 0) { // no successor, or the successor was cancelled
            s = null;
            // Walk backwards from tail to find the non-cancelled node closest to node
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // Thread A: s == the node holding thread B, so wake up thread B
        if (s != null)
            LockSupport.unpark(s.thread);
    }
Thread B was suspended inside parkAndCheckInterrupt(), which it reached through the chain of calls that started in its lock() method. When thread A releases the lock and unparks it, thread B resumes from the point where it was suspended, goes around the loop in acquireQueued() again, and this time acquires the lock.
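The suspend-and-resume step can be shown on its own with LockSupport. In this sketch the class ParkDemo and the permit flag are hypothetical; the flag is rechecked in a loop because park() is documented as being allowed to return spuriously, which is also why acquireQueued retries inside for (;;).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    // Hypothetical helper: returns true once B has resumed after being unparked.
    static boolean parkThenResume() {
        AtomicBoolean permit = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread b = new Thread(() -> {
            // Park until the condition holds; recheck after every wake-up.
            while (!permit.get())
                LockSupport.park();
            resumed.set(true);            // execution continues right after park()
        }, "B");
        b.start();
        permit.set(true);                 // plays the role of releasing the lock
        LockSupport.unpark(b);            // wakes B, or pre-grants the permit if B has not parked yet
        try {
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("B resumed: " + parkThenResume());
    }
}
```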