The earlier analysis of the AQS source code covered only the principle of the exclusive lock.

It just so happens that we can use Semaphore to analyze shared locks.

How to use Semaphore

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {

  public static void main(String[] args) {

    // Create a semaphore with 3 permits (shared locks)
    Semaphore sp = new Semaphore(3);

    for (int i = 0; i < 5; i++) {
      new Thread(() -> {
        try {

          // Acquire a permit (the shared lock)
          sp.acquire();

          String threadName = Thread.currentThread().getName();

          // Access the API
          System.out.println(threadName + " got permission to access the API. Remaining permits: " + sp.availablePermits());

          TimeUnit.SECONDS.sleep(1);

          // Release the permit (the shared lock)
          sp.release();

          System.out.println(threadName + " released its permit. Currently available permits: " + sp.availablePermits());

        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }, "thread-" + (i + 1)).start();
    }
  }
}

The Java SDK already provides Lock, so why do we need Semaphore? Implementing a mutex is only part of what Semaphore can do. It has another capability that Lock cannot easily provide: Semaphore allows multiple threads to enter a critical section at the same time.

A common use case is pooled resources such as connection pools, object pools, and thread pools. The one you are probably most familiar with is the database connection pool, which must be usable by multiple threads at the same time. Of course, a given connection cannot be used by other threads until it is released.

For example, the above code demonstrates that only three threads are allowed to access the API at a time.
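To make the pooled-resource idea concrete, here is a minimal sketch of an object pool guarded by a Semaphore. ObjectPool and its methods are names I made up for illustration; they are not part of the JDK, and a real connection pool would need much more (timeouts, validation, and so on).

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper (not a JDK class): a fixed-size pool guarded by a Semaphore.
class ObjectPool<T> {
    private final ConcurrentLinkedQueue<T> items = new ConcurrentLinkedQueue<>();
    private final Semaphore permits;

    ObjectPool(List<T> initial) {
        items.addAll(initial);
        // One permit per pooled item
        permits = new Semaphore(initial.size());
    }

    // Borrow an item, run the task with it, then return the item to the pool.
    <R> R use(Function<T, R> task) {
        permits.acquireUninterruptibly(); // blocks while every item is checked out
        T item = items.poll();            // non-null: holding a permit implies a queued item
        try {
            return task.apply(item);
        } finally {
            items.offer(item);            // put the item back before releasing the permit
            permits.release();
        }
    }

    int available() {
        return permits.availablePermits();
    }
}
```

The Semaphore only limits how many threads are inside use() at once; handing out distinct items is the job of the thread-safe queue.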

How to implement Semaphore based on AQS

abstract static class Sync extends AbstractQueuedSynchronizer {
  Sync(int permits) {
    setState(permits);
  }

  // Acquire the lock
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
    }
  }

  // Release the lock
  protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current)
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
      }
  }
}

Semaphore's acquire/release are again built on AQS, which treats the value of state as the number of shared resources (permits). Acquiring the lock subtracts 1 from state, and releasing the lock adds 1 to state.
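To see the "state is the permit count" idea run end to end, here is a stripped-down sketch of a counting semaphore on top of AQS. TinySemaphore is a teaching class I invented; it follows the same CAS loops as the Sync class above but leaves out fairness, interruption, and timeouts.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical teaching class: a minimal counting semaphore built on AQS.
class TinySemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // state holds the number of available permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result means "not enough permits, go queue up"
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // int overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    TinySemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire() { sync.acquireShared(1); }                       // state - 1, or block
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }  // never blocks
    void release() { sync.releaseShared(1); }                       // state + 1
    int availablePermits() { return sync.permits(); }
}
```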

Acquiring the lock

public void acquire() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}

// AQS
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}


Semaphore also comes in fair and non-fair versions, and both are implemented with the help of AQS. The default is non-fair, so the nonfairTryAcquireShared method is what eventually gets called.
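As a small illustration, the fairness mode is chosen through the Semaphore constructor and can be checked with isFair(). FairnessDemo is just a name I picked for this sketch. In the JDK 8 source, the fair version differs in that it checks hasQueuedPredecessors() before trying the CAS.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class showing how the fairness mode is selected.
class FairnessDemo {
    static boolean[] fairnessFlags() {
        Semaphore nonfair = new Semaphore(3);    // same as new Semaphore(3, false)
        Semaphore fair = new Semaphore(3, true); // waiting threads are served in FIFO order
        return new boolean[] { nonfair.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] flags = fairnessFlags();
        System.out.println("default is fair: " + flags[0]);
        System.out.println("explicit fair:   " + flags[1]);
    }
}
```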

Releasing the lock

public void release() {
  sync.releaseShared(1);
}

// AQS
public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}


After the lock is released successfully, doReleaseShared() is called, which will be examined later.

When acquiring the lock fails

When the lock cannot be acquired, the current thread is added to the queue

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
  if (Thread.interrupted())
      throw new InterruptedException();
  // This actually calls nonfairTryAcquireShared via NonfairSync
  if (tryAcquireShared(arg) < 0)
      doAcquireSharedInterruptibly(arg);
}

When tryAcquireShared returns a value less than 0 (not enough permits remain), the thread needs to join the queue.
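The queueing can be observed from the outside with Semaphore's monitoring methods. The sketch below (QueueingDemo is a made-up name) starts one thread against a semaphore with 0 permits, waits until it shows up in the queue, and then releases a permit to let it through.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo: a thread that cannot get a permit is parked in the AQS queue.
class QueueingDemo {
    static boolean waiterGetsQueuedThenWoken() {
        Semaphore sem = new Semaphore(0);
        Thread waiter = new Thread(() -> sem.acquireUninterruptibly(), "waiter");
        waiter.setDaemon(true);
        waiter.start();

        // Spin until the waiter has been enqueued
        while (!sem.hasQueuedThreads()) {
            Thread.yield();
        }
        boolean queued = sem.getQueueLength() == 1;

        sem.release(); // state 0 -> 1, and the queued thread is unparked
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // The waiter consumed the permit, so none are left
        return queued && sem.availablePermits() == 0;
    }
}
```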

The doAcquireSharedInterruptibly() method used by the shared lock and the acquireQueued() method used by the exclusive lock differ only in a few subtle ways.

First, a node for the exclusive lock is constructed in EXCLUSIVE mode, while a node for the shared lock is constructed in SHARED mode. The two are distinguished by the nextWaiter field of the AQS Node.
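Here is a simplified model of how that marker works, as I understand the JDK 8 source. ModeSketch is a made-up wrapper, and the real Node has many more fields (waitStatus, prev, next, thread).

```java
// Hypothetical, simplified model of the mode marker in AQS's Node.
class ModeSketch {
    static final class Node {
        // A shared sentinel object marks SHARED mode; null marks EXCLUSIVE mode
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        final Node nextWaiter;

        Node() {
            this.nextWaiter = null;
        }

        Node(Node mode) {
            this.nextWaiter = mode; // the mode is stored in nextWaiter
        }

        boolean isShared() {
            return nextWaiter == SHARED;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Node(Node.SHARED).isShared());
        System.out.println(new Node(Node.EXCLUSIVE).isShared());
    }
}
```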

Second, while the node is queued, if an attempt to acquire the shared lock succeeds, the setHeadAndPropagate() method is called to reset the head node and decide whether the successor node needs to be woken up.

private void setHeadAndPropagate(Node node, int propagate) {
  // Remember the old head node
  Node h = head;
  // Make the node that just acquired the lock the new head
  setHead(node);

  // Wake the successor if any of the following holds:
  //   - permits remain (propagate is the return value of nonfairTryAcquireShared())
  //   - the old head is null, or its waitStatus is less than 0
  //   - the new head is null, or its waitStatus is less than 0
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
      Node s = node.next;
      if (s == null || s.isShared())
          doReleaseShared();
  }
}

private void setHead(Node node) {
  head = node;
  node.thread = null;
  node.prev = null;
}

private void doReleaseShared() {
  for (;;) {
      Node h = head;
      // Ensure that there are at least two nodes in the synchronization queue
      if (h != null && h != tail) {
          int ws = h.waitStatus;

          // The successor node needs to be woken up
          if (ws == Node.SIGNAL) {
              if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                  continue;            // loop to recheck cases
              unparkSuccessor(h);
          }
          // Update the node state to PROPAGATE
          else if (ws == 0 &&
                   !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
              continue;                // loop on failed CAS
      }
      if (h == head)                   // loop if head changed
          break;
  }
}

Actually, there is some logic here that I could not understand, such as the condition in the setHeadAndPropagate() method

if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {


Wouldn't it be enough to write it like this?

if(propagate > 0  && h.waitStatus < 0)

Why is it so complicated? I tried to pretend I understood, but I found it hard to fool myself.

There must have been a special reason, or Doug Lea would not have written this…

What’s the use of PROPAGATE state?

I searched online and found that this code was changed because of a bug recorded in the Java bug database

Bugs.java.com/bugdatabase…

Seeing that this bug was fixed in JDK6 in 2011, I had no idea what Java was at that time…

The change can be found via Doug Lea's home page: in the JSR 166 CVS repository, compare revisions 1.73 and 1.74

Gee.cs.oswego.edu/cgi-bin/vie…

Let's first look at what changed in setHeadAndPropagate

In previous versions, the judgment condition looked like this

if (propagate > 0 && node.waitStatus != 0)

Well, that is pretty much what I had in mind. But what is wrong with it? Following the description of the bug, let's talk about what can go wrong when there is no PROPAGATE state.

The Semaphore first initializes state to 0, then four threads run, one task each. Threads t1 and t2 acquire the lock at the same time, and the other two threads t3 and t4 release the lock at the same time

import java.util.concurrent.Semaphore;

public class TestSemaphore {

  // Set the semaphore to 0
  private static Semaphore sem = new Semaphore(0);

  private static class Thread1 extends Thread {
    @Override
    public void run() {
      // Acquire the lock
      sem.acquireUninterruptibly();
    }
  }

  private static class Thread2 extends Thread {
    @Override
    public void run() {
      // Release the lock
      sem.release();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 10000000; i++) {
      Thread t1 = new Thread1();
      Thread t2 = new Thread1();
      Thread t3 = new Thread2();
      Thread t4 = new Thread2();
      t1.start(); t2.start(); t3.start(); t4.start();
      t1.join(); t2.join(); t3.join(); t4.join();
      System.out.println(i);
    }
  }
}

In the code above the semaphore starts at 0, so if t1 and t2 run before any release, they both fail to acquire the lock and are queued.

Suppose that in some iteration of the loop the queue looks like this

head --> t1 --> t2(tail)

The locks are released by t3 first and by t4 later

Time 1: Thread t3 calls releaseShared() and wakes up the first node in the queue (thread t1); at this point the head's waitStatus changes from -1 to 0

Time 2: Thread t1, woken up because t3 released the lock, calls nonfairTryAcquireShared() and gets back 0 as the propagate value (no permits remain)

Time 3: Thread t4 calls releaseShared() and reads waitStatus as 0 (this is still the same head as in time 1), so it does not wake up the successor node

Time 4: Thread t1 has acquired the lock and calls setHeadAndPropagate(). Because propagate > 0 does not hold (propagate == 0 from time 2), the successor node is not woken up

Without the PROPAGATE state, this sequence leaves thread t2 never woken up, even though t4 has released a permit.

So what happens once the PROPAGATE state is introduced?

Time 1: Thread t3 calls doReleaseShared() and wakes up the first node in the queue (thread t1); at this point the head's waitStatus changes from -1 to 0

Time 2: Thread t1, woken up because t3 released the semaphore, calls nonfairTryAcquireShared() and gets back 0 as the propagate value

Time 3: Thread t4 calls releaseShared() and reads waitStatus as 0 (this is still the same head as in time 1), so it sets the node state to PROPAGATE (PROPAGATE = -3)

 else if (ws == 0 &&
        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
   continue;     // loop on failed CAS

Time 4: Thread t1 has acquired the lock and calls setHeadAndPropagate(). Although propagate > 0 does not hold (propagate == 0 from time 2), the old head's waitStatus is now less than 0, so the successor node is woken up

So now we know what PROPAGATE is for: it avoids the situation where a thread can never be woken up. With a shared lock, many threads acquire and release at the same time, so some methods run concurrently and produce many intermediate states. PROPAGATE exists so that those intermediate states do not break the normal operation of the program.
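The four moments can also be replayed by hand in plain single-threaded code. The sketch below is only a model I wrote to check my understanding: PropagateReplay, t2Woken, and the local variables are all made up, and the real AQS does this with CAS on node fields across several threads. It also assumes that t2 is parked behind t1, so t1's own waitStatus is SIGNAL.

```java
// Hypothetical single-threaded replay of the t1..t4 interleaving (a model, not AQS itself).
class PropagateReplay {
    static final int SIGNAL = -1;
    static final int PROPAGATE = -3;

    // Returns true if t2 would be woken up at the end of the sequence.
    static boolean t2Woken(boolean withPropagate) {
        // Queue: head --> t1 --> t2(tail); the head must signal t1
        int oldHeadWs = SIGNAL;
        // Assumption: t2 parked behind t1, so t1's own waitStatus is SIGNAL
        int t1Ws = SIGNAL;

        // Time 1: t3 releases, CAS head SIGNAL -> 0, unpark t1
        oldHeadWs = 0;

        // Time 2: t1 takes the only permit, so tryAcquireShared returns 0
        int propagate = 0;

        // Time 3: t4 releases and sees ws == 0 on the same (old) head
        if (withPropagate) {
            oldHeadWs = PROPAGATE; // new code records that a release happened
        }
        // (old code: nothing is recorded, the wake-up is lost)

        // Time 4: t1 runs setHeadAndPropagate
        if (withPropagate) {
            // New condition (the part that matters here): old head's waitStatus < 0
            return propagate > 0 || oldHeadWs < 0;
        }
        // Old condition from revision 1.73
        return propagate > 0 && t1Ws != 0;
    }

    public static void main(String[] args) {
        System.out.println("without PROPAGATE, t2 woken: " + t2Woken(false));
        System.out.println("with PROPAGATE, t2 woken:    " + t2Woken(true));
    }
}
```

In the old version the information "t4 released a permit" is stored nowhere that t1 will look, while in the new version it survives as the PROPAGATE status on the old head.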

doReleaseShared: a small method with great wisdom

The doReleaseShared() method is called both when the lock is released and after the lock is acquired.

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    // Ensure that there are at least two nodes in the synchronization queue
    if (h != null && h != tail) {
      int ws = h.waitStatus;

      // The successor node needs to be woken up
      if (ws == Node.SIGNAL) {
          // There may be other threads calling doReleaseShared(), and the unpark operation only needs one of those calls
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
              continue;     // loop to recheck cases
          unparkSuccessor(h);
      }
      // Set the node state to PROPAGATE
      else if (ws == 0 &&
            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
          continue;    // loop on failed CAS
    }
    if (h == head)        // loop if head changed
      break;
  }
}

There is a judgment condition

ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

This condition is also clever

  1. First, the queue has at least two nodes at this point. To simplify the analysis, assume it has exactly two: head --> node
  2. We are in the else-if branch, so the preceding if was skipped. That means the head node is new and its waitStatus is still 0, and the tail node was added after it. This happens when we get here before shouldParkAfterFailedAcquire() has had a chance to change the predecessor's ws to SIGNAL
  3. If the CAS fails, the head's ws is no longer 0, which means shouldParkAfterFailedAcquire() has just changed the predecessor's waitStatus to SIGNAL

And the whole loop exits only when h == head. Why is that?

Since our head node is a virtual node (also known as a sentinel node), assume that our synchronization queue has nodes in the following order:

head --> A --> B --> C

Now suppose that A has acquired the shared lock; it becomes the new dummy node:

head(A) --> B --> C

Thread A calls the doReleaseShared method to wake up the successor node B, which quickly acquires the lock and becomes the new head node

head(B) --> C

Thread B will also call doReleaseShared() to wake up its successor C. But while thread B is calling it, thread A may not have finished yet and may still be executing the same method. When A reaches h == head it finds that head has changed, so it does not exit the for loop; it goes around again and helps wake up the subsequent nodes.
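The effect of this chained wake-up can be observed with a real Semaphore. In the sketch below (PropagationDemo and allWaitersWake are made-up names) several threads park on a semaphore with 0 permits, then a single release(n) call hands out n permits and every waiter gets through. It only shows the outcome; which thread unparks which successor is not visible from outside.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one release(n) call ends up waking all n queued waiters.
class PropagationDemo {
    static boolean allWaitersWake(int waiters) {
        Semaphore sem = new Semaphore(0);
        CountDownLatch done = new CountDownLatch(waiters);

        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                sem.acquireUninterruptibly(); // parks in the shared queue
                done.countDown();
            }, "waiter-" + i);
            t.setDaemon(true);
            t.start();
        }

        // Wait until every waiter is queued: head --> A --> B --> C ...
        while (sem.getQueueLength() < waiters) {
            Thread.yield();
        }

        sem.release(waiters); // a single release; wake-ups propagate down the queue

        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```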

That completes our analysis of the shared lock. In fact, once we understand the logic of AQS, Semaphore turns out to be a very simple class built on top of it.

In the process of looking at the shared lock source code, it is important to note that methods are executed concurrently by multiple threads, so many of these judgments will only occur when multiple threads are competing. It is also important to note that shared locking does not guarantee thread safety. It is still up to the programmer to ensure that operations on shared resources are safe.
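As a small sketch of that last point: with three permits, up to three threads are inside the guarded section at once, so the shared counter itself has to be thread-safe. Here I use an AtomicInteger; SharedStateDemo and countWithAtomic are names made up for the example, and a plain int with counter++ in the same place could lose updates.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: the Semaphore limits concurrency, the AtomicInteger provides safety.
class SharedStateDemo {
    static int countWithAtomic(int threads, int perThread) {
        Semaphore sem = new Semaphore(3);          // up to 3 threads inside at once
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                sem.acquireUninterruptibly();
                try {
                    for (int j = 0; j < perThread; j++) {
                        counter.incrementAndGet(); // atomic, safe with concurrent holders
                    }
                } finally {
                    sem.release();
                }
            });
            workers[i].start();
        }

        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```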