This is the fifth day of my participation in the November Gwen Challenge. Check out the details: The last Gwen Challenge 2021
An overview of the
SynchronousQueue is a blocking queue with no data buffers. The producer thread’s insert, put(), must wait for the consumer’s delete, take(), and vice versa. Other blocking queue and the difference is that the inner class will team up unified packaging into an interface operation, the inner class data is saved each operation action, such as the put operation, save the inserted value, and judging by the logo is the team or the team operation, if is to take action, the value is null, through identifier can judge is the team operation. SynchronousQueue is divided into fair policies (FIFO) and unfair policies (LIFO), which correspond to two internal classes. The fair policies are implemented using a queue structure, and the unfair policies are implemented using a stack structure. The non-fair access policy is used by default, but it can also be set to the fair access policy by using the constructor (true).
Inheritance relationships
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
Copy the code
Important attributes
// Static final int NCPUS = runtime.getruntime ().availableprocessors (); Spins, used when timeout is specified, equal to cycles for CAS operations // Static final int maxTimedSpins = (NCPUS < 2)? Zero: 32; // Spins, use static final int maxUntimedSpins = maxTimedSpins * 16 if timeout is not specified; Static final Long spinForTimeoutThreshold = 1000L; // Spin timeout threshold. @suppressWarnings (" Serial ") static Class WaitQueue implements Java.io.Serializable {} Static class LifoWaitQueue extends WaitQueue { private static final long serialVersionUID = -3633113410248163686L; } static class FifoWaitQueue extends WaitQueue { private static final long serialVersionUID = -3623113410248163686L; } // Serialization uses private ReentrantLock qlock; private WaitQueue waitingProducers; private WaitQueue waitingConsumers; Private transient transferer <E> Transferer; // All queue operations are performed by transferer.Copy the code
SynchronousQueue does not set variables to hold data for queue-in and queue-out operations, which are stored in the Transferer.
A constructor
The constructor is clear, implementing the corresponding Transferer internal interface implementation class according to the selected policy to perform the queue operation
Public SynchronousQueue() {this(false); } // If fair is true, the SynchronousQueue uses a first-in-first-out TransferQueue, Public SynchronousQueue(Boolean fair) {transferer = fair? new TransferQueue<E>() : new TransferStack<E>(); }Copy the code
An abstract class Transferer
The parent class of TransferQueue, Transferer, is a simple transfer method, which needs to be implemented by subclasses.
//e: if it is not empty, it is put. The producer joins the queue, and the consumer needs to leave the queue to obtain the queue value. Nanos: timed time, unit: nanosecond Returned value: If the value is not empty, the operation succeeds and the consumed item or produced item is returned. Empty indicates that the operation failed due to timeout or interruption. abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); }Copy the code
Important method
put/offer
The enqueue operation calls Transfer through the inner class
// Add data to the SynchronousQueue if there is no thread fetching data. Public void put(E E) throws InterruptedException {// The added data cannot be null if (E == null) throw new NullPointerException(); Transfer transfer transfer transfer transfer transfer transfer transfer transfer InterruptedException is thrown if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); Public Boolean offer(E E) {if (E == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) ! = null; } // Offer method with timeout, this method will wait for a timeout, Public Boolean offer(E E, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); Transfer (e, true, unit.tonanos (timeout)) if (transferer.transfer(e, true, unit.tonanos (timeout))! = null) return true; // If adding data fails, it is possible that the thread is interrupted, otherwise return false if (! Thread.interrupted()) return false; Throw new InterruptedException(); }Copy the code
take/poll
The dequeue operation calls Transfer through the inner class, with the enqueue element E null
// The take method is used to fetch data from the queue. If the thread that did not add data is suspended, Public E take() throws InterruptedException {E E = transferer.transfer(null, false, 0); If (e! = null) return e; InterruptedException Thread.interrupted() is thrown; throw new InterruptedException(); Public E poll() {return transferer.transfer(null, true, 0); } // poll method with timeout. If no thread has inserted data after the timeout, Public E poll(long timeout, TimeUnit Unit) throws InterruptedException {E E = transferer.transfer(null, true, unit.toNanos(timeout)); // There are two ways to return the result. = null indicates that data is successfully fetched //! Thread.interrupted() means that the return fails because of timeout and e is null if (e! = null || ! Thread.interrupted()) return e; Throw new InterruptedException(); }Copy the code
Other methods treat empty queues as if they were empty, such as the queue length returning 0 and null returning true.
conclusion
SynchronousQueue is suitable for doing exchange work, such as synchronizing producer threads with consumer threads to pass information, events, or tasks. Its characteristics are as follows:
The internal implementation mainly consists of two internal implementation classes. Note that the nodes in the inner class do not store data but the action of each operation.
Through a encapsulated interface transfer to complete the queue and queue operation;