Main content of this article
Recap
The previous article walked through the execution flow of the AQS source code in detail. Its core ideas are:
- CAS
- Spinning
- LockSupport.park() / unpark()
- A doubly linked wait queue
AQS leaves tryAcquire/tryRelease and tryAcquireShared/tryReleaseShared to concrete subclasses, which implement them according to their own strategy. The queuing logic and the control of acquiring and releasing the lock are implemented in AQS itself.
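To make this division of labor concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. SimpleMutex and SimpleMutexDemo are hypothetical names used only for illustration, not JDK classes. The subclass decides only when state may change; acquire() and release() in AQS handle the queuing, parking, and waking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: state == 0 means unlocked, state == 1 means locked
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        public boolean tryAcquire(int arg) {
            // Strategy: one CAS from 0 to 1; if it fails, AQS queues the caller
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        public boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // AQS wakes the next queued thread after this returns true
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }
    public boolean tryLock() { return sync.tryAcquire(1); }
    public void unlock() { sync.release(1); }
    public boolean isLocked() { return sync.isLocked(); }
}

class SimpleMutexDemo {
    // Increments a shared counter from several threads under the mutex
    static int countWithThreads(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 1000)); // prints 4000
    }
}
```

Because tryAcquire only accepts the transition from 0 to 1, a second tryLock() by the owning thread fails, which is the behavior the reentrant locks below change.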
Classes built on AQS
1, ReentrantLock
- An exclusive lock
- Reentrant
- Supports fair and unfair modes
- If the CAS on state fails, the thread enters the AQS queuing logic
- If the current thread already holds the lock, it re-enters and state is incremented by 1
Usage:
ReentrantLock lock = new ReentrantLock(true); // true selects the fair mode
lock.lock();
try {
    // do something
} finally {
    lock.unlock();
}
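The reentrancy rule (state + 1 on each re-entry) can be observed through getHoldCount(). The following is a small sketch; ReentrancyDemo is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Records the hold count at each nesting level of the same thread
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(); // unfair mode is the default
        int[] seen = new int[4];
        lock.lock();
        try {
            seen[0] = lock.getHoldCount();     // first acquisition
            lock.lock();                       // re-entry by the owning thread
            try {
                seen[1] = lock.getHoldCount(); // incremented on re-entry
            } finally {
                lock.unlock();
            }
            seen[2] = lock.getHoldCount();     // back to the outer level
        } finally {
            lock.unlock();
        }
        seen[3] = lock.getHoldCount();         // fully released
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```

Every lock() must be paired with an unlock(); the lock is only free for other threads once the hold count is back to 0.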
2, ReentrantReadWriteLock
- Provides ReadLock/WriteLock
- Reads are shared; reads and writes are mutually exclusive
- ReadLock.lock() calls acquireShared()
- WriteLock.lock() calls acquire()
- Suitable for read-heavy, write-light scenarios
An example of a cached dictionary:
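import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Data is a placeholder for the cached value type
class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    // Reads take the shared read lock
    public Data get(String key) {
        r.lock();
        try {
            return m.get(key);
        } finally {
            r.unlock();
        }
    }

    public String[] allKeys() {
        r.lock();
        try {
            // toArray() without an argument returns Object[], so pass a typed array
            return m.keySet().toArray(new String[0]);
        } finally {
            r.unlock();
        }
    }

    // Writes take the exclusive write lock
    public Data put(String key, Data value) {
        w.lock();
        try {
            return m.put(key, value);
        } finally {
            w.unlock();
        }
    }

    public void clear() {
        w.lock();
        try {
            m.clear();
        } finally {
            w.unlock();
        }
    }
}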
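The sharing rules can be checked with tryLock() from a second thread. This is a sketch under the assumption that no other threads touch the lock; ReadWriteLockDemo and tryInOtherThread are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLockDemo {
    // Tries the given lock once from a fresh thread and reports whether it succeeded
    static boolean tryInOtherThread(Lock lock) {
        boolean[] ok = new boolean[1];
        Thread t = new Thread(() -> {
            ok[0] = lock.tryLock();
            if (ok[0]) {
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ok[0];
    }

    static boolean[] observe() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        boolean[] result = new boolean[3];

        rwl.readLock().lock();
        try {
            result[0] = tryInOtherThread(rwl.readLock());  // read while read is held
            result[1] = tryInOtherThread(rwl.writeLock()); // write while read is held
        } finally {
            rwl.readLock().unlock();
        }

        rwl.writeLock().lock();
        try {
            result[2] = tryInOtherThread(rwl.readLock());  // read while write is held
        } finally {
            rwl.writeLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [true, false, false]
    }
}
```

Only the read/read combination succeeds, which is why the lock pays off when reads greatly outnumber writes.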
3, CountDownLatch
- A shared lock
- Its core is a counter: state is initialized to count
- countDown() releases the lock: state - 1
- await() tries to acquire the lock; if that fails because state is not yet 0, the thread enters the queue and waits
- Suited to a main thread that waits for every child thread to finish
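The mapping from the bullets above onto the AQS shared-mode hooks can be sketched as follows. SimpleLatch is a hypothetical, simplified class written for illustration; it follows the same idea as CountDownLatch but is not the JDK source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical simplified latch: state holds the remaining count
class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Acquisition succeeds only once the count has reached 0
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            // CAS retry loop: decrement state, signal waiters on the transition to 0
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;
                }
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); // queues and parks while state != 0
    }

    void countDown() {
        sync.releaseShared(1);
    }

    int getCount() {
        return sync.count();
    }
}

class SimpleLatchDemo {
    // Starts n workers that each count down once, then waits for all of them
    static int runWorkers(int n) {
        SimpleLatch latch = new SimpleLatch(n);
        for (int i = 0; i < n; i++) {
            new Thread(latch::countDown).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3)); // prints 0
    }
}
```

The latch is shared rather than exclusive because every waiting thread may pass once the count hits 0.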
Example:
package com.lyqiang.aqs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class CountDownLatchTest {
    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            Student student = new Student(i, countDownLatch);
            new Thread(student).start();
        }
        // Blocks until all five students have counted down
        countDownLatch.await();
        System.out.println("All the students are on the bus. Please fasten your seat belts. The old driver is leaving.");
    }
}

class Student implements Runnable {
    private final int sno;
    private final CountDownLatch countDownLatch;

    public Student(int sno, CountDownLatch countDownLatch) {
        this.sno = sno;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        long prepare = (long) (Math.random() * 1000);
        try {
            TimeUnit.MILLISECONDS.sleep(prepare);
            System.out.println("Student no. " + sno + " got on the bus after " + prepare + " milliseconds.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            countDownLatch.countDown();
        }
    }
}
Output result:
Student no. 2 got on the bus after 476 milliseconds.
Student no. 3 got on the bus after 537 milliseconds.
Student no. 0 got on the bus after 811 milliseconds.
Student no. 1 got on the bus after 918 milliseconds.
Student no. 4 got on the bus after 970 milliseconds.
All the students are on the bus. Please fasten your seat belts. The old driver is leaving.
4, StampedLock
- A read/write lock with version stamps (it does not extend AQS itself, but uses a similar CLH-style queue)
- tryOptimisticRead() returns a version stamp, and validate() checks it
- A reader can first read the data without locking. If the stamp then validates, the values are used as read; if not, the reader falls back to acquiring the read lock
- Optimistic reading avoids taking a lock on every read, which further improves performance
import java.util.concurrent.locks.StampedLock;

public class Point {
    private final StampedLock stampedLock = new StampedLock();
    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // Get the write lock
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // Release the write lock
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // Get an optimistic read stamp
        // Note that the following two reads are not atomic
        // Suppose x,y = (100,200)
        double currentX = x;
        // We have read x=100, but a writer thread may now change x,y to (300,400)
        double currentY = y;
        // If there was no write, the read is correct: (100,200)
        // If there was a write, the read is inconsistent: (100,400)
        if (!stampedLock.validate(stamp)) { // Check whether a write lock was taken after the optimistic read
            stamp = stampedLock.readLock(); // Get a pessimistic read lock
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // Release the pessimistic read lock
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}
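The stamp check itself can be shown in isolation, without a second thread. This is a minimal single-threaded sketch; OptimisticReadDemo is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.concurrent.locks.StampedLock;

class OptimisticReadDemo {
    static boolean[] validateAroundWrite() {
        StampedLock lock = new StampedLock();
        boolean[] result = new boolean[3];

        long stamp = lock.tryOptimisticRead();
        result[0] = lock.validate(stamp);            // no write since the stamp was issued

        long writeStamp = lock.writeLock();
        result[1] = lock.tryOptimisticRead() == 0L;  // 0 is returned while write-locked
        lock.unlockWrite(writeStamp);

        result[2] = lock.validate(stamp);            // a write happened, the old stamp is stale
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(validateAroundWrite())); // prints [true, true, false]
    }
}
```

A stale stamp never validates, so a reader that sees false must discard what it read and retry, for example under readLock() as in distanceFromOrigin().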
5, Semaphore
- A semaphore
- state is initialized to the number of permits, that is, the number of threads allowed to run at the same time
- Acquiring a permit decrements state by 1; releasing one increments state by 1
- Similar in spirit to rate limiting
package com.lyqiang.aqs;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3); // 3 permits
        for (int i = 0; i < 5; i++) {
            Driver driver = new Driver(i, semaphore);
            new Thread(driver).start();
        }
    }
}

class Driver implements Runnable {
    private final int dno;
    private final Semaphore semaphore;

    public Driver(int dno, Semaphore semaphore) {
        this.dno = dno;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        long stay = (long) (Math.random() * 10000);
        System.out.println("Old driver no. " + dno + " drives to the entrance of the parking lot.");
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // No permit was acquired, so there is nothing to release
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println("Old driver no. " + dno + " enters the parking lot.");
            TimeUnit.MILLISECONDS.sleep(stay);
            System.out.println("Old driver no. " + dno + " stayed for " + stay + " milliseconds and left.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }
}
Output result:
The old driver 0 is going to drive to the entrance of the parking lot
The old driver 3 is going to drive to the entrance of the parking lot
The old driver 0 is going to drive to the entrance of the parking lot
The old driver number 1 stayed for 1,346 milliseconds
The old driver number 2 entered the parking lot for 3,446 milliseconds
The old driver number 2 left the parking lot for 7,403 milliseconds
The old driver number 4 left the parking lot for 9,503 milliseconds
The old driver stayed for 6,618 milliseconds and then left
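The permit arithmetic (acquire is state - 1, release is state + 1) can be traced in a single thread with availablePermits(). PermitDemo is a hypothetical class name used for this sketch.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitDemo {
    // Records the number of available permits after each step
    static int[] permitsTrace() {
        Semaphore semaphore = new Semaphore(3);
        int[] trace = new int[5];
        trace[0] = semaphore.availablePermits(); // initial permits

        semaphore.acquireUninterruptibly();      // take one permit
        trace[1] = semaphore.availablePermits();

        semaphore.acquireUninterruptibly(2);     // take the remaining two
        trace[2] = semaphore.availablePermits();

        boolean acquired = semaphore.tryAcquire(); // fails, no permits are left
        trace[3] = semaphore.availablePermits();
        if (acquired) {
            throw new IllegalStateException("unexpected permit");
        }

        semaphore.release();                     // give one permit back
        trace[4] = semaphore.availablePermits();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(permitsTrace())); // prints [3, 2, 0, 0, 1]
    }
}
```

With blocking acquire() instead of tryAcquire(), the fourth caller would be queued until some holder releases, which is what limits the parking lot to three cars at a time.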
Classes built on ReentrantLock
1, CyclicBarrier
- A cyclic (reusable) barrier
- parties is set to the total number of threads
- Each thread that calls await() decrements count by 1 and blocks; no thread proceeds until count reaches 0
- Difference from CountDownLatch:
  - CountDownLatch: the main thread continues only after all the other threads have finished
  - CyclicBarrier: every thread waits until all of them are ready, then each runs its own logic
  - Example: CountDownLatch waits until everyone is on the bus before the bus departs. CyclicBarrier has everyone wait at the starting line until all have arrived, and then they all set off together
package com.lyqiang.aqs;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        // The barrier action runs once, after the last of the 5 parties arrives
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("All the students are here. The old driver is ready to go. Three, two, one.");
            }
        });
        for (int i = 0; i < 5; i++) {
            Student student = new Student(i, cyclicBarrier);
            new Thread(student).start();
        }
    }
}

class Student implements Runnable {
    private final int sno;
    private final CyclicBarrier cyclicBarrier;

    public Student(int sno, CyclicBarrier cyclicBarrier) {
        this.sno = sno;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        long prepare = (long) (Math.random() * 1000);
        try {
            TimeUnit.MILLISECONDS.sleep(prepare);
            System.out.println("Student no. " + sno + " reaches the starting line after " + prepare + " milliseconds.");
            cyclicBarrier.await();
            System.out.println("Student no. " + sno + " steps on the gas and heads out.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } catch (BrokenBarrierException e) {
            // Another party was interrupted or timed out; give up on this round
        }
    }
}
Output result:
Student no. 4 reaches the starting line after 353 milliseconds.
Student no. 0 reaches the starting line after 422 milliseconds.
Student no. 2 reaches the starting line after 669 milliseconds.
Student no. 3 reaches the starting line after 816 milliseconds.
Student no. 1 reaches the starting line after 962 milliseconds.
All the students are here. The old driver is ready to go. Three, two, one.
Student no. 1 steps on the gas and heads out.
Student no. 4 steps on the gas and heads out.
Student no. 0 steps on the gas and heads out.
Student no. 2 steps on the gas and heads out.
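The "cyclic" part means the same barrier can be reused for several rounds, which a CountDownLatch cannot do. The sketch below counts how often the barrier action fires; BarrierReuseDemo and runRounds are hypothetical names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    // Every worker passes the same barrier once per round; returns how often it tripped
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the barrier resets itself after each trip
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party failed; stop participating
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(3, 2)); // prints 2
    }
}
```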
2, CopyOnWriteArrayList
- A copy-on-write ArrayList
- A thread-safe variant of ArrayList
- The add, set, and remove methods copy the underlying array, modify the copy, and then point the array reference at the new array
- ReentrantLock is used in add, set, and remove
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // Make a copy
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

public E set(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        E oldValue = get(elements, index);
        if (oldValue != element) {
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len);
            newElements[index] = element;
            setArray(newElements);
        } else {
            // Not quite a no-op; ensures volatile write semantics
            setArray(elements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

public E remove(int index) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        E oldValue = get(elements, index);
        int numMoved = len - index - 1;
        if (numMoved == 0)
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            Object[] newElements = new Object[len - 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index,
                             numMoved);
            setArray(newElements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}
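One visible consequence of copying on every write is that an iterator works on the array snapshot it started with. The sketch below modifies the list while iterating over it; SnapshotIterationDemo is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationDemo {
    static String iterateWhileAdding() {
        CopyOnWriteArrayList<String> list =
                new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        List<String> seen = new ArrayList<>();
        // The iterator reads the old array; each add() installs a new copy,
        // so no ConcurrentModificationException is thrown
        for (String s : list) {
            seen.add(s);
            list.add(s + "!");
        }
        return seen + " -> " + list;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding()); // prints [a, b] -> [a, b, a!, b!]
    }
}
```

The same loop over a plain ArrayList would fail with ConcurrentModificationException. The price is a full array copy per write, so the class fits lists that are read far more often than they are modified.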
3, CopyOnWriteArraySet
- A copy-on-write Set
- It is backed by a CopyOnWriteArrayList and adds through addIfAbsent, which checks whether the element already exists, so the set never contains duplicates
public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray();
    return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
        addIfAbsent(e, snapshot);
}
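From the caller's side, the duplicate check shows up in the return value of add(). A short sketch, with NoDuplicatesDemo as a hypothetical class name:

```java
import java.util.concurrent.CopyOnWriteArraySet;

class NoDuplicatesDemo {
    static String addTwice() {
        CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
        boolean first = set.add("x");  // element is absent, so it is added
        boolean second = set.add("x"); // element already exists, nothing changes
        set.add("y");
        // Elements keep their insertion order because the backing store is a list
        return first + " " + second + " " + set;
    }

    public static void main(String[] args) {
        System.out.println(addTwice()); // prints true false [x, y]
    }
}
```

Since every add scans the backing array, this set suits small collections that are read much more often than they are written.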
4, ArrayBlockingQueue
- A bounded blocking queue backed by an array
- The capacity is specified at construction time
- Uses the notEmpty and notFull conditions to signal waiting threads
- The offer and put methods lock with ReentrantLock
  - offer: returns false when the queue is full
  - put: waits when the queue is full
- The peek, poll, and take methods also lock with ReentrantLock
  - peek: gets, but does not remove, the head element of the queue; returns null if the queue is empty
  - poll: gets and removes the head element of the queue; returns null if the queue is empty
  - take: gets and removes the head element of the queue, waiting if the queue is empty
notEmpty = lock.newCondition();
notFull = lock.newCondition();
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
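The non-blocking methods can be exercised in one thread to see the full and empty cases. The following is a small sketch with a capacity of 2; BoundedQueueDemo is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

class BoundedQueueDemo {
    static List<Object> trace() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        boolean first = queue.offer(1);
        boolean second = queue.offer(2);
        boolean third = queue.offer(3);   // queue is full, offer does not wait
        Integer peeked = queue.peek();    // head stays in the queue
        Integer polled1 = queue.poll();
        Integer polled2 = queue.poll();
        Integer polled3 = queue.poll();   // queue is empty, poll does not wait
        return Arrays.<Object>asList(first, second, third, peeked, polled1, polled2, polled3);
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints [true, true, false, 1, 1, 2, null]
    }
}
```

In the same two situations put() and take() would block on notFull and notEmpty instead of returning false or null.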
5, LinkedBlockingQueue
- A blocking queue based on a linked list
- It also uses notEmpty and notFull to signal waiting threads
- The logic is broadly similar to ArrayBlockingQueue, so I won't go into detail
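For completeness, here is a minimal producer/consumer sketch that relies on the blocking put() and take() calls. The capacity of 2 is an arbitrary choice to make the producer wait; ProducerConsumerDemo and sumThroughQueue are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ProducerConsumerDemo {
    // Sends 1..n through a small queue and returns the sum seen by the consumer
    static int sumThroughQueue(int n) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // waits while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(10)); // prints 55
    }
}
```

Neither side needs explicit locks or wait/notify; the queue's internal locks and conditions do the coordination.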