Notes
- The JDK version used in this article is OpenJDK-1.8
Locking and threading basics
- The nature of locking: the main purpose is to make threads access a critical resource in an orderly way, blocking them and waking them up in turn.
- Locking in Java: synchronized and Lock. synchronized is implemented mainly through the monitor and uses Object.wait/notify to block and wake up threads; Lock is built on LockSupport.park/unpark to block and wake up threads.
- Thread interrupt problem: how to gracefully interrupt a thread?
-
The Java interrupt mechanism is a cooperative mechanism, which means that interrupts do not terminate a thread directly, but the interrupted thread handles the interrupt itself
-
API usage:
interrupt(): sets the thread's interrupt flag to true.
isInterrupted(): checks whether the thread's interrupt flag is true, without clearing it.
Thread.interrupted(): checks whether the current thread's interrupt flag is true and then clears it (resets it to false).
-
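Does LockSupport cause thread interrupts? No. LockSupport does not interrupt threads, and LockSupport.park() does not throw InterruptedException. If a parked thread is interrupted, park() simply returns and the interrupt flag stays set.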
-
What is CAS? Compare-and-swap, an optimistic locking technique. In the JDK it is implemented on top of the Unsafe API.
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
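To make the optimistic nature of CAS concrete, here is a minimal sketch of a retry loop. It uses the public AtomicInteger.compareAndSet rather than Unsafe, and the class and method names (CasCounterSketch, incrementWithCas, runDemo) are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCounterSketch {
    // Hypothetical helper: increment using an explicit compare-and-set retry loop.
    static int incrementWithCas(AtomicInteger counter) {
        for (;;) {
            int expect = counter.get();
            int update = expect + 1;
            // Succeeds only if nobody changed the value since we read it.
            if (counter.compareAndSet(expect, update)) {
                return update;
            }
            // Another thread won the race; read the new value and retry.
        }
    }

    // Starts `threads` workers that each increment `perThread` times and returns the final value.
    static int runDemo(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    incrementWithCas(counter);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10_000)); // no increment is lost, so this prints 40000
    }
}
```

No thread ever blocks here: a failed CAS just means the thread re-reads the value and tries again, which is the same pattern AQS uses for its state field.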
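The cooperative interrupt mechanism and the behavior of LockSupport described in this section can be sketched as follows. This is an illustrative example, not code from the JDK; InterruptSketch and parkAndObserve are hypothetical names. The worker parks, is interrupted by the main thread, and then inspects its own interrupt flag.

```java
import java.util.concurrent.locks.LockSupport;

class InterruptSketch {
    // Returns { flag read by Thread.interrupted(), flag read right afterwards by isInterrupted() }.
    static boolean[] parkAndObserve() {
        boolean[] seen = new boolean[2];
        Thread worker = new Thread(() -> {
            // park() can return spuriously, so loop until the interrupt flag is actually set.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(); // returns on interrupt without throwing InterruptedException
            }
            seen[0] = Thread.interrupted();                   // reads the flag and clears it
            seen[1] = Thread.currentThread().isInterrupted(); // flag is now cleared
        });
        worker.start();
        worker.interrupt(); // only sets the flag; the worker decides how to react
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = parkAndObserve();
        System.out.println(seen[0] + " " + seen[1]); // prints "true false"
    }
}
```

The worker is never terminated from outside. It notices the flag and exits its loop on its own, which is what makes the mechanism cooperative.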
Principle and implementation of AQS
AQS (AbstractQueuedSynchronizer) is a queue-based synchronizer framework built on the template method pattern.
-
The underlying data structure is a doubly linked list. Each Node contains prev and next pointers, as well as a thread field that holds the waiting Thread object.
-
A Node's waitStatus has four named values: CANCELLED (1), SIGNAL (-1), CONDITION (-2), and PROPAGATE (-3). The initial value is 0.
-
There are two common implementations: fair locks and non-fair locks. Fairness refers to whether a newly arriving thread must queue behind the threads already waiting (fair) or may compete for the lock with the thread at the head of the queue (non-fair).
-
The state is changed via CAS, by calling sun.misc.Unsafe's compareAndSwapInt.
-
A lock attempt is made before the thread is enqueued. If the lock is not available, the thread is enqueued and blocked through LockSupport.park().
-
When the lock is released, the successor of the queue's head node is woken up. The head node of the synchronization queue corresponds to the thread that currently holds the lock.
-
Here is the core AQS code for acquiring the lock:
A. Try to acquire the lock. If that fails, enter the queue and retry.
// Acquire in exclusive mode, ignoring interrupts. tryAcquire is invoked at least once;
// on failure the thread is queued until it succeeds.
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
B. In the queue, check whether the node's predecessor is the head. If it is, try to acquire the lock. If not, set the predecessor's status to SIGNAL (meaning this node needs a wake-up), and block the thread once that status change has succeeded.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
The following is the shouldParkAfterFailedAcquire method, which maintains the Node status.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // The predecessor's status is the default value 0 the first time through
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            // Skip cancelled predecessors (remove them from the queue)
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // The first time through, pred.waitStatus == 0, so this branch runs.
        // Change the predecessor's status to SIGNAL, indicating that the pred.next node needs to be woken up
        // (it is ready to block, but not yet blocked, and will only block if it fails to acquire the lock again).
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
The flow chart of AQS
AQS characteristics
- Blocking wait queue
- Shared/exclusive modes
- Fair/non-fair
- Reentrant
- Interruptible
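To illustrate the template method design, here is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer. It is modeled on the style of example given in the AQS Javadoc; the names SimpleMutexSketch and runDemo are hypothetical. The subclass only decides what "acquired" means via tryAcquire and tryRelease, while queuing, parking, and wake-up are inherited from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutexSketch {
    // state 0 = unlocked, 1 = locked. Non-reentrant on purpose, to keep the sketch small.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS may wake up a successor
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }                   // template method: tryAcquire, else enqueue and park
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }                 // template method: tryRelease, then unpark successor

    // Increments a plain int under the mutex from several threads and returns the total.
    static int runDemo(int threads, int perThread) {
        SimpleMutexSketch mutex = new SimpleMutexSketch();
        int[] count = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        count[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10_000)); // prints 40000
    }
}
```

ReentrantLock follows the same structure, except that its tryAcquire also lets the owning thread increment state instead of failing.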
ReentrantLock
A simple Demo
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
    static ReentrantLock lock = new ReentrantLock();

    static class T extends Thread {
        @Override
        public void run() {
            boolean acquired = false;
            try {
                System.out.println(Thread.currentThread() + " Start trying to acquire the lock");
                acquired = lock.tryLock(10, TimeUnit.SECONDS);
                if (acquired) {
                    System.out.println(Thread.currentThread() + " Lock obtained successfully");
                    TimeUnit.SECONDS.sleep(5);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Unlock only if this thread holds the lock; otherwise unlock()
                // throws IllegalMonitorStateException (for example after a tryLock timeout).
                if (acquired) {
                    System.out.println(Thread.currentThread() + " Start releasing the lock.");
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        T t1 = new T();
        T t2 = new T();
        T t3 = new T();
        t1.start();
        t2.start();
        t3.start();
    }
}
Locking and unlocking process diagram
Description of locking and unlocking process
- In the above program, three threads try to acquire the lock at the same time, and only one thread can hold it at any moment. The following is the logic for joining the queue and trying to acquire the lock:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE); // enqueue
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // If it is the head node and the lock was successfully acquired
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// The maximum waiting time for the lock is exceeded
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// The lock was not acquired: park only if the remaining time exceeds the spin threshold
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// Block for at most nanosTimeout nanoseconds
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- If no thread holds the lock, the thread tries to take it with CAS. If the current thread already holds the lock, state is incremented by 1, which is how ReentrantLock supports reentrancy.
// Logic of the non-fair lock
// Barging: a newly arriving thread may take the lock ahead of a thread that was just woken up from the queue
// Barging occurs if the newly arriving thread obtains the lock before the woken queued thread does
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// No lock, try to compete
if (c == 0) {
if (compareAndSetState(0, acquires)) { // Whether the lock has been obtained
setExclusiveOwnerThread(current);
return true;
}
}
// The current thread already holds the lock, so the state count is incremented
else if (current == getExclusiveOwnerThread()) { // Check if it is reentrant
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
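The reentrant branch of nonfairTryAcquire can be observed from the outside through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The following is a small sketch; ReentrancySketch and holdCounts are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class ReentrancySketch {
    // Records the hold count after the first lock, after the nested lock, and after one unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(); // non-fair by default
        int[] counts = new int[3];
        lock.lock();                              // state goes 0 -> 1 via CAS
        try {
            counts[0] = lock.getHoldCount();
            lock.lock();                          // same owner: state goes 1 -> 2, no blocking
            try {
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();                    // state goes 2 -> 1, lock is still held
            }
            counts[2] = lock.getHoldCount();
        } finally {
            lock.unlock();                        // state goes 1 -> 0, lock is released
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts()));     // prints [1, 2, 1]
        System.out.println(new ReentrantLock().isFair());      // prints false
        System.out.println(new ReentrantLock(true).isFair());  // prints true
    }
}
```

Each lock() must be paired with an unlock(); the lock only becomes available to other threads when the count returns to 0.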
- If the lock is acquired, the current node is set as the head node and the method returns true. If the acquisition fails and the remaining wait time exceeds the spin threshold, the current thread is blocked by calling LockSupport.parkNanos(this, nanosTimeout). The shouldParkAfterFailedAcquire method may be called several times in the process. It updates the status of the predecessor node and unlinks cancelled nodes from the queue.
/**
* Called after a thread fails to acquire the lock: checks and updates the predecessor's status
* to decide whether the current thread should block.
* @param pred predecessor node
* @param node current node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // The predecessor's status is 0 the first time through
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
// discard invalid nodes
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// For the first time, pred.waitStatus = 0 executes the branch
// Change the state of the preceding node to SIGNAL, indicating that the pred.next node needs to be woken up (it is ready to block, but not yet blocked, and will only be blocked if it fails to acquire the lock again).
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
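From the caller's side, the timed path through doAcquireNanos looks like the sketch below: one thread holds the lock while another calls tryLock with a timeout and gives up when the deadline passes. TimedTryLockSketch and tryWhileHeld are hypothetical names, and the 100 ms timeout is an arbitrary value chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedTryLockSketch {
    // Tries to take a lock that another thread holds for the entire wait; returns the tryLock result.
    static boolean tryWhileHeld(long waitMillis) {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);    // signals that the holder owns the lock
        CountDownLatch release = new CountDownLatch(1); // tells the holder to let go
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            // Enqueues the caller and parks it with parkNanos until the deadline passes.
            boolean acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                lock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryWhileHeld(100)); // prints false: the lock stayed held for the whole wait
    }
}
```

When tryLock returns false, the caller does not own the lock and must not call unlock().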
- Unlock logic: first release the lock. If state drops to 0, wake up the next thread waiting in the queue.
// unlock
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// Determine if there are threads to wake up
if (h != null && h.waitStatus != 0) // waitStatus is non-zero (SIGNAL) only when a successor has set it, meaning the successor thread needs to be woken up
unparkSuccessor(h);
return true;
}
return false;
}
// tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// Determine if the current thread holds the lock
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// If state == 0, the current thread is no longer holding the lock
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// Wake up the queue thread
private void unparkSuccessor(Node node) {
// Change the current node state to 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// Wake up the node in the queue
LockSupport.unpark(s.thread);
}
- Wake-up logic: a woken thread resumes after the park call that follows shouldParkAfterFailedAcquire, goes around the loop again, attempts to acquire the lock, and returns true if it succeeds.
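The queue-then-wake-up sequence can be observed with ReentrantLock's monitoring methods. In this sketch (WakeUpSketch and run are hypothetical names), the main thread holds the lock, waits until a second thread is enqueued, and then releases the lock so that the queued thread is unparked and acquires it.

```java
import java.util.concurrent.locks.ReentrantLock;

class WakeUpSketch {
    // Returns { a thread was queued while the lock was held, the waiter got the lock after release }.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[2];
        Thread waiter = new Thread(() -> {
            lock.lock(); // fails to acquire, is enqueued, and parks
            try {
                result[1] = lock.isHeldByCurrentThread();
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter is actually in the synchronization queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            result[0] = lock.hasQueuedThreads();
        } finally {
            lock.unlock(); // state drops to 0, so release() unparks the queued successor
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints "true true"
    }
}
```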