This is the third day of my participation in the First Challenge 2022
Life is too short to have a dog
The queue
The basic concept
A queue is an ordered linear list that can only be inserted at one end (the end of the queue) and deleted at the other (the head of the queue). Simple understanding, just like the dining hall meal queue, the new students can only line up at the end of the queue, each time the head of the queue after finishing the meal to the next student to continue cooking. An example of a queue is shown below.
The basic operations for queue abstract data types are as follows:
- void enQueue(T data);
- T deQueue();
There are three common ways to implement queues:
- Implementation method based on simple cyclic array
- Implementation method based on dynamic cyclic array
- Implementation method based on linked list
The principle is the same for the first method and the second method, except that the first method uses a fixed length array, while the second method uses a dynamically expanded array.
Queues common in Java
1. ArrayDeque
ArrayDeque is implemented using the dynamic looping array described above. Take a look at the JDK source code:
/**
* Resizable-array implementation of the {@link Deque} interface. Array
* deques have no capacity restrictions; they grow as necessary to support
* usage. They are not thread-safe; in the absence of external
* synchronization, they do not support concurrent access by multiple threads.
* Null elements are prohibited. This class is likely to be faster than
* {@link Stack} when used as a stack, and faster than {@link LinkedList}
* when used as a queue.
**/
Copy the code
As you can see from the documentation, this queue has no capacity limit. Let’s take a look at the code in detail.
First, take a look at the member variables in the queue:
/** * The array in which the elements of the deque are stored. * The capacity of the deque is the length of this array, which is * always a power of two. The array is never allowed to become * full, except transiently within an addX method where it is * resized (see doubleCapacity) immediately upon becoming full, * thus avoiding head and tail wrapping around to equal each * other. We also guarantee that all array cells not holding * deque elements are always null. */
transient Object[] elements; // non-private to simplify nested class access
/** * The index of the element at the head of the deque (which is the * element that would be removed by remove() or pop()); or an * arbitrary number equal to tail if the deque is empty. */
transient int head;
/** * The index at which the next element would be added to the tail * of the deque (via addLast(E), add(E), or push(E)). */
transient int tail;
/** * The minimum capacity that we'll use for a newly created deque. * Must be a power of 2. */
private static final int MIN_INITIAL_CAPACITY = 8;
Copy the code
Excluding constants, there are only three variables:
elements
An array of type Object. The length of the array must be a power of 2, so when expanding the array, it is multiplied by 2.head
Array index of the head of the queue.tail
The array index at the end of the queue.
Let’s take a look at how to get in and out of the team and see if it’s the same as you think:
The team
/**
* Inserts the specified element at the end of this deque.
*
* <p>This method is equivalent to {@link #add}.
*
* @param e the element to add
* @throws NullPointerException if the specified element is null
*/
public void addLast(E e) {
if (e == null)
throw new NullPointerException();
elements[tail] = e;
// A bit operation is used to check whether the queue is full, and if so, expand the queue
if ( (tail = (tail + 1) & (elements.length - 1)) == head)
doubleCapacity();
}
/** * Doubles the capacity of this deque. Call only when full, i.e., * when head and tail have wrapped around to become equal. */
private void doubleCapacity(a) {
assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = a;
head = 0;
tail = n;
}
Copy the code
Out of the team
public E pollLast(a) {
int t = (tail - 1) & (elements.length - 1);
@SuppressWarnings("unchecked")
// No type checking is done here
E result = (E) elements[t];
if (result == null)
return null;
elements[t] = null;
tail = t;
return result;
}
Copy the code
The above code is actually not too difficult, so I won’t explain it line by line here.
2. AQS (AbstractQueueSynchronizer)
Many of you are familiar with AQS, queue synchronizers. We often talk about ReentrantLock, ReentrantReadWriteLock is to use AQS to achieve.
First, take a look at the source documentation of AQS:
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields.but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
Copy the code
As described in the documentation, AQS provides a framework for implementing blocking locks and other associated synchronizers based on a FIFO wait queue (also known as CLH queues). It can be understood as an officially recognized and packet – sending C – bit component.
Unlike ArrayDeque, CLH queues in AQS are implemented using linked lists. So here we need to focus on how the nodes in the linked list are implemented.
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */
volatile Node prev;
/** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */
volatile Node next;
/** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */
volatile Thread thread;
/** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */
Node nextWaiter;
/** * Returns true if node is waiting in shared mode. */
final boolean isShared(a) {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor(a) throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread; }}Copy the code
There are 7 member variables and 4 constants in the inner class of this node, among which we need to pay attention to the following variables:
waitStatus
Wait status, which indicates the status of the node thread to obtain resources. The values are as followsSIGNAL
,CANCELLED
,CONDITION
,PROPAGATE
, 0;prev
A pre-ordered node that points to its predecessor. By checking the status of the predecessor, you can shorten the queuing time of the node (if the predecessor’s waiting status is cancelled).next
Subsequent node, refers to the successor of this node. Note that this variable is set to NULL to assist the GC on the exit of the queue. This means that null does not mean that the current node is the end of the queueprev
Variables to judge the second check;thread
The thread that holds the node, each thread waiting to acquire the resource will have a node;nextWaiter
Waiting for the subsequent Node of the queue, Node Node to obtain synchronization statusModel (Mode). It is used to indicate which mode the node is in.SHARED
,EXCLUSIVE
,#isShared()
).
In fact, the CHL synchronization queue header element is a fake queue header element.
CLH queues need a dummy header node to get started
Of course, the queue head element is not created in the constructor, but only when the actual queue creation takes place after the waiting resource thread is actually generated.
Since we are not here to explore how AQS implement synchronizer functions, we will take a look at the methods of joining and leaving CLH queues.
The team
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// Try it quickly
if(pred ! =null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
returnnode; }}// A quick attempt fails, and spin tries several times until it succeeds
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// set a false node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
returnt; }}}}Copy the code
The unsafe-looking code is relatively simple, with the CAS operation (using the Unsafe class) being used for concurrency safety, and the Node variables being volatile.
Out of the team
When the first node of the CLH queue releases the synchronization state, it wakes up its next node, and when the successor node succeeds in obtaining the synchronization state, it sets itself as the first node. Specific methods are as follows:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
Copy the code
The CAS operation is not used, because only one thread will perform the operation.
These are two of the more common examples of using looping arrays and linked lists to implement queues.
application
Here are some of the more common applications:
- Sequential task scheduling
- Multiprogramming
- Asynchronous data transfer (pipeline)
- As an auxiliary data structure for the algorithm
The above implementation is not shown here, if you are interested, you can Google it.
Finally, I wish you a happy New Year and an early end to the epidemic