1, the introduction of
AbstractQueuedSynchronizer is the cornerstone of the Java synchronizer, in order to faster, more worry custom template of synchronization lock; This also is introduced CountDownLatch, Semaphore, they also realize AbstractQueuedSynchronizer, I think it is a kind of logical lock, or a condition of synchronization between threads measures; CyclicBarrier is also an application of locks, which helps us to better understand and use locks and build a locking mechanism suitable for our own application
2, Semaphore
In simple terms, it allows a finite number of thread resources at the same time.
2.1 Usage Mode
Method of obtaining resources
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
Copy the code
Resource Release method
public void release() {
sync.releaseShared(1);
}
Copy the code
2.2 Source code analysis:
Set the value of state during initialization
Sync(int permits) {
setState(permits);
}
Copy the code
Release the lock, at this time only to verify that the value after setting exceeds the limit value, if not, set
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true; }}Copy the code
Not fair model for locks: to test the current state value if greater than or equal to 0, then acquiring a lock and reset the state, or queued (see why queuing hangs, AbstractQueuedSynchronizer)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
returnremaining; }}Copy the code
Fair mode lock acquisition: Fair is based on the first come first served, if there is data in the queue, queue; otherwise, if the current status value is greater than or equal to 0, lock acquisition and reset state; if it is less than 0, queue waiting
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return- 1; int available = getState(); int remaining = available - acquires;if (remaining < 0 ||
compareAndSetState(available, remaining))
returnremaining; }}Copy the code
2.3 Source Code Summary
The state value of AQS is used as the threshold value to control whether the lock can be obtained
3 CountDownLatch
In simple terms, when the condition (state == 0) is met, either the thread waiting for the queue or the thread trying to acquire the lock can be executed, that is, state == 0 is used as the threshold to control whether the resource is open
3.1 Usage
Use method is not used in pairs, need to obtain the lock thread, to try to obtain the lock on the line, and it can perform the conditions met, in the need for logic processing; When the conditions are met, the lock opens naturally
Call this method for threads that require conditions to proceed
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
Copy the code
Subtracting state by 1 in other logic,
public void countDown() {
sync.releaseShared(1);
}
Copy the code
3.2 Synchronizer implementation
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return(getState() == 0) ? 1:1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zerofor (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
returnnextc == 0; }}}Copy the code
- The initial value of state is passed through the constructor
- If state == 0, you have the execution permission
- If the value of state -1 is 0, it needs to be released. Otherwise, no processing is performed
3.3 Principles
- The initial state must be greater than 0 or the waiting thread will never execute
- The thread can execute only when state == 0
- State == 0, state == 0, state == 0
- Thread nodes in the queue are woken up when state is reset to 0
4, CyclicBarrier
If the current attempt to obtain the conditional lock and the number of lock waiting queue meet the requirements, an additional task can be performed, and the original waiting thread will be released soon. If the extra task is empty, perhaps the thread executes the same logic as the normal ReentrantLock, otherwise the process repeats; You can use the reset method to terminate a process in a loop and restart a new loop
4.1 the use of
Initialize the
Number of waiting threads, and the additional tasks to be performed after the number is satisfied
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
Copy the code
Wait or extra tasks perform logic
Call power to wait for queuing
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
Copy the code
4.2 Source Code Analysis
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command! = null) command.run(); ranAction =true;
nextGeneration();
return 0;
} finally {
if(! ranAction)breakBarrier(); }}for (;;) {
try {
if(! timed) trip.await();else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else{ Thread.currentThread().interrupt(); }}if (g.broken)
throw new BrokenBarrierException();
if(g ! = generation)return index;
if (timed && nanos <= 0L) {
breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}Copy the code
- Method uses ReentrantLock internally for queuing
- If the number of threads queuing up, which is count–, count– if count is zero, then you do extra work
- If the extra task is empty, the breakBarrier method, which removes the current limit, is used to perform additional tasks. Otherwise, the nextGeneration method repeats the process for the next wave of threads
- If the number of queues is not reached, suspend and wait to wake up (breakBarrier, nextGeneration, reset methods).
- If the number of queues reaches the threshold, the value is returned directly
- The above points are just waiting without a timeout limit; If there is a timeout limit on the wait, if the time is exceeded, the thread will come out of the queue and continue
Reset method: Interrupts the current process and then repeats it
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // breakthe current generation nextGeneration(); // start a new generation } finally { lock.unlock(); }}Copy the code
4.3 Principle Summary
- Multiple threads are teamed up to perform additional tasks
- If the extra task is empty, there is only one loop
- Not empty, multiple tasks, one extra task mode always loop
Technology changes quickly, but the basic technology, the theoretical knowledge is always the same; The author hopes to share the basic knowledge of common technical points in the rest of his life. If you think the article is well written, please pay attention and like it. If there are mistakes in the article, please give me more advice!