“This is my 28th day of participating in the First Challenge 2022. For details: First Challenge 2022.”

The principle of shared acquisition and release of synchronous state in AQS and the simple realization of exclusive lock and shared lock are introduced in detail.

AQS Related articles:

AQS (AbstractQueuedSynchronizer) source depth resolution (1) – design and general structure of AQS

AQS (AbstractQueuedSynchronizer) source depth resolution (2) – Lock interfaces and implementation of custom Lock

AQS (AbstractQueuedSynchronizer) source depth resolution (3) – synchronous queue and exclusive access to the principle of lock, lock is released [ten thousand words]

AQS (AbstractQueuedSynchronizer) source depth resolution (4), the principle of Shared locks, lock is released [ten thousand words]

AQS (AbstractQueuedSynchronizer) source depth resolution (5) – condition queue waiting, the realization of the notification and summary of AQS [ten thousand words]

In the previous article, we introduced synchronous queues and the principle of exclusive lock acquisition and lock release. Now we will take a look at the principle of shared lock acquisition and lock release and how to implement a more advanced custom exclusive and shared lock.

1 acquireShared The shared lock is obtained

The difference between shared and exclusive acquisition is whether multiple threads can acquire the lock at the same time.

The exclusive lock implementation uses an exclusiveOwnerThread attribute to record which thread currently holds the lock. When an exclusive lock is already held by a thread, other threads must wait for it to be released before they can claim the lock, and only one thread can claim the lock at a time.

For shared locks, if one thread succeeds in acquiring the shared lock, then other threads waiting on the shared lock can also attempt to acquire the lock, and will most likely succeed. Components based on shared implementations include CountDownLatch, Semaphore, and so on.

The lock is shared by calling AQS ‘acquireShared template method, which also does not respond to interrupts. In fact, if you understand the source of the exclusive lock, then read the source of the shared lock is very simple.

The steps are as follows:

  1. First use tryAcquireShared to try to obtain the lock. If the lock is obtained successfully (the return value is greater than or equal to 0), the lock is returned directly.
  2. Otherwise, call doAcquireShared to encapsulate the current thread as a Node in Node.shared mode and add it to the end of the AQS synchronization queue, then “spin” to try to get the lock, if not, then use the park method to suspend yourself and wait to be woken up.
/** * Shared access to lock template method, does not respond to interrupt **@param* / arg parameter
public final void acquireShared(int arg) {
    // Try calling the tryAcquireShared method to get the lock
    If the value is greater than or equal to 0, the value is returned.
    if (tryAcquireShared(arg) < 0)
        // Call doAcquireShared to encapsulate the current thread as a Node of node. SHARED type and add it to the end of the AQS queue.
        // Then spin tries to get the synchronization state, and if it still can't, eventually suspends itself using the Park method.
        doAcquireShared(arg);
}
Copy the code

1.1 tryAcquireShared Attempts to obtain the shared lock

The tryAcquireShared method is a subclass of AQS that we implemented to try to get a shared lock, generally for state changes, or reentrant lock checks, etc. Different locks have their own logic. We won’t cover that here, but we’ll cover it later when we talk about specific locks (such as CountDownLatch).

Return a value of type int (such as the remaining state value – the number of resources).

  1. If the return value is less than 0, the current thread fails to share the lock.
  2. If the return value is greater than 0, the current thread shared the lock successfully, and subsequent attempts by other threads to acquire the lock are likely to succeed.
  3. If the return value is 0, the current thread succeeded in sharing the lock, but subsequent attempts by other threads to acquire the lock fail.
    1. In fact, in a real implementation of AQS, even if the return value is zero at one point, subsequent attempts by other threads to acquire the shared lock may succeed. That is, after a thread acquires the lock and returns a value equal to 0, another thread immediately releases the lock, resulting in the actual number of locks available is greater than 0. At this time, successors can still try to acquire the lock.

In AQS, the implementation of tryAcquireShared throws an exception, so subclass override:

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
Copy the code

1.2 doAcquireShared Spins to obtain the shared lock

The doAcquireShared method is called after the first call to the tryAcquireShared method fails to obtain the lock. Similar to a combined version of the addWaiter and acquireQueued methods in the exclusive acquire lock method!

The steps are as follows:

  1. Call the addWaiter method to encapsulate the current thread as a Node in Node.SHARED mode and add it to the end of the AQS synchronization queue.
  2. This is followed by logic similar to the acquireQueued method, where the node spins to try to acquire the shared lock. If you still can’t get it, you end up using the Park method to suspend yourself and wait to be woken up.

The requirement that each node can try to acquire the lock is that the precursor node is the head node, so it is itself the second node in the entire queue, and every node that obtains the lock must be the head node. Therefore, if a second node is suspended because it does not meet the conditions and does not obtain the shared lock, the subsequent nodes must not obtain the shared lock even if they meet the conditions.

/** * The spin attempts to acquire the lock in SHARED mode, which may suspend after a period of time. ** The difference between exclusive and SHARED mode is that ** 1 adds nodes in SHARED mode. * 2 After acquiring the lock, modifies the current header and propagates the information to subsequent Node queues **@param* / arg parameter
private void doAcquireShared(int arg) {
    /*1 the difference between the addWaiter method and the exclusive getTable method 1: Add a Node */ in SHARED mode node.shared
    final Node node = addWaiter(Node.SHARED);
    SetHead propagate method = setheadPropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method = setheadpropagate method
    // The current thread failed to acquire the lock flag
    boolean failed = true;
    try {
        // The interrupt flag of the current thread
        boolean interrupted = false;
        for(; ;) {// Get the precursor node
            final Node p = node.predecessor();
            /* Attempts to acquire locks in shared mode when the precursor is a header */
            if (p == head) {
                int r = tryAcquireShared(arg);
                /* If the return value is greater than or equal to 0, the lock is obtained
                if (r >= 0) {
                    /* And exclusive fetch 2: modify the current header, based on propagation state to determine whether to wake up the successor. * /
                    setHeadAndPropagate(node, r);
                    // Release the precursor that has acquired the lock
                    p.next = null;
                    /* Check to set interrupt flag */
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return; }}/* Determine whether or not to suspend, and the suspended method is exactly the same logic as the acquireQueued method and will not respond to an interrupt */
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

As can be seen from the source code, the main differences with exclusive acquisition are:

  1. AddWaiter adds a Node in SHARED mode node.shared.
  2. After obtaining the lock, call setHeadAndPropagate to set the line head node, and then judge whether to wake up the next node according to the propagation state.

1.2.1 setHeadAndPropagat Sets nodes and propagates information

The setHeadAndPropagat method is called after the node thread succeeds in acquiring the shared lock. Compared to the setHead method, the propagate operation is performed one more step after the head is set:

  1. Set the new head as setHead does
  2. Determine whether to wake up the successor node according to the propagation state.

1.2.1.1 doReleaseShared wakes up the successor node

DoReleaseShared is used to wake up successor nodes in shared mode.

The analysis for PROPAGATE is listed in the summary section below!

/** * The core method of shared lock acquisition, tries to wake up a subsequent thread, the thread that is woken up will try to acquire the shared lock, if successful, it is possible to call setHeadAndPropagate again, propagate the wake up. * an exclusive lock only after a thread to release will wake up the next thread, and Shared in a thread lock in acquiring the lock and release the lock, lock may call this method to wake up the next thread * because in a Shared lock mode, the lock can be jointly owned by multiple threads, now that the current thread has got the Shared lock, You can then notify the successor directly to acquire the lock, rather than waiting for the lock to be released. * /
private void doReleaseShared(a) {
    /* an infinite loop. The condition for breaking out of the loop is break*/
    for(; ;) {// Get the current head. Each loop reads the latest head
        Node h = head;
        // If h is not null and h is not tail, indicating that the queue has at least two nodes, then try to wake up the head successor thread
        if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
            // If the state of the header node is SIGNAL, then the successor node needs to be woken up
            if (ws == Node.SIGNAL) {
                // Try CAS to set the state of h from node. SIGNAL to 0
                // There may be multiple operations, but only one of them will succeed
                if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
                    // The failed thread completes the loop and continues the next loop
                    continue;            // loop to recheck cases
                // The successful thread will call the unparksucceeded method to wake up an uncancelled successor of the head
                // For a head, only one thread is needed to wake up the head's successor. The CAS above is to ensure that the unparksucceeded methods are executed only once for a head
                unparkSuccessor(h);
            }
            /* * If the h state is 0, it means that the thread on the next node is already in the wake state or will be woken up, it is not needed to wake up * then try to set the h state from 0 to PROPAGATE, if it fails, continue the next loop, At this point, the PROPAGATE state is set to ensure that the propagation of the wake up operation can be continued * because the PROPAGATE state of the original head node <0 can be read in the PROPAGATE method when the next node becomes the head node, so that it can try to wake up the next node (if it exists) * */
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                // The failed thread completes the loop and continues the next loop
                continue;                // loop on failed CAS
        }
        Succeeding to this step indicates that in the preceding judgment the queue may have only one node, or the unparksucceeded method has been called, or h state is PROPAGATE(no need to wake up the successor)
        // Check again to see if h is still the latest head. If not, loop again; If so, head has not changed and exits the loop
        if (h == head)                   // loop if head changed
            break; }}Copy the code

2 reaseShared Shared release lock

The shared lock is released by calling the releaseShared template method. The general steps are as follows:

  1. Call tryReleaseShared to try to release the shared lock, which must be thread-safe.
  2. If the lock is released, the doReleaseShared method is called to wake up the successor nodes and propagate the wake.

Support for Shared synchronous components (i.e., multiple threads access) at the same time, the primary difference between them and the exclusive must ensure that the release of the lock is tryReleaseShared method is thread-safe, because now that is more than one thread can access, there will be multiple threads is released, will need to ensure the safety of the release time of threads).

Since we implemented the tryReleaseShared method ourselves, we needed to implement thread-safety ourselves, so CAS is often used to release synchronization state.

/** * Template method for releasing locks in shared mode. *, and */ is called if successfully released
public final boolean releaseShared(int arg) {
    //tryReleaseShared releases lock resources. This method is implemented by subclasses themselves
    if (tryReleaseShared(arg)) {
        // If the release succeeds, doReleaseShared must be called to try to wake up the successor
        doReleaseShared(); 
        return true;
    }
    return false;
}
Copy the code

3 acquireSharedInterruptibly Shared interruptible acquiring a lock

The exclusive lock acquireShared method analyzed above does not respond to interrupts. But AQS provides another acquireSharedInterruptibly template method, call the method of threads waiting for the lock, if the current thread is interrupted, will return immediately, and throws InterruptedException.

/** * Shared interruptible lock template method **@paramArg parameter *@throwsInterruptedException Throws this exception */ if the thread is interrupted
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    If the current thread is interrupted, clear the interrupted state and throw an exception
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to get the lock
    if (tryAcquireShared(arg) < 0)
        / / can't get is executed doAcquireSharedInterruptibly method
        doAcquireSharedInterruptibly(arg);
}
Copy the code

3.1 doAcquireSharedInterruptibly Shared interruptible acquiring a lock

The internal operation of this method is similar to doAcquireShared, with the slight difference that the following suspended thread will be handled differently if it returns because the thread has been interrupted.

Shared uninterruptible lock acquisition simply records this status, interrupted = true, and the lock acquisition cycle continues. Shared interruptible lock acquisition throws an exception directly, so you jump right out of the loop to execute the finally block.

/** * in shared interruptible mode. * *@param* / arg parameter
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    /* The internal operation is similar to doAcquireShared, with a slight difference */
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for(; ;) {final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return; }}if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                If the thread is interrupted, only this status is recorded, interrupted = true, Then the loop continues to acquire the lock * * but in this shared interruptible lock acquisition method * if the thread is interrupted, an exception is thrown directly here, so the finally block * */ breaks out of the loop directly
                throw newInterruptedException(); }}/* The finally block */ is executed when a lock is acquired or an exception is thrown
    finally {
        /* If the lock fails to be obtained. CancelAcquire cancelAcquire cancelAcquire cancelAcquire cancelAcquire cancelAcquire cancelAcquire cancelAcquire
        if(failed) cancelAcquire(node); }}Copy the code

4 tryAcquireSharedNanos Shared Timeout obtaining lock

Shared lock timeout for tryAcquireSharedNanos template method can be regarded as a Shared interrupt response “enhanced”, acquiring the lock acquireSharedInterruptibly method support, support timeout!

/** * Shared timeout lock, support interrupt **@paramArg parameter *@paramNanosTimeout Timeout, nanoseconds *@returnCheck whether the lock is successfully obtained *@throwsInterruptedException Throws an InterruptedException */ if interrupted

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    If the current thread is interrupted, clear the interrupted state and throw an exception
    if (Thread.interrupted())
        throw new InterruptedException();
    / / the following is a | | operation for short circuit connection code
    //tryAcquireShared attempts to obtain the lock, and returns true
    // Execute the doAcquireSharedNanos method if the left expression is false
    return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
}
Copy the code

4.1 doAcquireSharedNanos Shared Timeout Obtain lock

The doAcquireSharedNanos (Int ARG, Long nanosTimeout) method adds the feature of timeout fetch on the basis of supporting interrupt response.

In this method, in the spin process, when the node’s precursor node is the head node, it tries to acquire the lock, and if it succeeds in obtaining the lock, it will return from this method. This process is similar to the process of shared synchronous acquisition, but there are differences in the processing of lock acquisition failure.

If the current thread fails to acquire the lock, determine whether it has timed out (nanosTimeout is less than or equal to 0, indicating that it has timed out), if not, recalculate the timeout interval nanosTimeout, and then make the current thread wait nanosTimeout nanoseconds (when the set timeout has been reached, The thread returns from locksupport. parkNanos(Objectblocker,long Nanos).

If nanosTimeout is less than or equal to the spinForTimeoutThreshold (1000 nanoseconds), the thread is not made to wait out the timeout, but to enter a fast spin process. The reason is that very short timeout waits cannot be very precise, and if timeout waits are made at this point, nanosTimeout timeouts as a whole appear imprecise.

Therefore, in the case of very short timeouts, the AQS will go into unconditional fast spin instead of suspending the thread.

static final long spinForTimeoutThreshold = 1000L;

/** * In shared timeout mode. * *@paramArg parameter *@paramNanosTimeout Remaining timeout, nanoseconds *@returnTrue on success; * or false on failure@throwsInterruptedException Throws an InterruptedException */ if interrupted
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    If the remaining timeout period is less than or equal to 0, the value is returned directly
    if (nanosTimeout <= 0L)
        return false;
    // Can wait to get the last nanosecond time
    final long deadline = System.nanoTime() + nanosTimeout;
    // Also call addWaiter to add the current thread as a node to the end of the synchronization queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        /* As with the shared non-interruptible method doAcquireShared, spin gets the lock */
        for(; ;) {final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true; }}/* This is the difference */
            // If the new remaining timeout is less than 0, exit the loop and return false to indicate that the lock was not acquired
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // If suspend is required and the remaining nanosTimeout is greater than spinForTimeoutThreshold, i.e., greater than 1000 nanoseconds
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // Then call the locksupport. parkNanos method to suspend nanosTimeout from the current thread
                LockSupport.parkNanos(this, nanosTimeout);
            // If the thread is interrupted, throw an exception
            if (Thread.interrupted())
                throw newInterruptedException(); }}/* The finally block */ is executed when a lock is acquired, a timeout expires, and an exception is thrown
    finally {
        /* If the lock fails to be obtained. CancelAcquire () : cancelAcquire () : cancelAcquire () : false * */
        if(failed) cancelAcquire(node); }}Copy the code

5 Summary of shared lock acquisition/release

We can call acquireShared template method to get an interrupt sharing lock, can call acquireSharedInterruptibly template method to interruptible access to a Shared lock, You can call the tryAcquireSharedNanos template method to interrupt and timeout the shared lock. To do so, you need to override the tryAcquireShared method. You can also call the releaseShared template method to release the shared lock, before overriding the tryReleaseShared method.

For shared locks, because locks can be acquired by multiple threads at the same time. If one thread succeeds in acquiring the shared lock, then other threads waiting on the shared lock can also attempt to acquire the lock, and will most likely succeed. Therefore, doReleaseShared must be called to try to wake up subsequent nodes when a node thread successfully releases the shared lock, and doReleaseShared may also be called to try to wake up subsequent nodes when a node thread successfully obtains the shared lock.

Components based on shared implementations include CountDownLatch, Semaphore, ReentrantReadWriteLock, and so on.

5.1. The Node. The PROPAGATE analysis

5.1.1 Timing

The doReleaseShared method may be executed after the thread has successfully acquired the shared lock and must be executed after the thread has successfully released the shared lock.

In the doReleaseShared method, it may be possible to set the thread state to PROPAGATE, however, it is also the only place in the whole AQS class that is directly involved in the PROPAGATE state, and it is only set, the direct use of the state is never seen anywhere else. Since the status value is -3, it is possible that this is included in the determination of the waitStatus size range in other methods (guess)!

The direct code for PROPAGATE is as follows:

else if (ws == 0&&! compareAndSetWaitStatus(h,0, Node.PROPAGATE))
    continue;                // loop on failed CAS
Copy the code

Else if branch needs to be entered first, then ws(which is not necessarily the latest reference in the source code) needs to be 0, then try CAS to set the Node state to PROPAGATE, which may fail. After a failure, simply continue to continue the next loop.

For the effect of the Node.PROPAGATE state, there are different opinions, I have read a lot of articles, many of them seem to be reasonable, but there are some mistakes after careful thinking, here, I don’t do too much personal analysis, first let’s look at the else if branch and WS is 0 what kind of case!

The initial conditions

Suppose that the implementation of A shared lock allows A maximum of three threads to hold the lock, at this time, threads A, B and C have acquired the lock, and there is A suspended node thread D in the synchronization queue waiting for the release of the lock, and the queue structure is as follows:

If thread A then releases the lock, A will call the doReleaseShared method, but obviously A will go into the if block, change the state of the head to 0 and call the unparksucceeded thread, in this case obviously D.

At this point, the synchronization queue structure is:

Case 1

If thread B and thread C both release the lock, then both thread B and thread C will call the doReleaseShared method, assuming they are executing at the same speed, then they will both go into the else if, because the head state is now 0, then they will both call CAS to change the 0 to PROPAGATE, Only one thread will succeed and the other will fail.

So that’s a case where when you release the lock, you go else if. If multiple lock-releasing nodes operate the same head, only one of them can eventually call the unparksucceeded in if, the others will all fail and eventually go to the else IF. In the same way, else if may be entered when acquiring the lock for the above reasons.

Case 2

If a new node E is added, the addWaiter will be added to D as a new tail node since the lock has not been acquired.

Then the Node will try in shouldParkAfterFailedAcquire method E will not cancel the precursors of D waitStatus modification for the Node. SIGNAL, and then hang up.

Executed addWaiter, so in the new node E shouldParkAfterFailedAcquire before, this time synchronization queue structure as follows:

Since A has released the lock, thread D will wake up and call tryAcquireShared to acquire the lock, which will return 0.

At this point, if B releases the lock again, it will be “even if at some point the return value is equal to 0, the subsequent attempts by other threads to acquire the shared lock may be successful”. That is, after a thread acquires the shared lock and returns a value equal to 0, another thread holding the lock immediately releases the lock, resulting in the number of locks that can be acquired is greater than 0. In this case, the subsequent thread can still try to acquire the lock.

That’s a digression. Let’s get back to the text. If B releases the lock at this point, then the doReleaseShared method must still be used, because in the initial case, the state of head has been changed to 0 by A, then B will still use else if to change the state to node.propagate.

At this time, thread D will go to the setHeadAndPropagate method after acquiring the lock. After the sheHead method is called, the structure of this time is as follows (suppose thread E is inefficient during this period due to resource allocation and has not changed the state of the precursor D to -1, Or thread E has not been allocated a time slice due to a single-core CPU thread switch) :

After sheHead, it will determine whether it is necessary to call the doReleaseShared method to wake up subsequent threads.

propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || 
h.waitStatus < 0
Copy the code

According to the structure, only the third condition H.waitStatus <0 is met, at which point thread D can call doReleaseShared to wake up the subsequent Node, in this process, the key is that thread B sets the state of the old head to node.propagate, that is, -2, less than 0, At this point, the wake up can be propagated, otherwise the awakened thread A will not call the doReleaseShared method because it does not meet the condition!

Perhaps this is one of the cases in which the so-called Node.propagate might PROPAGATE the awakening?

However, when the thread D that obtains the lock at this time calls the doReleaseShared method, the head state is already 0, so the else if is directly entered to change the state to node.propagate, which means that at this time the subsequent Node does not need to wake up, but the wake operation needs to continue to PROPAGATE.

This is also the case where the doReleaseShared method goes directly to else if for the first time as head when acquiring the lock.

Case 3

Since A releases the lock, if D acquires the lock and the method completes, then the synchronization queue structure is as follows:

Now there is a new node E, and since there is no lock, we call addWaiter to add the new tail node after the head node.

Then Node will try in shouldParkAfterFailedAcquire method E will not cancel the precursors of head waitStatus modification for the Node. The SIGNAL, and then hang up.

Executed addWaiter, so in the new node E shouldParkAfterFailedAcquire before, this time synchronization queue structure as follows:

At this time, thread A tries to release the lock, the doReleaseShared method will be called after the lock is successfully released. At this time, the head state is already 0, so the else if is directly entered to change the state to node.propagate, which means that the subsequent Node does not need to wake up at this time. But the wake up operation needs to be propagated further.

This is also the case where the doReleaseShared method first goes to else if as head when the lock is released.

5.1.2 summary

Here are a few cases where else if can occur, and there may be more:

  1. Multiple threads concurrently operate on the same head in the doReleaseShared method, and the head does not change during that time. Then the first thread could succeed in setting the if to 0 and then wake up by calling the unparksucceeded thread. The next thread, because its state was 0, could only do the else IF. This can happen for either the doReleaseShared method that acquires the lock or releases the lock! When this happens, the first time a doReleaseShared method uses a node as its head, the else if is not entered. It must only be entered when subsequent threads use the same node as their head.
  2. For the doReleaseShared method that acquires the lock, there is a case in which the first node in the doReleaseShared method goes directly to the else if as head. Let D be the tail of the old queue with a state of 0, and then a new node E comes along, after the thread of the new node E calls addWaiter, ShouldParkAfterFailedAcquire before (before modify the state of the precursor D to 1) the scope of this special time, capturing the thread lock of the node D and now became the new head node, and the former head of state of node value less than zero, This is where doReleaseShared is called when the lock is acquired and else if is directly entered, which is extremely demanding. Maybe it doesn’t exist, but what’s wrong with my analysis?
  3. For the doReleaseShared method that releases the lock, there is a case where the node (a) is the head of the doReleaseShared method and goes directly to the else if. Node D is set as the tail of the original queue, and the status value is 0 and the lock has been obtained. E, then to new nodes in new node E thread calls addWaiter (join the queue became the new tail), after shouldParkAfterFailedAcquire before (before modify the state of the precursor D to 1) the scope of this special time, the node D thread to release the lock, This is where doReleaseShared is called when the lock is released and else If is directly entered, which is extremely demanding. Maybe it doesn’t exist, but what’s wrong with my analysis?

So according to the above situation, even if there is no else if judgment or if there is no setting of the Node.PROPAGATE state, there is no major problem for the wake up of the following nodes eventually, and it will not cause the queue inactivation.

With the Node.PROPAGATE state setting, the immediate result may be an increase in the number of doReleaseShared method calls, but also an increase in the number of invalid, meaningless wakes. In the setHeadAndPropagate method, we can find the description in the source comment for determining whether the propagate method needs to wake up the next generation:

The conservatism in both of these checks may cause unnecessary wake-ups, but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.

That is, these judgments might result in meaningless awakenings, but if the doReleaseShared method is called a lot, it’s like multiple threads trying to wake up subsequent threads, which might speed up lock acquisition? Or is the code here just a more general way to ensure correctness? In fact, there is a lot of code in AQS that might cause meaningless calls!

6 simple implementation of lock

6.1 Implementation of reentrant exclusive lock

In the beginning we implemented a simple non-reentrant exclusive lock, now we try to implement a reentrant exclusive lock, which is actually relatively simple!

The state value of AQS indicates how many times the thread has reentered the lock. By default, a state value of 0 indicates that the current lock is not held by any thread. When a thread acquires the lock for the first time, it attempts to use CAS to set state to L. If CAS succeeds, the current thread acquires the lock, and records that the holder of the lock is the current thread. After the thread acquires the lock a second time without releasing it, the status value is set to 2, which means the reentrant count is 2. When the thread releases the lock, it tries to use CAS to decrease the state value by 1, and if the state value is 0 by l, the current thread releases the lock.

In the case of a reentrant exclusive lock, a lock acquired several times needs to be released several times, otherwise other threads will block due to incomplete lock release!

/ * * *@author lx
 */
public class ReentrantExclusiveLock implements Lock {
    /** * combine the implementation of AQS inside the implementation of lock */
    private static class Sync extends AbstractQueuedSynchronizer {
        /** * Rewrite the isHeldExclusively method **@returnWhether the lock is occupied */
        @Override
        protected boolean isHeldExclusively(a) {
            // State is equal to 1
            return getState() == 1;
        }

        /** * Overwrite the tryAcquire method, reentrant attempts to acquire the lock **@paramAcquires parameter, we don't use * here@returnReturns true on success, false */ on failure
        @Override
        public boolean tryAcquire(int acquires) {
            /* Try to get the lock */
            if (compareAndSetState(0.1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            /* Failed to acquire lock, check whether the current thread is the same thread */
            else if (getExclusiveOwnerThread() == Thread.currentThread()) {
                // If so, state+1 indicates that the lock is reentrant
                setState(getState() + 1);
                return true;
            }
            return false;
        }

        /** * Override the tryRelease method to reentrant an attempt to release the lock **@paramSo the releases parameter, we're not using * here@returnReturn true on success, false */ on failure
        @Override
        protected boolean tryRelease(int releases) {
            // If the thread trying to unlock is not the one holding the lock, an exception is thrown
            if(Thread.currentThread() ! = getExclusiveOwnerThread()) {throw new IllegalMonitorStateException();
            }
            boolean flag = false;
            int oldState = getState();
            int newState = oldState - 1;
            // If state becomes 0, set the thread that currently has exclusive access to null and return true
            if (newState == 0) {
                setExclusiveOwnerThread(null);
                flag = true;
            }
            // Reentrant lock release, release state once minus 1
            setState(newState);
            return flag;
        }

        /** * returns a Condition, each of which contains a Condition queue * for the thread to actively wait and wake up on the specified Condition queue **@returnReturns a new ConditionObject */ each call
        Condition newCondition(a) {
            return newConditionObject(); }}/** * Simply delegate the action to the Sync instance */
    private final Sync sync = new Sync();

    /** * lock interface lock method */
    @Override
    public void lock(a) {
        sync.acquire(1);
    }

    /** * the tryLock method of the lock interface */
    @Override
    public boolean tryLock(a) {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    @Override
    public void lockInterruptibly(a) throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /** * Lock interface unlock method */
    @Override
    public void unlock(a) {
        sync.release(1);
    }

    /** * newCondition of the lock interface */
    @Override
    public Condition newCondition(a) {
        return sync.newCondition();
    }

    public boolean isLocked(a) {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads(a) {
        returnsync.hasQueuedThreads(); }}Copy the code

6.1.1 test

/ * * *@author lx
 */
public class ReentrantExclusiveLockTest {
    /** * create lock */
    static ReentrantExclusiveLock reentrantExclusiveLock = new ReentrantExclusiveLock();
    /** * increment */
    static int i;

    public static void main(String[] args) throws InterruptedException {
        // Three threads
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3.3.1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
        Runa runa = new Runa();
        for (int i1 = 0; i1 < 3; i1++) {
            threadPoolExecutor.execute(runa);
        }
        threadPoolExecutor.shutdown();
        while(! threadPoolExecutor.isTerminated()) { }// Three threads execute, output the final result
        System.out.println(i);

    }


    /** * thread task, loop 50000 times, each time I increment by 1 */
    public static class Runa implements Runnable {
        @Override
        public void run(a) {
            // Incorrect results may be obtained when commenting lock and unlock
            // It will get the correct result every time it is turned on
            // Support multiple lock acquisition (reentrant)
            reentrantExclusiveLock.lock();
            reentrantExclusiveLock.lock();
            for (int i1 = 0; i1 < 50000; i1++) {
                i++;
            }
            // How many times must be freedreentrantExclusiveLock.unlock(); reentrantExclusiveLock.unlock(); }}}Copy the code

6.2 Implementation of reentrant shared lock

You can define a shared lock and specify the number of shared locks. By default, a maximum of three threads are allowed to acquire the lock at any one time, and access from more than three threads will be blocked.

We must override the tryAcquireShared(int Args) and tryReleaseShared(int Args) methods. Since it is a shared access, CAS method compareAndSet(int expect,int update) should be used in both methods to ensure atomicity when updating the synchronous state.

Assuming that a thread only needs to acquire one resource at a time, the lock is acquired. Since at most three threads are allowed to access at the same time, the number of synchronized resources is 3. In this way, the initial state can be set to 3 to represent synchronized resources. When a thread acquires, status decreases by 1, and the thread is released, status increases by 1, and the legal range of status is 0, 1 and 2. 0 indicates that two threads have already obtained the synchronization resource. If another thread attempts to obtain the synchronization state, the thread may block.

Finally, the custom AQS implementation is aggregated into the custom Lock through the method of the inner class. The custom Lock also needs to implement the Lock interface, and the internal implementation of the external method directly calls the corresponding template method.

A thread can acquire multiple shared locks, but must release multiple shared locks at the same time, otherwise it may be inefficient or even deadlocked due to reduced lock resources (which can be avoided by using tryLock)!

public class ShareLock implements Lock {
    /** * Default constructor, default shared resource 3 */
    public ShareLock(a) {
        sync = new Sync(3);
    }

    /** * specifies the number of resources of the constructor */
    public ShareLock(int num) {
        sync = new Sync(num);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        Sync(int num) {
            if (num <= 0) {
                throw new RuntimeException("Lock resources need to be greater than 0");
            }
            setState(num);
        }

        /** * Rewrite tryAcquireShared to get the shared lock */
        @Override
        protected int tryAcquireShared(int arg) {
            /* General idea */
            Int currentState = getState(); Int newState = currentState - arg; Return newState < 0? Return newState < 0? newState : compareAndSetState(currentState, newState) ? newState : -1; * /

            /* Better idea * In the above implementation, if the remaining state value is greater than 0, then the CAS is only tried once, and if it fails, the lock is not acquired, at which point the thread enters the synchronization queue * in the following implementation, if the remaining state value is greater than 0, then the CAS update attempt fails, and the for loop is retried. Surplus until the state value less than zero or updated * * the difference between two methods, whether to retry to CAS operation, here suggest that the second * because there may be multiple threads at the same time get more lock, but because of CAS can only be guaranteed success only one thread at a time, so the other threads must lose * but at the moment, In fact, there are some remaining locks that have not been acquired, so it is more efficient to let another thread try again than to join the synchronized queue directly! * * /
            for(; ;) {int currentState = getState();
                int newState = currentState - arg;
                if (newState < 0 || compareAndSetState(currentState, newState)) {
                    returnnewState; }}}/** * Rewrite tryReleaseShared to release the shared lock **@paramArg parameter *@returnReturns true on success and false */ on failure
        @Override
        protected boolean tryReleaseShared(int arg) {
            // It can only succeed
            for(; ;) {int currentState = getState();
                int newState = currentState + arg;
                if (compareAndSetState(currentState, newState)) {
                    return true; }}}}/** * internally initializes a sync object and then simply proxy the action to the sync object */
    private final Sync sync;

    /* The following are the calls to the template methods */
    
    @Override
    public void lock(a) {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly(a) throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock(a) {
        return sync.tryAcquireShared(1) > =0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock(a) {
        sync.releaseShared(1);
    }


    / * * *@returnThe custom Condition is not implemented, and relying solely on the original Condition implementation does not support shared locking */
    @Override
    public Condition newCondition(a) {
        throw newUnsupportedOperationException(); }}Copy the code

6.2.1 test

public class ShareLockTest {

    static final ShareLock lock = new ShareLock();

    public static void main(String[] args) {
        /* Start 10 threads */
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        ShareLockTest.sleep(20);
    }

    /** * Sleep **@paramSeconds Specifies the time, in seconds */
    public static void sleep(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch(InterruptedException e) { e.printStackTrace(); }}static class Worker extends Thread {
        @Override
        public void run(a) {
            /* If you start with a few locks, you must eventually release a few locks. Otherwise, you may lose lock resources, resulting in inefficiencies and deadlocks (this can be avoided by using tryLock)! * * /
            while (true) {
                / * * / tryLock test
                if (lock.tryLock()) {
                    System.out.println(Thread.currentThread().getName());
                    /* If the lock is acquired, the console will sleep for 2 seconds, then wait 2 seconds, then output 3 seconds, then 2 seconds... * /
                    ShareLockTest.sleep(2);
                    lock.unlock();
                }

                /* Lock test, there may always be a fixed thread to acquire the lock, because AQS default implementation is unfair lock */
                /*lock.lock(); System.out.println(Thread.currentThread().getName()); ShareLockTest.sleep(2); lock.unlock(); * /}}}}Copy the code

If you don’t understand or need to communicate, you can leave a message. In addition, I hope to like, collect, pay attention to, I will continue to update a variety of Java learning blog!