Basic usage
Java also provides the ReentrantLock class as an extension of synchronized. Functionally, ReentrantLock can replace synchronized entirely.
static final ReentrantLock lock = new ReentrantLock();
static int i = 0;

@Override
public void run() {
    lock.lock();
    try {
        for (int j = 0; j < 10000; j++) {
            i++;
        }
    } finally {
        lock.unlock();
    }
}
As the example shows, unlike synchronized, ReentrantLock requires you to state explicitly when to acquire the lock and when to release it. Releasing in a finally block ensures the lock is freed even if the critical section throws.
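A minimal runnable sketch of the same pattern, assuming two worker threads; the class name LockCounterDemo and the helper runWorkers are illustrative and not part of the original example:

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockCounterDemo {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static int counter = 0;

    // Starts `threads` workers that each add `perThread` to the shared counter.
    static int runWorkers(int threads, int perThread) {
        counter = 0;
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                LOCK.lock();               // acquire before entering try
                try {
                    for (int j = 0; j < perThread; j++) {
                        counter++;
                    }
                } finally {
                    LOCK.unlock();         // always release, even on exceptions
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("demo interrupted", e);
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(2, 10_000)); // prints 20000
    }
}
```

Without the lock, the two workers could interleave their increments and lose updates.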
What reentrancy means
lock.lock();
lock.lock();
try {
    for (int j = 0; j < 10000; j++) {
        i++;
    }
} finally {
    lock.unlock();
    lock.unlock();
}
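ReentrantLock lets the same thread acquire the same lock multiple times, which gives more flexibility in some locking scenarios. Each lock() call must be matched by an unlock() call; the lock becomes free only when every acquisition has been released.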
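The hold count can be observed with ReentrantLock.getHoldCount(). The sketch below is only an illustration (ReentrancyDemo is a made-up name); it records the count after each step:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
    // Returns the hold counts seen after lock, lock, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[4];
        lock.lock();
        seen[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        seen[1] = lock.getHoldCount(); // reentrant acquisition by the same thread
        lock.unlock();
        seen[2] = lock.getHoldCount(); // still held once
        lock.unlock();
        seen[3] = lock.getHoldCount(); // fully released
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```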
Important extensions
- Responds to interrupts. The problem with synchronized is that if a thread holds lock A and fails to acquire lock B, it enters a blocked state, and once a deadlock occurs there is no way to wake the blocked thread. If the blocked thread can respond to an interrupt signal, that is, if sending it an interrupt wakes it up, it gets a chance to release the lock it already holds.
- Supports timeouts. If a thread cannot acquire the lock within a given period, the call returns a failure instead of blocking indefinitely, so the thread gets a chance to release the lock it already holds.
- Acquires the lock without blocking. If the attempt fails, the call returns immediately instead of blocking, so the thread again gets a chance to release the lock it already holds.
Three corresponding APIs are provided:
// Supports interruption
void lockInterruptibly() throws InterruptedException;
// Supports timeouts
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// Supports non-blocking acquisition
boolean tryLock();
These three extensions break the no-preemption condition, one of the necessary conditions for deadlock, so they make some deadlock scenarios easy to resolve.
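One way to apply this idea, sketched under the assumption that the caller is willing to back off and retry: take both locks with tryLock() and give up the first one if the second is unavailable. The names TryLockBothDemo, withBoth, and demo are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockBothDemo {
    // Runs action only if both locks can be taken without blocking.
    // On failure nothing stays locked, so the caller can back off and retry.
    static boolean withBoth(Lock first, Lock second, Runnable action) {
        if (!first.tryLock()) {
            return false;
        }
        try {
            if (!second.tryLock()) {
                return false; // the outer finally gives up `first`
            }
            try {
                action.run();
                return true;
            } finally {
                second.unlock();
            }
        } finally {
            first.unlock();
        }
    }

    // Returns {result when both are free, result while another thread holds b,
    //          whether a was released after the failed attempt}.
    static boolean[] demo() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        boolean whenFree = withBoth(a, b, () -> { });
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            b.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                b.unlock();
            }
        });
        holder.start();
        try {
            held.await(); // b is now held by the other thread
            boolean whenContended = withBoth(a, b, () -> { });
            boolean aReleased = !a.isLocked();
            release.countDown();
            holder.join();
            return new boolean[] {whenFree, whenContended, aReleased};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [true, false, true]
    }
}
```

Because a failed attempt never leaves a lock held, two threads that take the locks in opposite order cannot end up waiting for each other indefinitely.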
Example 1:
static ReentrantLock lock1 = new ReentrantLock();
static ReentrantLock lock2 = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(new ThreadDemo(lock1, lock2)); // acquires lock1 first, then lock2
    Thread t2 = new Thread(new ThreadDemo(lock2, lock1)); // acquires lock2 first, then lock1
    t1.start();
    t2.start();
    t1.interrupt(); // interrupt the first thread
}

static class ThreadDemo implements Runnable {
    // Declared as ReentrantLock so that isHeldByCurrentThread() is available
    ReentrantLock firstLock;
    ReentrantLock secondLock;

    public ThreadDemo(ReentrantLock firstLock, ReentrantLock secondLock) {
        this.firstLock = firstLock;
        this.secondLock = secondLock;
    }

    @Override
    public void run() {
        try {
            firstLock.lockInterruptibly();
            TimeUnit.MILLISECONDS.sleep(10); // makes the deadlock more likely
            secondLock.lockInterruptibly();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Release only the locks this thread actually holds; unlocking a lock
            // that is not held throws IllegalMonitorStateException
            if (firstLock.isHeldByCurrentThread()) firstLock.unlock();
            if (secondLock.isHeldByCurrentThread()) secondLock.unlock();
            System.out.println(Thread.currentThread().getName() + " Normal end!");
        }
    }
}
After t1 and t2 start, t1 takes lock1 and then requests lock2, while t2 takes lock2 and then requests lock1, so the two threads can easily end up waiting for each other in a deadlock. Because both use lockInterruptibly(), a waiting thread responds to interrupts: on receiving the interrupt signal, t1 stops waiting and releases the lock it has acquired, which lets t2 proceed.
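The interrupt response can also be shown in isolation. In this sketch (InterruptibleWaitDemo is an illustrative name), the calling thread holds the lock, a second thread queues for it with lockInterruptibly(), and the interrupt ends its wait:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleWaitDemo {
    // Returns true if the waiter left lockInterruptibly() via InterruptedException.
    static boolean interruptWaiter() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean wokenByInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                try {
                    // critical section (not reached in this demo)
                } finally {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                wokenByInterrupt.set(true);
            }
        });
        lock.lock(); // the caller holds the lock, so the waiter has to queue
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter is really queued
            }
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        } finally {
            lock.unlock();
        }
        return wokenByInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptWaiter()); // prints true
    }
}
```

A thread blocked in plain lock() would instead keep waiting and only have its interrupt flag set once it finally acquired the lock.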
Example 2:
static ReentrantLock lock = new ReentrantLock();

@Override
public void run() {
    try {
        if (lock.tryLock(5, TimeUnit.SECONDS)) {
            Thread.sleep(6000);
        } else {
            System.out.println("get lock failed");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
boolean tryLock(long time, TimeUnit unit) takes two arguments: the wait time and its unit. Here it is set to 5 seconds, so the thread waits at most 5 seconds for the lock, and the call returns false if the lock is still unavailable after that. Because the thread that gets the lock holds it for 6 seconds, a second thread running the same code at the same time fails to acquire it.
tryLock() without arguments is used in a similar way: the current thread attempts to acquire the lock and, if another thread holds it, the call returns false immediately.
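Both variants can be checked with a short sketch. The class name TryLockDemo and the 100 ms timeout are arbitrary choices made for illustration:

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    // Returns {tryLock() while held elsewhere, tryLock(100 ms) while held elsewhere,
    //          tryLock() once the lock is free}.
    static boolean[] probe() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[3];
        lock.lock(); // the caller holds the lock while the prober runs
        try {
            Thread prober = new Thread(() -> {
                result[0] = lock.tryLock(); // returns at once
                try {
                    result[1] = lock.tryLock(100, TimeUnit.MILLISECONDS); // gives up after the timeout
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            prober.start();
            prober.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        } finally {
            lock.unlock();
        }
        result[2] = lock.tryLock(); // the lock is free now
        if (result[2]) {
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probe())); // prints [false, false, true]
    }
}
```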
Fair lock
ReentrantLock lets you choose whether the lock is fair. It has the following constructor:
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
When fair is true, the lock is fair: under contention it favors the thread that has been waiting longest. Fairness sounds attractive, but maintaining that ordering costs performance, so use a fair lock only when there is a specific need.
public static ReentrantLock lock = new ReentrantLock(true);

@Override
public void run() {
    while (true) {
        lock.lock(); // acquire before try so that unlock() runs only if the lock is held
        try {
            System.out.println(Thread.currentThread().getName() + " Get the lock");
        } finally {
            lock.unlock();
        }
    }
}

Result:
Thread-0 Get the lock
Thread-1 Get the lock
Thread-0 Get the lock
Thread-1 Get the lock
...
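Whether a given lock was created as fair can be checked at runtime with isFair(). A tiny sketch (FairnessFlagDemo is an illustrative name) showing that the no-argument constructor yields a nonfair lock:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class FairnessFlagDemo {
    // Returns {isFair() of a default lock, isFair() of a lock created with fair = true}.
    static boolean[] fairnessFlags() {
        ReentrantLock defaultLock = new ReentrantLock();   // nonfair by default
        ReentrantLock fairLock = new ReentrantLock(true);  // explicitly fair
        return new boolean[] {defaultLock.isFair(), fairLock.isFair()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fairnessFlags())); // prints [false, true]
    }
}
```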
Condition
Implementing a blocking queue:
public class BlockQueue<T> {
    private final int size;
    final List<T> elements = new ArrayList<>();
    final Lock lock = new ReentrantLock();
    // Condition variable: queue is not full
    final Condition notFull = lock.newCondition();
    // Condition variable: queue is not empty
    final Condition notEmpty = lock.newCondition();

    public BlockQueue() {
        this.size = Integer.MAX_VALUE;
    }

    public BlockQueue(int size) {
        this.size = size;
    }

    // Enqueue
    void enq(T x) throws InterruptedException {
        lock.lock();
        try {
            while (elements.size() >= size) {
                // Queue is full: wait until it is not full
                notFull.await();
            }
            elements.add(x);
            // An element was added, so a waiting dequeuer can proceed
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Dequeue
    T deq() throws InterruptedException {
        lock.lock();
        try {
            while (elements.isEmpty()) {
                // Queue is empty: wait until it is not empty
                notEmpty.await();
            }
            T t = elements.remove(0);
            // An element was removed, so a waiting enqueuer can proceed
            notFull.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
}
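Waiting and notifying are done through await(), signal(), and signalAll(), which have the same semantics as wait(), notify(), and notifyAll(). The two sets must not be mixed, however: await() and signal() belong to a Condition created from a Lock, while wait() and notify() belong to the monitor used by synchronized.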
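As with wait() and notify(), the calling thread must own the associated lock. The following sketch (ConditionOwnershipDemo is an illustrative name) shows that a Condition obtained from a ReentrantLock rejects signal() from a thread that does not hold the lock:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionOwnershipDemo {
    // Returns true if signal() without holding the lock is rejected.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        try {
            ready.signal(); // the lock is not held here
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Returns true if signal() completes normally while the lock is held.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        lock.lock();
        try {
            ready.signal(); // no waiters, so this is a no-op
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockFails()); // prints true
        System.out.println(signalWithLockSucceeds()); // prints true
    }
}
```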
ReentrantLock implementation and AQS source reading
AbstractQueuedSynchronizer (AQS) is a class you cannot avoid when analyzing the java.util.concurrent package. It is the foundation on which the package's synchronization tools are built, including ReentrantLock, CountDownLatch, Semaphore, and (in older JDK versions) FutureTask.
Starting from the ReentrantLock class, let's look at how AbstractQueuedSynchronizer works.
ReentrantLock manages the lock through an internal Sync class, which inherits from AbstractQueuedSynchronizer.
abstract static class Sync extends AbstractQueuedSynchronizer {}
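To make the division of labor concrete, here is a minimal sketch of a non-reentrant mutex built in the same style: the subclass only defines how state changes, and AQS supplies queueing and blocking. SimpleMutex is an illustrative class, not part of the JDK and not how ReentrantLock itself is written.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // state 0 = free, 1 = held; no reentrancy in this sketch
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }            // blocks via the AQS wait queue
    public boolean tryLock() { return sync.tryAcquire(1); }
    public void unlock() { sync.release(1); }          // wakes a queued successor, if any
    public boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.tryLock());  // prints false: the mutex is not reentrant
        mutex.unlock();
        System.out.println(mutex.isLocked()); // prints false
    }
}
```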
AQS structure
// Head of the wait queue, lazily initialized. Apart from initialization, it is modified only through setHead()
// If head exists, its waitStatus is guaranteed not to be CANCELLED
private transient volatile Node head;
// Tail of the wait queue, lazily initialized. Modified only through enq() when a new wait node is added
private transient volatile Node tail;
// Lock state: 0 means unlocked, greater than 0 means locked
// Each reentrant acquisition adds 1 and each release subtracts 1
private volatile int state;
// The thread currently holding the lock (inherited from AbstractOwnableSynchronizer)
private transient Thread exclusiveOwnerThread;
Every thread waiting in the queue is wrapped in a Node, and the queue is a doubly linked list. The Node structure is as follows:
// Possible values of waitStatus:
// SIGNAL = -1     the successor of this node needs to be woken up
// CANCELLED = 1   the node was cancelled because of a timeout or an interrupt
// CONDITION = -2  the node is waiting in a condition queue
// PROPAGATE = -3  used in shared mode to propagate a release
// 0               initial value (also set when the status is cleared on release)
volatile int waitStatus;
// Predecessor node
volatile Node prev;
// Successor node
volatile Node next;
// The thread wrapped by this node
volatile Thread thread;
// Next node waiting in the condition queue
Node nextWaiter;
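The wait queue is internal, but ReentrantLock exposes monitoring methods such as getQueueLength(), which is documented as an estimate. In the sketch below (WaitQueueDemo is an illustrative name) the count is stable because the caller holds the lock the whole time and no waiter can leave the queue:

```java
import java.util.concurrent.locks.ReentrantLock;

public class WaitQueueDemo {
    // Starts `waiters` threads that block on the lock and returns the queue
    // length observed while the caller still holds the lock.
    static int queuedWhileHeld(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // parks in the AQS wait queue
                    lock.unlock();
                });
                threads[i].start();
            }
            while (lock.getQueueLength() < waiters) {
                Thread.yield();    // wait until every waiter is queued
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();         // lets the queued threads through one by one
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("demo interrupted", e);
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileHeld(3)); // prints 3
    }
}
```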
Those are the AQS data structures. With them in mind, let's look at how ReentrantLock is implemented internally, using the nonfair lock as an example.
Acquiring the lock
// The analysis starts with the two methods that NonfairSync provides
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    // Acquire the lock
    final void lock() {
        // Try to grab the lock with a CAS first; on success, record the owner and return
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // Inherited from AbstractQueuedSynchronizer: checks whether the lock
            // has been released in the meantime and, if so, tries the CAS again
            acquire(1);
    }

    // Nonfair attempt to acquire the lock
    protected final boolean tryAcquire(int acquires) {
        // Delegate to Sync.nonfairTryAcquire()
        return nonfairTryAcquire(acquires);
    }
}

// AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    // Call tryAcquire(1) first; if it succeeds, we are done
    // If tryAcquire fails, add the thread to the wait queue and suspend it
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// tryAcquire dispatches to NonfairSync.tryAcquire(), which calls:
// Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // state == 0: no thread currently holds the lock
    if (c == 0) {
        // Grab the lock with a CAS; failure means another thread grabbed it at the same time
        if (compareAndSetState(0, acquires)) {
            // Lock grabbed: record the current thread as the owner
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The current thread already holds the lock: this is a reentrant acquisition, so state += acquires
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // The lock was not acquired
    return false;
}

// Suppose nonfairTryAcquire returns false; acquire() then evaluates
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg), which starts with addWaiter(Node.EXCLUSIVE)
// AbstractQueuedSynchronizer.addWaiter()
// Node.EXCLUSIVE indicates exclusive mode
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // The code below appends the new node to the tail of the wait queue
    Node pred = tail;
    // If the queue already has a tail
    if (pred != null) {
        // Link the new node's prev to the current tail
        node.prev = pred;
        // CAS the new node in as the tail; on success, tail == node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // Two ways to get here:
    // 1. pred == null (the queue is not initialized yet)
    // 2. The CAS on the tail failed (another thread enqueued first)
    // In both cases AQS falls back to enq(), which spins until the node is enqueued
    enq(node);
    return node;
}

// AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 1. head and tail start out null, so the queue is first initialized with an empty head node
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 2. CAS the node in as the new tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// The new node is now in the wait queue, and acquireQueued() runs next
// That method suspends the thread and, once it is woken, retries the acquisition
// AbstractQueuedSynchronizer.acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Loop until the lock is acquired
        for (;;) {
            // Get the predecessor of the current node
            // head is not itself a waiter: it is either the empty node created by enq()
            // or the node of the thread that currently holds the lock
            final Node p = node.predecessor();
            // If the predecessor is head and the attempt to acquire the lock succeeds
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Two ways to get here:
            // 1. The predecessor is not head
            // 2. The attempt to grab the lock failed
            // Either way, the thread is suspended once its predecessor is marked SIGNAL
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // failed is still true only if an exception was thrown
        if (failed)
            cancelAcquire(node);
    }
}

// AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // waitStatus == -1 (SIGNAL): the predecessor will wake this node on release,
    // so the current thread can safely be suspended
    if (ws == Node.SIGNAL)
        return true;
    // waitStatus > 0: the predecessor was cancelled because of a timeout or an interrupt
    // Skip over cancelled predecessors until one with waitStatus <= 0 is found
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // Set the predecessor's waitStatus to -1 (SIGNAL) so that it wakes its successor on release
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
// This method is simple: it suspends the thread
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

// This is where the walk through acquire() ends
// Acquiring a nonfair lock breaks down into the following steps:
// 1. Try to grab the lock directly with a CAS; on success, record the current thread as the owner and return
// 2. If the CAS fails, call acquire(1), which checks whether the lock has been released in the meantime
//    and, if so, retries the CAS (a reentrant acquisition just increments state)
// 3. If the lock is still held or the retry fails, the current thread is added to the wait queue and suspended
That is the logic for acquiring the lock. The code is long and may take several readings.
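The state transitions alone can be modeled in a few lines. The toy class below is a simplified sketch, not the JDK code: it mirrors the branch structure of nonfairTryAcquire() and tryRelease() with an AtomicInteger and has no wait queue at all, so a failed attempt simply returns false. ToyReentrantState and its method names are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ToyReentrantState {
    private final AtomicInteger state = new AtomicInteger(0); // 0 = free, n > 0 = held n times
    private volatile Thread owner;

    boolean tryAcquire() {
        Thread current = Thread.currentThread();
        int c = state.get();
        if (c == 0) {
            // Free: race for the lock with a CAS
            if (state.compareAndSet(0, 1)) {
                owner = current;
                return true;
            }
        } else if (current == owner) {
            // Reentrant acquisition: only the owner gets here, so a plain set is safe
            state.set(c + 1);
            return true;
        }
        return false;
    }

    // Returns true only when the last hold is released.
    boolean tryRelease() {
        if (Thread.currentThread() != owner) {
            throw new IllegalMonitorStateException();
        }
        int c = state.get() - 1;
        boolean free = (c == 0);
        if (free) {
            owner = null;
        }
        state.set(c);
        return free;
    }

    int holds() {
        return state.get();
    }

    public static void main(String[] args) {
        ToyReentrantState lock = new ToyReentrantState();
        System.out.println(lock.tryAcquire()); // prints true
        System.out.println(lock.tryAcquire()); // prints true (reentrant)
        System.out.println(lock.holds());      // prints 2
        System.out.println(lock.tryRelease()); // prints false (still held once)
        System.out.println(lock.tryRelease()); // prints true (now free)
    }
}
```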
Releasing the lock
// Unlock
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
    // Try to release. On success, if head is not null and its waitStatus != 0,
    // wake the successor of the head node
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// Sync.tryRelease()
// The release logic is implemented by the subclass Sync
protected final boolean tryRelease(int releases) {
    // state - 1
    int c = getState() - releases;
    // Throw if the current thread does not own the lock
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // When state reaches 0, clear the owner thread and return true
    // Otherwise the lock is still held reentrantly and is not released yet
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

// AbstractQueuedSynchronizer.unparkSuccessor()
// Wake up the successor of the head node
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // Clear the status of the head node
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // If the successor is null or cancelled, scan backward from tail for the
    // non-cancelled node closest to head
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If such a node exists, wake up its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}

// Releasing the lock is much simpler than acquiring it, so it needs no further explanation
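The ownership check in tryRelease() is visible from the public API: calling unlock() from a thread that does not hold the lock throws IllegalMonitorStateException. A small sketch, with UnlockOwnershipDemo as an illustrative name:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class UnlockOwnershipDemo {
    // Returns true if unlock() from a non-owner thread is rejected.
    static boolean foreignUnlockRejected() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean rejected = new AtomicBoolean(false);
        lock.lock(); // owned by the calling thread
        try {
            Thread other = new Thread(() -> {
                try {
                    lock.unlock(); // this thread is not the owner
                } catch (IllegalMonitorStateException e) {
                    rejected.set(true);
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        } finally {
            lock.unlock();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(foreignUnlockRejected()); // prints true
    }
}
```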
Conclusion
Under concurrency, locking and unlocking rely mainly on three things:
- Lock state: state == 0 means the lock is free and can be grabbed; a thread grabs it by using CAS to set state to 1. Each reentrant acquisition adds 1 to state and each release subtracts 1, until state returns to 0 and the lock is free again.
- Thread suspension and wake-up: AQS uses LockSupport.park() to suspend a thread and LockSupport.unpark() to wake it.
- Wait queue: while one thread holds the lock, the other threads have to wait. AQS provides a FIFO linked list for this.
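The suspend and wake-up primitive can be tried on its own. This sketch (ParkUnparkDemo is an illustrative name) parks a thread until a flag is set and wakes it with unpark(); the loop around park() is there because park() is allowed to return without a matching unpark():

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ParkUnparkDemo {
    // Returns true once the waiter has been woken and has finished.
    static boolean wakeWaiter() {
        AtomicBoolean go = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // Re-check the condition after every return from park()
            while (!go.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();
        go.set(true);               // publish the condition first
        LockSupport.unpark(waiter); // then hand the waiter a permit
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(wakeWaiter()); // prints true
    }
}
```

If unpark() runs before the waiter reaches park(), the permit is remembered and the later park() returns immediately, so the wake-up is not lost.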