Summary: The JUC toolkit is a powerful aid for concurrent programming in Java. This article describes how to implement a fair bounded blocking queue using only Java's native synchronization primitives, without the JUC toolkit. Hopefully you will also come to appreciate the intricacies of concurrent programming, and the power of the JUC toolkit.

Author: Li Xinran (Alibaba). Source: the Alibaba technology public account.

1 Background

The JUC toolkit is a powerful aid for concurrent programming in Java.

This article describes how to implement a fair bounded blocking queue using only Java's native synchronization primitives, without the JUC toolkit.

Hopefully, you will also come to appreciate the complexity of concurrent programming and the power of the JUC toolkit.

2 Approach

The basic tools used in this article:

  1. synchronized, at both the method and code-block level;
  2. The Object base-class methods wait, notify, and notifyAll.

Based on these primitives, we implement a fair bounded blocking queue, where:

  1. Fairness is limited to FIFO: the request that blocks first is the first to be released;
  2. There is no guarantee on the execution order of actions after a wait is released;
  3. The queue size never exceeds the configured capacity; however, there is no limit on the number of blocked waiting requests.

3 Implementation

1 Basic Version

First, consider a basic version of the ADT for the non-concurrent case:

```java
interface Queue {
    boolean offer(Object obj);
    Object poll();
}

class FairnessBoundedBlockingQueue implements Queue {
    // current size
    protected int size;
    // capacity
    protected final int capacity;
    // head pointer; empty: head.next == tail == null
    protected Node head;
    // tail pointer
    protected Node tail;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // if the queue is full, fail immediately with false
    public boolean offer(Object obj) {
        if (size < capacity) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            ++size;
            return true;
        }
        return false;
    }

    // if the queue is empty (head.next == null), return null
    public Object poll() {
        if (head.next != null) {
            Object result = head.next.value;
            head.next.value = null;
            head = head.next; // discard the old header
            --size;
            return result;
        }
        return null;
    }

    class Node {
        Object value;
        Node next;
        Node(Object obj) {
            this.value = obj;
            this.next = null;
        }
    }
}
```

In the above:

  1. Two basic queue operations are defined: offer and poll;
  2. The queue itself uses the classic linked-list implementation;
  3. If the queue is empty, poll returns null without blocking;
  4. If the queue is full, offer returns false without blocking and without throwing an exception.

Note: on dequeue, this implementation advances the head node rather than modifying the tail node, so poll never touches tail. The intent of this choice becomes clear in the concurrent versions below.
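To make the non-concurrent behavior concrete, here is an illustrative sketch (not from the original article): a compact restatement of the basic version with a small driver exercising the empty and full boundaries.

```java
public class BasicQueueDemo {
    // compact restatement of the basic, non-concurrent version
    static class Node {
        Object value; Node next;
        Node(Object obj) { value = obj; }
    }

    static class BoundedQueue {
        int size; final int capacity;
        Node head = new Node(null);   // sentinel; empty: head.next == null
        Node tail = head;
        BoundedQueue(int capacity) { this.capacity = capacity; }

        boolean offer(Object obj) {
            if (size < capacity) {
                Node node = new Node(obj);
                tail.next = node; tail = node; ++size;
                return true;
            }
            return false;             // full: fail, no exception
        }

        Object poll() {
            if (head.next != null) {
                Object result = head.next.value;
                head.next.value = null;
                head = head.next;     // dequeue advances head only
                --size;
                return result;
            }
            return null;              // empty: non-blocking null
        }
    }

    public static void main(String[] args) {
        BoundedQueue q = new BoundedQueue(2);
        if (!q.offer("a") || !q.offer("b")) throw new AssertionError();
        if (q.offer("c")) throw new AssertionError();          // full -> false
        if (!"a".equals(q.poll())) throw new AssertionError(); // FIFO order
        if (!"b".equals(q.poll())) throw new AssertionError();
        if (q.poll() != null) throw new AssertionError();      // empty -> null
        System.out.println("ok");
    }
}
```

Note how the driver never observes tail directly: enqueue only touches tail, dequeue only touches head.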

2 Concurrent Version

In a concurrent scenario, the implementation above runs into problems and fails to meet some of the stated requirements.

Add synchronized to ensure thread safety under concurrent conditions.

Note that the reason for synchronization here is to keep the class invariant.

Concurrency issues

In concurrent scenarios, the basic implementation faces atomicity, visibility, and instruction-reordering issues.

See the Java Memory Model (JMM) for the relevant background.
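As a quick standalone illustration of the atomicity problem (not from the original article): two threads incrementing a plain int lose updates, while a counter guarded by a lock does not.

```java
public class AtomicityDemo {
    static int plain = 0;                 // unsynchronized counter
    static int safe = 0;                  // counter guarded by a lock
    static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Runnable work = () -> {
            for (int i = 0; i < 100_000; i++) {
                plain++;                  // read-modify-write, not atomic
                synchronized (lock) {
                    safe++;               // atomic under the lock
                }
            }
        };
        Thread t1 = new Thread(work), t2 = new Thread(work);
        t1.start(); t2.start();
        t1.join(); t2.join();
        if (safe != 200_000) throw new AssertionError();
        // `plain` may be anything up to 200_000: lost updates are possible
        System.out.println("safe=" + safe + " plain=" + plain);
    }
}
```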

The simplest way to solve the concurrency problem is to cover everything with synchronized:

```java
class BoundedBlockingQueue implements Queue {
    // current size
    protected int size;
    // capacity
    protected final int capacity;
    // head pointer; empty: head.next == tail == null
    protected Node head;
    // tail pointer
    protected Node tail;

    public BoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // if the queue is full, fail immediately with false
    public synchronized boolean offer(Object obj) {
        if (size < capacity) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            ++size;
            return true;
        }
        return false;
    }

    // if the queue is empty (head.next == null), return null
    public synchronized Object poll() {
        if (head.next != null) {
            Object result = head.next.value;
            head.next.value = null;
            head = head.next; // discard the old header
            --size;
            return result;
        }
        return null;
    }

    // Node definition omitted
}
```

Simply adding synchronized solves the safety problem, but it introduces a new one: liveness (addressed later).

At the same time, synchronized alone cannot provide blocking waits:

  1. If the queue is empty, the dequeue action returns immediately with null;
  2. If the queue is full, the enqueue action returns immediately with false.

To implement blocking waits, we need Java's PV-style primitives: wait, notify, and notifyAll.

See the JDK documentation for descriptions of wait, notify, and notifyAll.

Guarded methods

A blocking wait can be implemented with the guarded-method (guarded-suspension) pattern, which can be abstracted as:

  1. A method may execute only when a certain condition holds;

  2. Before the method body executes, the precondition (invariant) is checked, then the change is applied;

  3. After execution, the postcondition (invariant) is checked.

```
WHEN (condition) Object action(Object arg) {
    checkPreCondition();
    doAction(arg);
    checkPostCondition();
}
```

Ada implements this abstraction at the language level. In Java, it can be translated using wait, notify, and notifyAll:

```java
// guarded method: wait until the condition holds
synchronized Object action(Object arg) {
    while (!condition) {
        wait();
    }
    checkPreCondition();   // precondition / invariant
    doAction(arg);
    checkPostCondition();  // postcondition / invariant
}

// state-changing method: notify all waiters
synchronized Object notifyAction(Object arg) {
    // ...apply the state change...
    notifyAll();
}
```

Note:

  1. notifyAll, rather than notify, is usually used to send notifications: with notify, if the single notified thread is interrupted after being chosen, the notification is lost and the remaining threads wait forever;
  2. With notifyAll, the guard must sit in a while loop: by the time a woken thread runs, the condition may no longer hold, even though it now owns the mutex;
  3. notifyAll must be sent whenever any variable of the guard condition changes, or the system faces a liveness problem.
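These rules — guard in a while loop, notifyAll on every relevant state change — can be seen in a minimal standalone latch (an illustrative sketch, not part of the queue):

```java
public class GuardedLatchDemo {
    static class Latch {
        private boolean released = false;

        synchronized void await() throws InterruptedException {
            while (!released) {   // guard re-checked after every wakeup
                wait();
            }
        }

        synchronized void release() {
            released = true;      // change the guard variable...
            notifyAll();          // ...then wake every waiter
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Latch latch = new Latch();
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                try { latch.await(); } catch (InterruptedException ignored) {}
            });
            waiters[i].start();
        }
        latch.release();          // one notifyAll frees all three waiters
        for (Thread t : waiters) t.join();
        System.out.println("all released");
    }
}
```

Because await re-checks the guard, a thread that calls await after release returns immediately; the order of await and release does not matter.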

Accordingly, a simple blocking version of the bounded queue is straightforward:

```java
interface Queue {
    boolean offer(Object obj) throws InterruptedException;
    Object poll() throws InterruptedException;
}

class FairnessBoundedBlockingQueue implements Queue {
    // current size
    protected int size;
    // capacity
    protected final int capacity;
    // head pointer; empty: head.next == tail == null
    protected Node head;
    // tail pointer
    protected Node tail;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // if the queue is full, block and wait
    public synchronized boolean offer(Object obj) throws InterruptedException {
        while (size >= capacity) {
            wait();
        }
        Node node = new Node(obj);
        tail.next = node;
        tail = node;
        ++size;
        notifyAll();
        return true;
    }

    // if the queue is empty, block and wait
    public synchronized Object poll() throws InterruptedException {
        while (head.next == null) {
            wait();
        }
        Object result = head.next.value;
        head.next.value = null;
        head = head.next; // discard the old header
        --size;
        notifyAll();
        return result;
    }

    // Node definition omitted
}
```

This implements blocking waits, but it also introduces bigger performance problems:

  1. Enqueue and dequeue block on the same lock, contending viciously;
  2. Every queue change wakes all blocked threads, causing many thread context switches and a scramble for the lock, after which only one thread can actually proceed.

Points to note:

  1. The blocking wait can throw an interrupt exception; exception questions are addressed below;
  2. The interface must therefore declare InterruptedException;
  3. Queue changes must use notifyAll: with notify, a thread that is interrupted or fails could swallow the only notification.

3 Lock-splitting optimization

The first problem above can be solved by lock splitting: define two locks, a dequeue (read) lock and an enqueue (write) lock, separating reads from writes.

```java
// interface definition omitted
class FairnessBoundedBlockingQueue implements Queue {
    // capacity
    protected final int capacity;
    // head pointer; empty: head.next == tail == null
    protected Node head;
    // tail pointer
    protected Node tail;
    // guards: canPollCount, head
    protected final Object pollLock = new Object();
    protected int canPollCount;
    // guards: canOfferCount, tail
    protected final Object offerLock = new Object();
    protected int canOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.head = new Node(null);
        this.tail = head;
    }

    // if the queue is full, block and wait
    public boolean offer(Object obj) throws InterruptedException {
        synchronized (offerLock) {
            while (canOfferCount <= 0) {
                offerLock.wait();
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            pollLock.notifyAll();
        }
        return true;
    }

    // if the queue is empty, block and wait
    public Object poll() throws InterruptedException {
        Object result = null;
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                pollLock.wait();
            }
            result = head.next.value;
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            offerLock.notifyAll();
        }
        return result;
    }

    // Node definition omitted
}
```

In the above:

  1. pollLock and offerLock split dequeue from enqueue so they no longer contend with each other;
  2. The enqueue lock guards canOfferCount and tail;
  3. The dequeue lock guards canPollCount and head;
  4. Dequeue: first acquire pollLock and wait on its guard, complete the dequeue; then notify on offerLock to release waiting enqueue threads;
  5. Enqueue: first acquire offerLock and wait on its guard, complete the enqueue; then notify on pollLock to release waiting dequeue threads.

This implementation:

  1. Guarantees atomicity of enqueue and dequeue through their respective locks;
  2. By the head-migration trick, dequeue only changes head and never needs to acquire offerLock for the list itself;
  3. offerLock.notifyAll and pollLock.notifyAll resolve the contention between readers and writers.

But an unresolved issue remains with this implementation:

When more than one enqueue (or dequeue) thread is waiting, a single opposite action wakes all of them, causing many thread context switches while, in the end, only one thread can proceed.

That is, there is still read-read and write-write contention.

4 Eliminating contention via state tracking

Read-read and write-write contention can be eliminated by tracking the number of waiting threads:

```java
class FairnessBoundedBlockingQueue implements Queue {
    // capacity
    protected final int capacity;
    // head pointer; empty: head.next == tail == null
    protected Node head;
    // tail pointer
    protected Node tail;
    // guards: canPollCount, head
    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;
    // guards: canOfferCount, tail
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // if the queue is full, block and wait
    public boolean offer(Object obj) throws InterruptedException {
        synchronized (offerLock) {
            while (canOfferCount <= 0) {
                waitOfferCount++;
                offerLock.wait();
                waitOfferCount--;
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    // if the queue is empty, block and wait
    public Object poll() throws InterruptedException {
        Object result;
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                pollLock.wait();
                waitPollCount--;
            }
            result = head.next.value;
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }

    // Node definition omitted
}
```

In the above:

  1. waitOfferCount and waitPollCount track the number of waiters, addressing the read-write wakeup storm;
  2. When the queue changes, the tracked state decides whether a notification needs to be sent at all, and a single notify releases exactly one blocked thread.

However, this implementation fails in some scenarios and faces liveness problems. Consider:

Case 1:

  1. Initially the queue is empty; a dequeue request blocks on pollLock, so waitPollCount == 1;
  2. The waiting thread is interrupted: pollLock.wait throws, the decrement after it is skipped, and waitPollCount is never reset;
  3. The queue is now empty with no waiters, yet waitPollCount == 1 — the class state is corrupt.

Case 2:

  1. Initially the queue is empty; threads A and B call poll and block on pollLock, so waitPollCount == 2;
  2. Thread C enqueues an element; it executes immediately, and afterwards sends a single notify on pollLock;
  3. Which waiter the notify wakes is up to the JVM; assume it is thread A;
  4. Assume thread A was interrupted while blocked: on wakeup the JVM sees the interrupt status and throws InterruptedException;
  5. Now the queue holds one element, but thread B is still blocked on pollLock and will stay blocked.

Both are examples of lost wakeups, and the root of the problem lies in exception handling.

5 Handling interrupt exceptions

To fix the interrupt-exit problem, recall when a thread checks its interrupt status:

  1. In Object.wait, Thread.join, and Thread.sleep;

  2. At these points, if the interrupt flag is set, the JVM clears it and throws InterruptedException;

  3. In general, to respond promptly to interrupts, a run method should also check the interrupt flag itself (Thread.interrupted()) — especially in code that is interrupt-sensitive and must preserve class invariants.

```java
class FairnessBoundedBlockingQueue implements Queue {
    // capacity
    protected final int capacity;
    // head pointer; empty: head.next == tail == null
    protected Node head;
    // tail pointer
    protected Node tail;
    // guards: canPollCount, head, waitPollCount
    protected final Object pollLock = new Object();
    protected int canPollCount;
    protected int waitPollCount;
    // guards: canOfferCount, tail, waitOfferCount
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected int waitOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.waitPollCount = 0;
        this.waitOfferCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // if the queue is full, block and wait
    public boolean offer(Object obj) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (offerLock) {
            while (canOfferCount <= 0) {
                waitOfferCount++;
                try {
                    offerLock.wait();
                } catch (InterruptedException e) {
                    // forward the possibly consumed notification
                    offerLock.notify();
                    throw e;
                } finally {
                    waitOfferCount--;
                }
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            if (waitPollCount > 0) {
                pollLock.notify();
            }
        }
        return true;
    }

    // if the queue is empty, block and wait
    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object result = null;
        synchronized (pollLock) {
            while (canPollCount <= 0) {
                waitPollCount++;
                try {
                    pollLock.wait();
                } catch (InterruptedException e) {
                    // forward the possibly consumed notification
                    pollLock.notify();
                    throw e;
                } finally {
                    waitPollCount--;
                }
            }
            result = head.next.value;
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            if (waitOfferCount > 0) {
                offerLock.notify();
            }
        }
        return result;
    }

    // Node definition omitted
}
```

In the above:

  1. When a waiting thread exits on interrupt, the InterruptedException is caught and the possibly consumed notification is forwarded via pollLock.notify / offerLock.notify;
  2. The waiter-count tracking variables are restored in finally blocks.
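The interrupt handling above relies on two JVM behaviors: Thread.interrupted() reports and clears the flag, and a thread interrupted in wait() wakes up with InterruptedException. A small standalone check (illustrative, not from the original):

```java
public class InterruptDemo {
    public static void main(String[] args) throws Exception {
        // Thread.interrupted() reports AND clears the interrupt flag
        Thread.currentThread().interrupt();
        if (!Thread.interrupted()) throw new AssertionError(); // was set
        if (Thread.interrupted()) throw new AssertionError();  // now cleared

        // interrupting a thread blocked in wait() makes wait() throw
        final Object lock = new Object();
        final boolean[] caught = {false};
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    caught[0] = true;   // flag cleared, exception delivered
                }
            }
        });
        waiter.start();
        Thread.sleep(100);              // crude: give the waiter time to block
        waiter.interrupt();
        waiter.join();
        if (!caught[0]) throw new AssertionError();
        System.out.println("interrupt semantics confirmed");
    }
}
```

Even if the interrupt lands before the waiter reaches wait(), wait() checks the pending flag on entry and throws immediately, so the check is deterministic.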

Lock contention between reads and writes is thus solved by state-variable tracking.

Next, consider how to achieve fairness among readers and among writers.

6 Fairness

The fairness problem is solved by replacing state-variable tracking with per-request monitor tracking:

  1. Each request gets its own monitor;
  2. An internal FIFO queue of these monitors provides fairness;
  3. When the queue state changes, the monitor at the head of the wait queue is released.

This logic can be abstracted uniformly as:

```java
boolean needToWait;
synchronized (this) {
    needToWait = calculateNeedToWait();
    if (needToWait) {
        enqueue(monitor);
    }
}
if (needToWait) {
    monitor.doWait();
}
```

Points to note:

  1. monitor.doWait() must sit outside the synchronized(this) block: if it were inside, waiting on the monitor would not release the this lock;
  2. calculateNeedToWait() must run inside the synchronized(this) block to avoid races;
  3. Interrupt exceptions still need to be handled.

Based on this abstraction, the fair queue follows:

```java
// interface definition omitted
class FairnessBoundedBlockingQueue implements Queue {
    // capacity
    protected final int capacity;
    // head pointer; empty: head.next == tail == null
    protected Node head;
    // tail pointer
    protected Node tail;
    // guards: canPollCount, head, pollQueue
    protected final Object pollLock = new Object();
    protected int canPollCount;
    // guards: canOfferCount, tail, offerQueue
    protected final Object offerLock = new Object();
    protected int canOfferCount;
    protected final WaitQueue pollQueue = new WaitQueue();
    protected final WaitQueue offerQueue = new WaitQueue();

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canOfferCount = capacity;
        this.canPollCount = 0;
        this.head = new Node(null);
        this.tail = head;
    }

    // if the queue is full, block and wait
    public boolean offer(Object obj) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        WaitNode wait = null;
        synchronized (offerLock) {
            // wait if the queue is full or there are already blocked requests
            if (canOfferCount <= 0 || !offerQueue.isEmpty()) {
                wait = new WaitNode();
                offerQueue.enq(wait);
            } else {
                // continue without waiting
            }
        }
        try {
            if (wait != null) {
                wait.doWait();
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        } catch (InterruptedException e) {
            // forward the possibly consumed release to the next waiter
            offerQueue.doNotify();
            throw e;
        }
        synchronized (offerLock) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            canOfferCount--;
        }
        synchronized (pollLock) {
            ++canPollCount;
            pollQueue.doNotify();
        }
        return true;
    }

    // if the queue is empty, block and wait
    public Object poll() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Object result = null;
        WaitNode wait = null;
        synchronized (pollLock) {
            // wait if the queue is empty or there are already blocked requests
            if (canPollCount <= 0 || !pollQueue.isEmpty()) {
                wait = new WaitNode();
                pollQueue.enq(wait);
            } else {
                // continue without waiting
            }
        }
        try {
            if (wait != null) {
                wait.doWait();
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        } catch (InterruptedException e) {
            pollQueue.doNotify();
            throw e;
        }
        synchronized (pollLock) {
            result = head.next.value;
            head.next.value = null;
            head = head.next;
            canPollCount--;
        }
        synchronized (offerLock) {
            canOfferCount++;
            offerQueue.doNotify();
        }
        return result;
    }

    class WaitQueue {
        WaitNode head;
        WaitNode tail;

        WaitQueue() {
            head = new WaitNode();
            tail = head;
        }

        synchronized void doNotify() {
            for (;;) {
                WaitNode node = deq();
                if (node == null) {
                    break;
                } else if (node.doNotify()) {
                    // a waiting thread was actually released
                    break;
                } else {
                    // already released (e.g. interrupted); retry with the next
                }
            }
        }

        synchronized boolean isEmpty() {
            return head.next == null;
        }

        synchronized void enq(WaitNode node) {
            tail.next = node;
            tail = tail.next;
        }

        synchronized WaitNode deq() {
            if (head.next == null) {
                return null;
            }
            WaitNode res = head.next;
            head = head.next;
            if (head.next == null) {
                tail = head; // empty again: reset the tail node
            }
            return res;
        }
    }

    class WaitNode {
        boolean released;
        WaitNode next;

        WaitNode() {
            released = false;
            next = null;
        }

        synchronized void doWait() throws InterruptedException {
            try {
                while (!released) {
                    wait();
                }
            } catch (InterruptedException e) {
                if (!released) {
                    released = true;
                    throw e;
                } else {
                    // notified after the interrupt: do not throw,
                    // re-assert the interrupt status instead
                    Thread.currentThread().interrupt();
                }
            }
        }

        synchronized boolean doNotify() {
            if (!released) {
                released = true;
                notify(); // explicitly release exactly one thread
                return true;
            } else {
                // no new thread was released
                return false;
            }
        }
    }

    // Node definition omitted
}
```

In the above:

  1. The core change is replacing the state-tracking counters with a per-request synchronization node, WaitNode;
  2. WaitNodes are organized into a simple synchronized queue implementing the FIFO protocol; each thread waits on its own WaitNode monitor;
  3. WaitNode keeps an internal released flag recording whether its blocked thread has been released, mainly to handle interrupts;
  4. WaitQueue itself is fully synchronized; since read-write and read-read/write-write contention have already been eliminated, this synchronization is not a bottleneck;
  5. WaitQueue is unbounded, which is a potential problem; but since it only tracks waiting threads, in practice it usually is not;
  6. The result is a fair bounded queue: both enqueue and dequeue first decide, under the guard, whether they must wait; if so, they wait under the FIFO fairness protocol.

When the release signal arrives, the queue is updated under the corresponding read/write lock, and the update in turn triggers the next notification through the opposite lock.

7 Timed waits

In concurrent scenarios, waits are usually time-limited (TIMED_WAITING) to avoid deadlock or loss of liveness.

Adding timed waits to the synchronization queue is easier than it might seem:

```java
class TimeoutException extends InterruptedException {}

class WaitNode {
    boolean released;
    WaitNode next;

    WaitNode() {
        released = false;
        next = null;
    }

    synchronized void doWait(long milliSeconds) throws InterruptedException {
        try {
            long startTime = System.currentTimeMillis();
            long toWait = milliSeconds;
            for (;;) {
                wait(toWait);
                if (released) {
                    return;
                }
                long now = System.currentTimeMillis();
                toWait = milliSeconds - (now - startTime); // remaining time
                if (toWait <= 0) {
                    throw new TimeoutException();
                }
            }
        } catch (InterruptedException e) {
            if (!released) {
                released = true;
                throw e;
            } else {
                // the permit was already granted: do not throw here,
                // re-assert the interrupt status instead
                Thread.currentThread().interrupt();
            }
        }
    }

    synchronized boolean doNotify() {
        if (!released) {
            released = true;
            notify();
            return true;
        } else {
            return false;
        }
    }
}
```

Because all waiting happens on the WaitNode monitor, only WaitNode needs to change. In the above:

  • A timeout exception is defined first, inheriting from InterruptedException purely to simplify exception handling;

  • The implementation relies on the timed wait(long timeout), which is usually unproblematic.

Finally, plug this timed-wait WaitNode into the FairnessBoundedBlockingQueue implementation, and we are done.
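A quick sketch of the timed-wait behavior in isolation (illustrative; this variant computes the remaining time from a deadline, and omits the interrupt-forwarding branch for brevity):

```java
public class TimedWaitDemo {
    static class TimeoutException extends InterruptedException {}

    static class WaitNode {
        boolean released = false;

        synchronized void doWait(long milliSeconds) throws InterruptedException {
            long deadline = System.currentTimeMillis() + milliSeconds;
            while (!released) {
                long toWait = deadline - System.currentTimeMillis();
                if (toWait <= 0) throw new TimeoutException();
                wait(toWait); // may wake early or spuriously; loop re-checks
            }
        }

        synchronized boolean doNotify() {
            if (!released) {
                released = true;
                notify();
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 1. no notify: the wait times out
        WaitNode lonely = new WaitNode();
        boolean timedOut = false;
        try {
            lonely.doWait(50);
        } catch (TimeoutException e) {
            timedOut = true;
        }
        if (!timedOut) throw new AssertionError();

        // 2. notified in time: the wait returns normally
        WaitNode paired = new WaitNode();
        Thread releaser = new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            paired.doNotify();
        });
        releaser.start();
        paired.doWait(5_000);          // returns once released
        releaser.join();
        System.out.println("timed wait ok");
    }
}
```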

4 Summary

This article iterated step by step and finally arrived at a first version of a fair bounded queue built on Java synchronization primitives. A few observations from the iterations:

  1. A change of mindset: instead of "call a method", think "the method may run only when its condition holds; the invariants must hold before the call and must hold again after it". Since concurrency bugs are hard to test for, this style of reasoning is often how correctness is argued;
  2. Many reusable patterns emerge during the iterations. For example, read/write separation can be abstracted into read and write locks, yielding an abstract Lock definition; likewise, the tracking of waiting state can be abstracted into a reusable synchronizer;
  3. The implementation is still far from complete: Iterator traversal, state queries, and data-migration operations remain to be considered.

Finally, after this exercise, revisiting the JUC implementations should feel quite different.


This article is original content from Alibaba Cloud and may not be reproduced without permission.