Original text: chenmingyu. Top/concurrent -…
The lock
A lock controls how multiple threads access a shared resource. In Java, both the synchronized keyword and the Lock interface can be used for locking
synchronized is a Java keyword that acquires and releases the lock implicitly. Lock is a Java interface whose lock must be acquired and released explicitly. synchronized is an exclusive lock that can be neither interrupted nor timed out while waiting; Lock adds interruptible and timed acquisition
Interface provided by Lock
public interface Lock {
    /** Acquires the lock; the method returns once the current thread holds the lock. */
    void lock();
    /** Acquires the lock interruptibly; the current thread can be interrupted while it waits. */
    void lockInterruptibly() throws InterruptedException;
    /** Tries to acquire the lock without blocking and returns immediately: true if the lock was acquired, false otherwise. */
    boolean tryLock();
    /** Tries to acquire the lock within the timeout: returns true if acquired in time, false if the timeout elapses; throws InterruptedException if the thread is interrupted while waiting. */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    /** Releases the lock. */
    void unlock();
    /** Returns the wait/notify component (Condition) bound to this lock; its await() method may only be called while the current thread holds the lock. */
    Condition newCondition();
}
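The methods of the interface are normally paired with try/finally so that the lock is released on every path. The following is a minimal sketch of that idiom, assuming ReentrantLock as the implementation; the class LockUsageSketch and its method names are hypothetical and do not come from the original article.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class, for illustration only.
class LockUsageSketch {
    private final Lock lock = new ReentrantLock();
    private int counter;

    // Blocking acquisition; unlock() sits in finally so the lock is always released.
    int increment() {
        lock.lock();
        try {
            return ++counter;
        } finally {
            lock.unlock();
        }
    }

    // Timed acquisition; returns false if the lock is not obtained within the timeout.
    boolean tryIncrement(long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        try {
            counter++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockUsageSketch sketch = new LockUsageSketch();
        System.out.println(sketch.increment());
        System.out.println(sketch.tryIncrement(100));
    }
}
```

Note that lock() is called before the try block: if acquisition itself failed, the finally clause would otherwise try to unlock a lock that was never held.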
Queue synchronizer
The queue synchronizer AbstractQueuedSynchronizer (AQS, called the synchronizer below) is the base framework used to build locks and other synchronization components
Most locks in Java control thread access by aggregating a subclass of the synchronizer, so the synchronizer is the key to implementing a lock. One way to see it: a lock faces the programmer and hides the implementation details, while the synchronizer faces the lock implementer and simplifies the implementation by hiding synchronization state management, thread queuing, and waiting and wake-up. With AbstractQueuedSynchronizer, a lock is easy to implement
Design principles
The synchronizer is designed around the template method pattern. Its template methods fall into three groups: acquiring and releasing the synchronization state exclusively, acquiring and releasing it in shared mode, and querying the threads waiting in the synchronization queue
Exclusive operation
To implement an exclusive lock, a subclass overrides tryAcquire(int arg) and tryRelease(int arg); the synchronizer then provides the following template methods, which call them
Method | Description |
---|---|
void acquire(int arg) | Acquires the synchronization state exclusively: only one thread can hold it at a time. A thread that fails to acquire it enters the synchronization queue and waits |
void acquireInterruptibly(int arg) | Same as acquire, but responds to interruption: if the thread is interrupted, it throws InterruptedException and returns |
boolean tryAcquireNanos(int arg, long nanosTimeout) | Same as acquireInterruptibly, with a timeout: returns false if the synchronization state is not acquired within the given time, true otherwise |
boolean release(int arg) | Releases the synchronization state exclusively and then wakes up the thread of the first waiting node in the synchronization queue |
Shared operation
To implement a shared lock, a subclass overrides tryAcquireShared(int arg) and tryReleaseShared(int arg); the synchronizer then provides the following template methods, which call them
Method | Description |
---|---|
void acquireShared(int arg) | Acquires the synchronization state in shared mode: several threads can hold it at the same time |
void acquireSharedInterruptibly(int arg) | Same as acquireShared, but responds to interruption |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | Same as acquireSharedInterruptibly, with a timeout: returns false if the synchronization state is not acquired within the given time, true otherwise |
boolean releaseShared(int arg) | Releases the synchronization state in shared mode |
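To show how the shared-mode template methods drive the hooks a subclass supplies, here is a minimal sketch of a one-shot latch, modeled on the pattern in the AbstractQueuedSynchronizer Javadoc. The class name BooleanLatchSketch and its public methods are hypothetical and not part of the original article.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch, for illustration only.
class BooleanLatchSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        // Hook called by acquireShared*(): a value >= 0 means success,
        // a negative value means the caller must queue and wait.
        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        // Hook called by releaseShared(): returning true lets AQS wake the waiters.
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled() {
        return sync.isSignalled();
    }

    void signal() {
        sync.releaseShared(1);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public static void main(String[] args) throws InterruptedException {
        BooleanLatchSketch latch = new BooleanLatchSketch();
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
                System.out.println("released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        latch.signal();
        waiter.join();
    }
}
```

The subclass only decides whether the state allows an acquire or release; queuing, blocking and wake-up are left entirely to the template methods.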
Querying the threads in the synchronization queue
Method | Description |
---|---|
Collection<Thread> getQueuedThreads() | Returns a collection of the threads waiting in the synchronization queue |
These template methods mention the synchronization queue several times. Let’s look at how AQS implements it
First, look at the class diagram of AbstractQueuedSynchronizer
Node
Node is an inner class of AbstractQueuedSynchronizer. The synchronizer relies on an internal synchronization queue to manage the synchronization state: when the current thread fails to acquire the synchronization state, the synchronizer wraps the thread and its wait status in a Node and appends that node to the synchronization queue
Attribute | Description |
---|---|
waitStatus | Wait status of the thread, one of: CANCELLED (1), the thread has cancelled its wait and the node must be removed from the synchronization queue; SIGNAL (-1), the successor node is waiting, so when the current node releases the synchronization state it notifies the successor so that the successor's thread can run; CONDITION (-2), the node is in a condition wait queue; PROPAGATE (-3), the next shared-mode acquisition of the synchronization state propagates unconditionally; 0, the initial state |
prev:Node | Predecessor node |
next:Node | Successor node |
thread:Thread | The thread waiting in this node |
nextWaiter:Node | Next waiting node |
Because each node holds references to both its predecessor and its successor, the synchronization queue of AQS is a doubly linked list
AQS
Several important attributes of AQS
Attribute | Description |
---|---|
state:int | Synchronization state. For an exclusive lock such as ReentrantLock, 0 means the lock is free, 1 means it is held, and a value greater than 1 means the owning thread has locked it several times (reentrancy) |
head:Node | The head node of the queue |
tail:Node | The tail node of the queue |
unsafe:Unsafe | The Unsafe instance that AQS uses to implement its CAS operations |
AQS provides three methods for manipulating the synchronization state
- getState(): gets the synchronization state
- setState(int newState): sets the synchronization state
- compareAndSetState(int expect, int update): sets the state with a CAS operation, which guarantees that the update is atomic
The basic structure of AQS is shown below
The synchronizer holds references to the head and tail nodes of the synchronization queue, so enqueueing and dequeueing only need to operate on the head and tail references held by the synchronizer
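The three state methods described above can be exercised directly from a subclass, since they are protected members of AQS. The sketch below is only an illustration; the class StateSketch and its methods addToState, current and casFrom are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical subclass that only exposes the AQS state accessors.
class StateSketch extends AbstractQueuedSynchronizer {
    // Reads the synchronization state.
    int current() {
        return getState();
    }

    // Single CAS attempt: succeeds only if the state still equals 'expect'.
    boolean casFrom(int expect, int update) {
        return compareAndSetState(expect, update);
    }

    // CAS retry loop: atomically adds delta and returns the new value.
    int addToState(int delta) {
        for (;;) {
            int c = getState();
            int next = c + delta;
            if (compareAndSetState(c, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StateSketch s = new StateSketch();
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                s.addToState(1);
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(s.current());
    }
}
```

setState is a plain volatile write, so it is only safe when the caller already owns the state, for example the thread that holds an exclusive lock; contended updates go through compareAndSetState.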
Exclusive lock
ReentrantLock
ReentrantLock is built on the exclusive acquire/release of synchronization state in AQS. Let’s walk through the implementation of ReentrantLock to understand AQS better
Reentrancy means that a thread that already holds the lock can acquire it again without being blocked by it, so one thread can lock the same resource repeatedly. ReentrantLock also lets you choose between a fair and an unfair lock
Example:
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author: chenmingyu
 * @date: 2019/4/12 15:09
 * @description: ReentrantLock
 */
public class ReentrantLockTest {
    private static final Lock LOCK = new ReentrantLock();

    public static void main(String[] args) {
        Runnable r1 = new TestThread();
        new Thread(r1, "r1").start();
        Runnable r2 = new TestThread();
        new Thread(r2, "r2").start();
    }

    public static class TestThread implements Runnable {
        @Override
        public void run() {
            LOCK.lock();
            try {
                System.out.println(Thread.currentThread().getName() + ": Lock obtained " + LocalTime.now());
                // Hold the lock for 3 seconds so the other thread has to wait
                TimeUnit.SECONDS.sleep(3L);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                LOCK.unlock();
            }
        }
    }
}
The output
The second thread to arrive (r2 in the original run) acquires the lock and prints its message only after the first thread (r1) releases the lock
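Reentrancy itself can be observed with ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The following is a minimal sketch; the class ReentrancySketch and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class, for illustration only.
class ReentrancySketch {
    private static final ReentrantLock LOCK = new ReentrantLock();

    // Acquires the lock, then calls inner(), which acquires it again.
    static int outer() {
        LOCK.lock();
        try {
            return inner();
        } finally {
            LOCK.unlock();
        }
    }

    // Second acquisition by the same thread: not blocked, hold count goes up.
    static int inner() {
        LOCK.lock();
        try {
            return LOCK.getHoldCount();
        } finally {
            LOCK.unlock();
        }
    }

    static boolean isLocked() {
        return LOCK.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("hold count inside inner(): " + outer());
        System.out.println("locked afterwards: " + isLocked());
    }
}
```

Each lock() is matched by exactly one unlock(), so the lock is free again once both methods have returned.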
Source code analysis
By default the constructor creates an instance that uses an unfair lock. To get a fair lock, call the constructor that takes a boolean argument
/** Creates a ReentrantLock instance that uses an unfair lock by default. */
public ReentrantLock() {
    sync = new NonfairSync();
}
/** Creates a ReentrantLock instance; passing fair = true selects the fair lock. */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
NonfairSync and FairSync are inner classes of ReentrantLock. Both extend Sync, another inner class of ReentrantLock, and Sync extends AbstractQueuedSynchronizer
The class diagram is shown below
Exclusive lock acquisition
Implementation of an unfair lock
/**
 * Performs lock. Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
When lock() is called on an unfair lock, it first tries to grab the lock with compareAndSetState(0, 1); if that fails, it calls acquire(1)
compareAndSetState(0, 1) is a CAS operation. If it succeeds, setExclusiveOwnerThread(Thread.currentThread()) records the current thread as the owner of the exclusive-mode synchronization state
Acquiring the synchronization state here means acquiring the lock: if the synchronization state is acquired, the lock is held
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
acquire(1) is a template method provided by AQS. It calls tryAcquire(arg) and, if that fails, acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire(arg) dispatches to the subclass implementation, here the tryAcquire method of NonfairSync
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
The nonfairTryAcquire(acquires) method
/** Unfair attempt to acquire the synchronization state. */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // Lock is free: try to take it with CAS and record the owner
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // Reentrant acquisition by the owning thread: increase the hold count
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
- getState() reads the synchronization state. If it is 0, the method tries compareAndSetState(0, acquires); if the CAS succeeds, the synchronization state has been acquired and the current thread is recorded as the owner of the exclusive-mode synchronization state
- If the current thread is already the owner of the exclusive synchronization state, state is increased by acquires (1 here), which records that the current thread has locked again
If tryAcquire(arg) returns false, the synchronization state, and therefore the lock, was not acquired, so acquireQueued(addWaiter(Node.EXCLUSIVE), arg) is called. It adds the current thread to the synchronization queue and then tries to acquire the synchronization state in an infinite loop. If an attempt fails, the thread in the node is blocked; a blocked thread is woken up only when its predecessor node leaves the queue or when the blocked thread is interrupted
The addWaiter(Node.EXCLUSIVE) method builds the node for the current thread and appends it to the tail of the synchronization queue
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
The method first creates a Node instance through the Node constructor; tail is the tail node of the AQS queue
If the tail node is not null, the new node's predecessor is set to the node that tail points to, and compareAndSetTail(pred, node) is called. compareAndSetTail(pred, node) calls unsafe.compareAndSwapObject(this, tailOffset, expect, update), a CAS operation that cannot be interrupted midway, so the node is appended in a thread-safe way. After the CAS succeeds, the next reference of the old tail is pointed at the new node, which completes adding it to the tail of the synchronization queue
If the tail node is null or compareAndSetTail(pred, node) fails, the enq(node) method is called
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
If the tail node of AQS is null, no thread is waiting in the queue yet, that is, the queue has no nodes, so compareAndSetHead(new Node()) is called to create an empty head node and tail is pointed at it. The loop then calls compareAndSetTail(t, node) to append the current node to the tail of the queue, retrying until it succeeds
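The retry-until-CAS-succeeds append can be tried out in isolation. The sketch below is a simplified stand-in, not the AQS code: it uses an AtomicReference for the tail instead of Unsafe, and it creates the sentinel head eagerly rather than lazily. The class TailAppendSketch and its members are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplified queue showing only the CAS-based tail append.
class TailAppendSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final String name;

        Node(String name) {
            this.name = name;
        }
    }

    // Simplification: the empty head node exists from the start.
    private final Node head = new Node("head");
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    Node enqueue(String name) {
        Node node = new Node(name);
        for (;;) {
            Node t = tail.get();
            node.prev = t;                     // link backwards before publishing
            if (tail.compareAndSet(t, node)) { // only one thread wins per round
                t.next = node;                 // forward link is set afterwards
                return node;
            }
            // CAS lost: another thread appended first, retry with the new tail
        }
    }

    // Counts nodes by walking prev links from the tail back to the head.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != head; t = t.prev) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        TailAppendSketch queue = new TailAppendSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    queue.enqueue(Thread.currentThread().getName());
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(queue.size());
    }
}
```

Because prev is written before the CAS and next only after it, a traversal from the tail backwards always sees a complete chain, while next may briefly be null. This is the reason the wake-up code in AQS falls back to scanning from the tail.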
After addWaiter(Node mode) returns, acquireQueued(final Node node, int arg) is called. Once a node is in the queue it spins: each pass checks whether the synchronization state can be acquired. If it can, the node leaves the loop; otherwise it stays in the loop, parking between attempts
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
node.predecessor() returns the predecessor of the given node, which is compared with the head node of AQS. If they are the same node, the thread tries to acquire the synchronization state, that is, the lock. On success, setHead(node) makes the current node the new head, and the old head's next reference is set to null to help GC
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
If the predecessor of the given node is not the head of AQS, or acquiring the synchronization state fails, shouldParkAfterFailedAcquire(p, node) and parkAndCheckInterrupt() are called
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
shouldParkAfterFailedAcquire uses a CAS operation to set the wait status of the node's predecessor to Node.SIGNAL. On that pass it returns false whether or not the CAS succeeds; because the enclosing loop is infinite, the method is called again, and it returns true once the predecessor's status is SIGNAL. Cancelled predecessors (waitStatus > 0) are skipped along the way
The parkAndCheckInterrupt() method calls LockSupport.park(this) to block the thread. When the thread wakes up, the method returns whether it was interrupted and clears the interrupt flag
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
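parkAndCheckInterrupt() relies on two documented behaviors: LockSupport.park returns when the thread's interrupt status is set, and Thread.interrupted() both reports and clears that status. A minimal sketch that shows both on the current thread follows; the class ParkInterruptSketch and its method are hypothetical.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical class, for illustration only.
class ParkInterruptSketch {
    // Returns { result of first Thread.interrupted(), result of second call }.
    static boolean[] parkWhileInterrupted() {
        // Set the interrupt status of the current thread
        Thread.currentThread().interrupt();
        // park() does not block here: it returns at once because the thread is interrupted
        LockSupport.park(ParkInterruptSketch.class);
        boolean first = Thread.interrupted();  // reports the interrupt and clears the flag
        boolean second = Thread.interrupted(); // flag was cleared by the previous call
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] result = parkWhileInterrupted();
        System.out.println("first=" + result[0] + ", second=" + result[1]);
    }
}
```

Because the flag is cleared inside the wait loop, the caller has to restore it after the lock has been acquired so that the interruption is not lost.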
If acquireQueued(addWaiter(Node.EXCLUSIVE), arg) returns true, the thread was interrupted while it was waiting, so selfInterrupt() is called to set the thread's interrupt flag again
Implementation of fair locks
Once acquire(1) is understood, the fair lock implementation is easy to follow
final void lock() {
    acquire(1);
}
Compared with the unfair lock, lock() skips the initial attempt to grab the synchronization state. The rest of the flow is the same, except that the tryAcquire of FairSync also checks hasQueuedPredecessors() and only attempts the CAS when no other thread is queued ahead
Fair lock and unfair lock summary:
- Fair lock: if threads are already waiting in the synchronization queue when lock() is called, the thread joins the tail of the synchronization queue
- Unfair lock: the thread first tries to acquire the synchronization state whether or not other threads are queued; if that fails, it joins the tail of the synchronization queue
- An unfair lock is generally more efficient than a fair lock. A fair lock hands out the synchronization state in FIFO order, at the cost of many more thread switches. With an unfair lock, however, a thread in the synchronization queue may never acquire the lock
The process of acquiring a lock exclusively
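Choosing between the two policies is a constructor argument, and the choice can be checked with ReentrantLock.isFair(). A minimal sketch follows; the class FairnessSketch and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class, for illustration only.
class FairnessSketch {
    // The no-argument constructor selects the unfair policy.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Passing true selects the fair (FIFO) policy.
    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair? " + defaultIsFair());
        System.out.println("new ReentrantLock(true) fair? " + explicitFairIsFair());
    }
}
```

One caveat from the ReentrantLock documentation: the untimed tryLock() does not honor the fairness setting and will take a free lock even if other threads are queued.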
Exclusive lock release
ReentrantLock’s unlock() method actually calls the release(int arg) method of AQS
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease(arg) is called first to release the synchronization state
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
If c == 0, the lock is fully released: the owner thread of the exclusive lock is set to null and state is set to 0. Otherwise only state is decreased and the lock stays held
unparkSuccessor(Node node) is then called to wake up the thread of the successor node
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // Successor is missing or cancelled: scan backwards from the tail
        // for the closest node to the head that is still waiting
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread of the successor node
        LockSupport.unpark(s.thread);
}
To summarize the process of acquiring and releasing a lock exclusively:
- To acquire the lock, a thread first tries to acquire the synchronization state. If that succeeds, the lock is held. If it fails, the thread is wrapped in a node and appended to the tail of the synchronization queue maintained by AQS, where it starts spinning. It leaves the loop when its predecessor is the head node of AQS and it acquires the synchronization state; the old head is then removed from the synchronization queue
- To release the lock, the thread first releases the synchronization state and then wakes up the successor of the head node
- After a thread has locked N times, it must unlock N times before another thread can acquire the lock
Implement an exclusive lock yourself
Now that we know how ReentrantLock works, we can write a custom exclusive lock of our own along the same lines
Steps
- Create a LockTest class that implements the Lock interface and overrides its methods
- In LockTest, create an inner class Sync that extends AQS. Because the lock is exclusive, overriding tryAcquire(int arg) and tryRelease(int arg) is enough
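LockTest code
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author: chenmingyu
 * @date: 2019/4/11
 * @description: Custom exclusive lock
 */
public class LockTest implements Lock {
    private final Sync sync = new Sync();

    public static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Non-reentrant: succeeds only when state moves from 0 to 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Only the owning thread may release a held lock
            if (getState() < 1 || Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException("Lock is not held by the current thread");
            }
            int c = getState() - arg;
            if (c == 0) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return c == 0;
        }

        @Override
        protected boolean isHeldExclusively() {
            // Required by ConditionObject
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // Delegate to the AQS template method instead of always returning false
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        // Return a real Condition instead of null
        return sync.newCondition();
    }
}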
Validation
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * @author: chenmingyu
 * @date: 2019/4/12 15:09
 * @description: LockTest
 */
public class ReentrantLockTest {
    private static final Lock LOCKTEST = new LockTest();

    public static void main(String[] args) {
        Runnable r1 = new TestThread();
        new Thread(r1, "LockTest 1").start();
        Runnable r2 = new TestThread();
        new Thread(r2, "LockTest 2").start();
    }

    public static class TestThread implements Runnable {
        @Override
        public void run() {
            LOCKTEST.lock();
            try {
                System.out.println(Thread.currentThread().getName() + ": Lock obtained " + LocalTime.now());
                TimeUnit.SECONDS.sleep(3L);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                LOCKTEST.unlock();
            }
        }
    }
}
The output
As with ReentrantLock, the second thread acquires the lock and prints only after the first thread releases it
Shared lock
Read-write lock
ReentrantReadWriteLock is an implementation of a read-write lock; it implements the ReadWriteLock interface
ReentrantReadWriteLock also maintains an internal Sync class that extends AQS and overrides the relevant methods to implement the read lock and the write lock
state is the integer variable maintained by AQS; an exclusive lock increases it by 1 each time the lock is acquired. ReentrantReadWriteLock uses that same integer to hold both the read state and the write state. To keep two states in one variable, the integer is split by bits: an int is 4 bytes of 8 bits each, 32 bits in total, with the high 16 bits holding the read state and the low 16 bits holding the write state
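The bit split can be written out as a few lines of arithmetic. The sketch below mirrors the constants and helper names used inside ReentrantReadWriteLock.Sync (SHARED_SHIFT, SHARED_UNIT, MAX_COUNT, EXCLUSIVE_MASK, sharedCount, exclusiveCount), but the wrapper class ReadWriteStateSketch is hypothetical and is only meant for experimenting with the encoding.

```java
// Hypothetical class reproducing the state encoding, for illustration only.
class ReadWriteStateSketch {
    static final int SHARED_SHIFT = 16;
    // Adding one reader adds this value to state (1 in the high 16 bits)
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;
    // Largest count that fits in 16 bits
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    // Mask that keeps only the low 16 bits (the write state)
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // Number of read holds: the high 16 bits of state
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // Number of reentrant write holds: the low 16 bits of state
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = 3 * SHARED_UNIT + 2; // 3 read holds, 2 write holds
        System.out.println("read holds:  " + sharedCount(state));
        System.out.println("write holds: " + exclusiveCount(state));
    }
}
```

This also explains why a read release subtracts SHARED_UNIT rather than 1, and why both hold counts are capped at 65535.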
Write lock acquisition
The write lock of a read-write lock is an exclusive lock that supports reentrancy
Sync, the inner class of ReentrantReadWriteLock, overrides the tryAcquire(int acquires) method
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    // 1. c != 0 means a read lock or a write lock is currently held
    if (c != 0) {
        // 2. w == 0 means only read locks are held. Return false in that case,
        //    or when the write lock is held by a thread other than the current one
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
Explanation
If any read lock is held, the write lock cannot be acquired; it has to wait until the reading threads release their read locks. This ensures that what the holder of the write lock writes is visible to readers
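This rule can be checked from a single thread with tryLock(), because the write lock is refused even when the read lock is held by the same thread (there is no upgrade from read to write). The following is a minimal sketch; the class ReadBlocksWriteSketch and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class, for illustration only.
class ReadBlocksWriteSketch {
    // Tries to take the write lock while a read lock is held.
    static boolean tryWriteWhileReading() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean acquired = rw.writeLock().tryLock();
            if (acquired) {
                rw.writeLock().unlock();
            }
            return acquired;
        } finally {
            rw.readLock().unlock();
        }
    }

    // Tries to take the write lock when no lock is held at all.
    static boolean tryWriteWhenFree() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean acquired = rw.writeLock().tryLock();
        if (acquired) {
            rw.writeLock().unlock();
        }
        return acquired;
    }

    public static void main(String[] args) {
        System.out.println("write while reading: " + tryWriteWhileReading());
        System.out.println("write when free:     " + tryWriteWhenFree());
    }
}
```

A blocking writeLock().lock() in the first method would wait forever, which is why the sketch uses the non-blocking tryLock().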
Write lock release
Releasing the write lock works much like releasing an exclusive lock: each release decreases the write state, and when it reaches 0 the write lock is released
Read lock acquisition and release
A read lock is a shared lock that supports reentrancy. Sync, the inner class of ReentrantReadWriteLock, overrides tryAcquireShared(int unused) and tryReleaseShared(int unused); the release side is shown below
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
If another thread holds the write lock, the current thread fails to acquire the read lock and waits. If the current thread itself holds the write lock, or no thread holds it, the current thread acquires the synchronization state and gets the read lock
Each release of the read lock decreases the read state by 1 (SHARED_UNIT in the code above). When the read state reaches 0, the read lock is fully released
Lock downgrading
Lock downgrading means downgrading a write lock to a read lock: while holding the write lock, the current thread acquires the read lock and then releases the write lock, which guarantees data visibility
After thread A acquires the write lock and modifies the data, it acquires the read lock and releases the write lock, which completes the downgrade. Until thread A releases the read lock, no other thread can acquire the write lock, so no other thread can modify the data, and thread A is guaranteed to see the data it wrote
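The downgrade sequence described above can be sketched as follows, in the order write lock, read lock, release write lock, release read lock. The class DowngradeSketch, its value field and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class, for illustration only.
class DowngradeSketch {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int value;

    // Writes newValue, downgrades to the read lock, then reads the value back.
    int updateThenRead(int newValue) {
        rw.writeLock().lock();
        try {
            value = newValue;
            // Take the read lock while still holding the write lock
            rw.readLock().lock();
        } finally {
            // Release the write lock; the read lock is still held (downgrade done)
            rw.writeLock().unlock();
        }
        try {
            // No other thread can take the write lock here, so value cannot change
            return value;
        } finally {
            rw.readLock().unlock();
        }
    }

    boolean isWriteLocked() {
        return rw.isWriteLocked();
    }

    int readLockCount() {
        return rw.getReadLockCount();
    }

    public static void main(String[] args) {
        DowngradeSketch sketch = new DowngradeSketch();
        System.out.println(sketch.updateThenRead(42));
    }
}
```

The reverse order, releasing the write lock first and then acquiring the read lock, would leave a window in which another writer could change the data before it is read.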
Reference: The Art of Concurrent Programming in Java