Preface

This article summarizes commonly used classes in the JDK's java.util.concurrent (JUC) package: the common BlockingQueue implementations, CountDownLatch, CyclicBarrier, and Semaphore.

Blocking queue

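BlockingQueue is one of the most useful interfaces in the JUC package for solving concurrent producer-consumer problems. Its implementations are thread-safe. ArrayBlockingQueue, for example, guards every put and take with a single lock, so only one thread can modify the queue at any time. BlockingQueue also provides timed operations: a poll with a timeout returns null if no element becomes available in time. It appears in many production scenarios.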
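As a minimal sketch of the producer-consumer pattern described above, the following example uses one producer and one consumer over a small ArrayBlockingQueue. The class name ProducerConsumerSketch and the POISON_PILL sentinel are assumptions made for illustration, not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerSketch {
    // Hypothetical sentinel value that tells the consumer to stop.
    private static final int POISON_PILL = -1;

    // Runs one producer and one consumer and returns what the consumer received.
    static List<Integer> run(int itemCount) throws InterruptedException {
        // A capacity of 2 forces the producer to block when it gets ahead.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                int item;
                while ((item = queue.take()) != POISON_PILL) { // blocks while the queue is empty
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Neither thread polls or sleeps: all waiting is done inside put() and take().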

Queue types

  • Unbounded queue: can grow almost indefinitely
  • Bounded queue: has a fixed maximum capacity

Common blocking queues

  • ArrayBlockingQueue: a bounded queue backed by an array
  • LinkedBlockingQueue: an optionally bounded queue backed by linked nodes
  • PriorityBlockingQueue: an unbounded priority queue backed by a priority heap
  • DelayQueue: a time-based scheduling queue backed by a priority heap

These are four common blocking queues, of which ArrayBlockingQueue and LinkedBlockingQueue are the most widely used. The rest of this section uses ArrayBlockingQueue as an example to briefly explain how they work.

ArrayBlockingQueue

Add element API:

Method | Description
add(E e) | Returns true if the insertion succeeds; throws IllegalStateException if the queue is full
put(E e) | Inserts the specified element into the queue; if the queue is full, blocks until there is room
offer(E e) | Returns true if the insertion succeeds, false if the queue is full
offer(E e, long timeout, TimeUnit unit) | Attempts to insert the element; if the queue is full, waits up to the given timeout for room and returns false if the timeout expires
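The differences between these insertion methods are easiest to see on a queue that is already full. The sketch below assumes a capacity of 1 and a 50 ms timeout; the class name InsertMethodsSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class InsertMethodsSketch {
    // Returns a short report of how each insertion method behaves.
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder report = new StringBuilder();

        // The queue has room, so add() succeeds.
        report.append("add=").append(queue.add("a"));

        // The queue is now full: offer() returns false immediately.
        report.append(", offer=").append(queue.offer("b"));

        // The timed offer() waits up to 50 ms, then gives up and returns false.
        report.append(", timedOffer=").append(queue.offer("c", 50, TimeUnit.MILLISECONDS));

        // add() on a full queue throws instead of returning false.
        try {
            queue.add("d");
            report.append(", addWhenFull=true");
        } catch (IllegalStateException e) {
            report.append(", addWhenFull=IllegalStateException");
        }
        return report.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints add=true, offer=false, timedOffer=false, addWhenFull=IllegalStateException
        System.out.println(run());
    }
}
```

put() is left out on purpose: on a full queue with no consumer it would block forever.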

Query element API:

Method | Description
take() | Retrieves and removes the head of the queue; if the queue is empty, blocks until an element becomes available
poll(long timeout, TimeUnit unit) | Retrieves and removes the head of the queue, waiting up to the specified time for an element to become available; returns null if the timeout expires
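A short sketch of the two retrieval methods, assuming a 50 ms timeout and the hypothetical class name RemoveMethodsSketch:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class RemoveMethodsSketch {
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        // The queue is empty: the timed poll() waits 50 ms and then returns null.
        String timedOut = queue.poll(50, TimeUnit.MILLISECONDS);

        // With an element present, take() returns immediately.
        queue.put("x");
        String taken = queue.take();

        return "poll=" + timedOut + ", take=" + taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints poll=null, take=x
    }
}
```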

This is not the complete API, only its most important methods.

add() and offer() methods



The source code shows that add() ultimately calls offer(). The difference is that add() throws an exception if the queue is full and the element cannot be added, whereas offer() simply returns true or false.

put() and offer(E e, long timeout, TimeUnit unit) methods



The source code shows that offer() acquires the lock with lock.lock(), while put() uses lock.lockInterruptibly(). The difference is that lockInterruptibly() checks whether the thread has been interrupted and throws InterruptedException if so, while lock() does not, as shown in the following figure:

The source also shows that put() blocks with notFull.await() while the array is full. notFull and notEmpty are Condition objects created from the lock in the constructor; they represent the lock's condition wait queues. As mentioned in the previous article, an AQS Node has several states, one of which represents waiting on a condition.

Once the queue is no longer full, put() leaves the while loop and enters the enqueue() method, which calls notEmpty.signal() to wake up a waiting thread.

offer(E e, long timeout, TimeUnit unit) is similar to put(), but adds the timeout to the condition wait.
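The difference between lock() and lockInterruptibly() can be observed directly on a ReentrantLock. In this sketch (the class name LockInterruptSketch is hypothetical), the main thread holds the lock while a second thread waits for it with lockInterruptibly() and is then interrupted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

class LockInterruptSketch {
    // Returns true if the waiting thread left lockInterruptibly() via InterruptedException.
    static boolean run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch started = new CountDownLatch(1);
        boolean[] interrupted = {false};

        Thread waiter = new Thread(() -> {
            started.countDown();
            try {
                lock.lockInterruptibly(); // cannot succeed while main holds the lock
                lock.unlock();
            } catch (InterruptedException e) {
                interrupted[0] = true; // the interrupt ended the wait
            }
        });

        lock.lock(); // main holds the lock for the whole experiment
        try {
            waiter.start();
            started.await();
            waiter.interrupt();
            waiter.join(); // the waiter finishes without ever getting the lock
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints true
    }
}
```

Had the waiter called lock.lock() instead, the interrupt would only set its interrupt flag and it would keep waiting until the lock was released.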



That was a brief look at the source code for adding elements. ArrayBlockingQueue relies on a ReentrantLock for locking. Both put() and the timed offer() block by waiting on a condition when the queue is full. So when the queue is empty at the time an element is fetched, is condition waiting also used to block?

take() and poll() methods

Again, compare poll(), take(), and the timed poll(long timeout, TimeUnit unit) to see the difference.

A waiting thread is also woken up in the dequeue() method, which calls notFull.signal().
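The lock-plus-two-conditions design described in this section can be sketched as a small bounded buffer. This is a simplified illustration written for this article, not the JDK source; the class name BoundedBufferSketch and its field names are assumptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBufferSketch<E> {
    private final Object[] items;
    private int putIndex, takeIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    BoundedBufferSketch(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal(); // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1, 2, 3 through a one-slot buffer and returns their sum.
    static int run() throws InterruptedException {
        BoundedBufferSketch<Integer> buffer = new BoundedBufferSketch<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 3; i++) {
            sum += buffer.take();
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 6
    }
}
```

The while loops around await() matter: a woken thread must re-check the condition, because another thread may have changed the queue before it reacquired the lock.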

Semaphore

Semaphore is a utility class that limits the number of threads accessing a particular resource at the same time. It is built on the state field of AQS.

Constructors

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
  • permits is the number of permits initially available, that is, how many threads may hold a permit at once
  • fair selects fair mode; if it is true, the next thread to acquire a permit is the one that has waited the longest

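Important methods

public void acquire() throws InterruptedException
public void release()
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
  • acquire() acquires a permit, blocking until one is available
  • release() releases a permit
  • tryAcquire() waits up to the given timeout for the permits and returns false if they do not become available in time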
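As a small sketch of the timed tryAcquire() listed above, the following example assumes a semaphore with a single permit and a 50 ms timeout. The class name TryAcquireSketch is hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireSketch {
    static String run() throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);

        semaphore.acquire(); // takes the only permit

        // No permit is left, so this waits 50 ms and returns false.
        boolean whileExhausted = semaphore.tryAcquire(1, 50, TimeUnit.MILLISECONDS);

        semaphore.release(); // hand the permit back

        // A permit is available again, so this succeeds without waiting.
        boolean afterRelease = semaphore.tryAcquire(1, 50, TimeUnit.MILLISECONDS);

        return whileExhausted + "," + afterRelease;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints false,true
    }
}
```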

Code sample

    import java.util.concurrent.Semaphore;

    public class SemaphoreTest {
        public static void main(String[] args) {
            // At most two threads can hold a permit at the same time.
            Semaphore semaphore = new Semaphore(2);
            for (int i = 0; i < 10; i++) {
                new Task(semaphore, "thread-" + i).start();
            }
        }

        static class Task extends Thread {
            private final Semaphore semaphore;

            public Task(Semaphore semaphore, String tname) {
                super(tname);
                this.semaphore = semaphore;
            }

            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println(getName() + ":acquire() at time:" + System.currentTimeMillis());
                    Thread.sleep(1000); // simulate work while holding the permit
                    semaphore.release();
                    System.out.println(getName() + ":release() at time:" + System.currentTimeMillis());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }



The output shows that 10 threads are started, but only two of them acquire a permit at a time, so a new pair of threads prints roughly every second (the sleep time in the code).

Semaphore constructor

Creating a Semaphore requires a permits argument, which is ultimately stored in the AQS synchronization state (state) and represents the maximum parallelism.

Semaphore also has a fair mode and an unfair mode. The default is unfair mode.

acquire() method source code analysis

acquire() comes in two forms, one without arguments and one that takes the number of permits. Each call to acquire() decreases state by one for every permit acquired.
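The effect on the permit count can be observed through availablePermits(). The sketch below (class name PermitCountSketch is hypothetical) records the count after each call, including the release() calls that return permits.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitCountSketch {
    // Returns the available permit count observed after each step.
    static int[] run() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        int[] observed = new int[5];

        observed[0] = semaphore.availablePermits(); // initial value: 3

        semaphore.acquire();                        // takes one permit
        observed[1] = semaphore.availablePermits(); // 2

        semaphore.acquire(2);                       // takes two permits at once
        observed[2] = semaphore.availablePermits(); // 0

        semaphore.release(2);                       // returns two permits
        observed[3] = semaphore.availablePermits(); // 2

        semaphore.release();                        // returns the last permit
        observed[4] = semaphore.availablePermits(); // 3

        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // prints [3, 2, 0, 2, 3]
    }
}
```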





acquire() calls the sync.acquireSharedInterruptibly() method.

tryAcquireShared(arg) is called first to try to acquire the semaphore.



In fair mode, tryAcquireShared() first checks whether other threads are already waiting in the AQS queue. It then reads the synchronization state, which was set at initialization to the number of permits, and subtracts the number of permits requested from the number currently available. If the result is less than 0, the remaining permits cannot satisfy the request, and doAcquireSharedInterruptibly(arg) is executed. If the result is 0 or greater, state is updated directly with a CAS.



The logic is similar to the acquireQueued method used by ReentrantLock. First, a node is created and added to the CLH queue, but this time the node is in SHARED mode. As before, the predecessor of the current node is checked before blocking. If the predecessor is the head node, another attempt is made to acquire the semaphore, and if it succeeds, the setHeadAndPropagate method is entered. Otherwise, shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() is executed. These two methods were described earlier and are not repeated here. Their main job is to change the predecessor node's state to SIGNAL and block the thread.



The setHeadAndPropagate method sets the queue head and checks whether a successor may be waiting in shared mode. If so, it propagates when propagate > 0 or the PROPAGATE status is set. Setting the head, as described in the previous article, sets node.thread and node.prev to null. Once the head is set and these conditions hold, the doReleaseShared() method is entered.

doReleaseShared() mainly wakes the next node and ensures propagation. It loops, starting from the head node. If there is a blocked thread (parked in shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt(), with the head's status set to SIGNAL), that is, if ws is SIGNAL, it tries to CAS the status to 0. If the CAS fails, it continues the loop; if it succeeds, it wakes the successor thread. If ws == 0, it tries to CAS the status to PROPAGATE and continues the loop until the change succeeds. Finally, if the head node has not changed in the meantime (concurrent threads may change head h), the loop exits. Legend:



Threads T2, T3, and T4 need to queue. A newly created node has a default waitStatus of 0. When a node is enqueued, the status of its predecessor is changed to SIGNAL (-1), so the waitStatus of the last node remains 0.

T0 then releases a permit, which wakes the next node, thread T2, and changes the head's waitStatus to 0 (the first if branch of doReleaseShared(): the CAS succeeds and unparkSuccessor() wakes the thread).

Next, the head's waitStatus is found to be 0, and a CAS changes it to -3, PROPAGATE (the else if branch of doReleaseShared()).

release() method source code

release() also comes in two forms, with and without a parameter. Each call to release() increases state by one for every permit released.



release() calls the releaseShared() method, which first tries to release the permits by adding to the value of state with a CAS. If that succeeds, the doReleaseShared() method is called to wake the next node and ensure propagation.
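To tie the acquire and release paths together, here is a simplified semaphore built directly on AbstractQueuedSynchronizer. It is a sketch of the unfair logic described above written for this article, not the JDK source. The class name SimpleSemaphoreSketch is hypothetical, and fairness, multi-permit calls, and overflow checks are omitted.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleSemaphoreSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // the AQS state holds the number of available permits
        }

        int permits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: not enough permits, AQS will queue the caller.
                // Otherwise: claim the permits with a CAS, retrying on contention.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // tells AQS to wake queued threads
                }
            }
        }
    }

    private final Sync sync;

    SimpleSemaphoreSketch(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }

    // One permit, two threads: the second must wait for the first to release.
    static int run() throws InterruptedException {
        SimpleSemaphoreSketch semaphore = new SimpleSemaphoreSketch(1);
        semaphore.acquire(); // state 1 -> 0
        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire(); // waits until the main thread releases
                semaphore.release();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        semaphore.release(); // state 0 -> 1, wakes the waiter if it is queued
        waiter.join();
        return semaphore.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 1
    }
}
```

All queueing, parking, and waking is inherited from AQS; the subclass only decides how state changes.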

CountDownLatch

The CountDownLatch class lets a thread wait for other threads to finish their work before it continues. For example, the main thread of an application may want to run only after the threads responsible for starting the framework services have started all of them. CountDownLatch works on a principle similar to Semaphore and is also based on AQS, but it has no fair or unfair mode.

Code sample

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CountDownTest {
        private static CountDownLatch countDownLatch = new CountDownLatch(2);

        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            executorService.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown(); // always count down, even on failure
                }
                System.out.println(" thread 1 completes execution ");
            });
            executorService.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
                System.out.println(" thread 2 completes execution ");
            });
            System.out.println(" wait for child thread to finish ");
            countDownLatch.await(); // blocks until the count reaches 0
            System.out.println(" All threads end ");
            executorService.shutdown();
        }
    }

Execution result

Constructor

CountDownLatch is likewise implemented on top of AQS. The constructor below shows that the counter value is assigned to the state variable of AQS; in other words, the AQS state value represents the counter.



await() method

After a thread calls the await() method of a CountDownLatch, it blocks and does not return until one of the following happens: the countDown() method has been called enough times for the count to reach 0, or another thread interrupts the current thread by calling its interrupt() method, in which case the current thread throws InterruptedException and returns.
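Both ways of leaving await() can be shown in a few lines. In this sketch, the class name LatchAwaitSketch and the helper latch named started are assumptions made for illustration. A waiting thread is interrupted first, and then the count is brought to 0.

```java
import java.util.concurrent.CountDownLatch;

class LatchAwaitSketch {
    static String run() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CountDownLatch started = new CountDownLatch(1); // only signals that the waiter is running
        String[] outcome = new String[1];

        Thread waiter = new Thread(() -> {
            started.countDown();
            try {
                latch.await(); // the count is 1, so this cannot return normally yet
                outcome[0] = "released";
            } catch (InterruptedException e) {
                outcome[0] = "interrupted";
            }
        });
        waiter.start();
        started.await();
        waiter.interrupt(); // case 2: await() ends with InterruptedException
        waiter.join();

        latch.countDown();  // case 1: the count reaches 0
        latch.await();      // returns immediately now

        return outcome[0] + "," + latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints interrupted,0
    }
}
```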







The source code shows that this method can be interrupted while acquiring the resource, and that the resource is acquired in shared mode. acquireSharedInterruptibly() first checks whether the current thread has been interrupted and throws an exception if so. Otherwise it calls the tryAcquireShared method implemented by sync to check whether the current state value (the counter) is 0. If it is, await() returns immediately; otherwise AQS's doAcquireSharedInterruptibly method is called to block the current thread. Note that the arg parameter passed to tryAcquireShared is not used: the method only checks whether state is 0 and does not use a CAS to decrement it.

countDown() method







CountDownLatch's countDown() method delegates to sync, which calls AQS's releaseShared method, which in turn calls tryReleaseShared(). If the current state is already 0, tryReleaseShared() returns false and countDown() simply returns. Otherwise it decrements the counter by 1 with a CAS, retrying if the CAS fails. It returns true if the new count is 0, which means this countDown() call was the last one. In that case the threads blocked in the await() method of the CountDownLatch must be woken up, which is done by AQS's doReleaseShared method. doReleaseShared() was covered earlier and is not repeated here.
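The await and countDown logic just described fits in a very small AQS subclass. The following is a simplified sketch written for this article, not the JDK source; the class name SimpleLatchSketch is hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLatchSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // the AQS state holds the remaining count
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Only inspects the state and never modifies it.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int current = getState();
                if (current == 0) {
                    return false; // already open, nothing to do
                }
                int next = current - 1;
                if (compareAndSetState(current, next)) {
                    return next == 0; // true only for the last countDown()
                }
            }
        }
    }

    private final Sync sync;

    SimpleLatchSketch(int count) {
        sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    int getCount() {
        return sync.count();
    }

    // Two workers count down while the calling thread waits.
    static int run() throws InterruptedException {
        SimpleLatchSketch latch = new SimpleLatchSketch(2);
        for (int i = 0; i < 2; i++) {
            new Thread(latch::countDown).start();
        }
        latch.await();     // returns once both workers have counted down
        latch.countDown(); // extra call: the state is already 0 and stays 0
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 0
    }
}
```

Unlike the semaphore, acquiring never changes state here, which is why any number of threads can pass await() once the count reaches 0.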

CyclicBarrier

A CyclicBarrier, as the name suggests, is a reusable barrier: it makes a group of threads wait for each other to reach a common point before any of them continues. It is called cyclic because, once all waiting threads have been released, the barrier's state is reset and it can be used again. It is called a barrier because a thread blocks after calling the await() method. The point where it blocks is called the barrier point. Once all the threads have called await(), they break through the barrier and continue running.

Example:

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CyclicBarrierTest {
        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            // Both workers run the same three phases.
            Runnable task = () -> {
                try {
                    System.out.println(Thread.currentThread() + " step1");
                    cyclicBarrier.await(); // wait until both threads finish step 1
                    System.out.println(Thread.currentThread() + " step2");
                    cyclicBarrier.await(); // the same barrier is reused for step 2
                    System.out.println(Thread.currentThread() + " step3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            };
            executorService.submit(task);
            executorService.submit(task);
            executorService.shutdown();
        }
    }

Output result:



In the code above, each child thread calls await() after finishing phase 1 and waits until all threads have reached the barrier point before starting phase 2. This guarantees that every thread has completed phase 1 before any thread begins phase 2. await() is called again after phase 2, which guarantees that every thread has completed phase 2 before phase 3 begins.

Closing

The CyclicBarrier source code will be covered in a later article.
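The reuse of a single barrier across phases can be made visible with the optional barrier action, a Runnable passed to the CyclicBarrier constructor that runs once each time the barrier trips. This is a minimal sketch; the class name BarrierReuseSketch and the counter name trips are assumptions.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseSketch {
    // Returns how many times the barrier tripped.
    static int run() throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, after both parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);

        Runnable worker = () -> {
            try {
                barrier.await(); // end of phase 1
                barrier.await(); // end of phase 2, same barrier object
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread first = new Thread(worker);
        Thread second = new Thread(worker);
        first.start();
        second.start();
        first.join();
        second.join();
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints 2
    }
}
```

A CountDownLatch could not be used this way, because its count cannot be reset once it reaches 0.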