This is the 14th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

Writing in the front

SynchronousQueue SynchronousQueue SynchronousQueue I gave it another name called the pairing queue. Why is it called the pairing queue

By the way, you are welcome to click on your avatar to view the theme. The design patterns theme is finished, and the queue theme and Security theme are currently in progress.

Review of the queue

Blocking queues can be divided into the following types:

Blocking queues provide Blocking put and take methods that are equivalent to offer and poll that can be timed. If the queue is full, the PUT method blocks until there is space to insert elements; If the queue is empty, the take method also blocks until an element is available. The PUT and take methods never block when the queue is never full.

Understand SynchronousQueue will

SynchronousQueue, it’s not really a queue, because SynchronousQueue has no capacity. Unlike the other BlockingQueues,SynchronousQueue is a BlockingQueue that does not store elements. It just maintains a set of threads waiting to add or remove elements from the queue.

We can simply divide into the following characteristics:

  • No internal storage (capacity 0)

  • Blockingqueue (also an implementation of blockingqueue)

  • The sending or consuming threads block and exit at the same time only if one pair of consuming and sending threads matches.

  • Pairing has fair mode and unfair mode (default)

To demonstrate what we have analyzed above, here is an example code:

Code case

SynchronousQueue * SynchronousQueue *@Date 2021/11/16 11:10 下午
 * @Author yn
 */
public class SynchronousQueueDemo {

    public static void main(String[] args) {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t enter queue 1");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName() + "\t enter queue 2");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName() + "\t enter queue 3");
                synchronousQueue.put("3");
            } catch(InterruptedException e) { e.printStackTrace(); }},"AAAAA").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + "\t out of queue" + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + "\t out of queue" + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(5);
                System.out.println(Thread.currentThread().getName() + "\t out of queue" + synchronousQueue.take());
            } catch(InterruptedException e) { e.printStackTrace(); }},"BBBBB").start(); }}Copy the code

Take a look at the console print

AAAAA in the queue1BBBBB out queue1AAAAA in the queue2BBBBB out queue2AAAAA in the queue3BBBBB out queue3
Copy the code

See, this is the pairing queue from the beginning, which validates that inbound and outbound must be paired. Otherwise block.

So this is an interesting blocking queue, where each insert must wait for a remove from another thread, and each remove must wait for an insert from another thread. Therefore, the queue does not actually have any elements inside it, so the PEEK operation cannot be called because elements are only available when they are removed.

Let’s take a look at how it’s used

Application scenarios

Consider the newCachedThreadPool thread pool

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
}
Copy the code

One use case for SynchronousQueue is in thread pools. Executors. NewCachedThreadPool used SynchronousQueue will (), the thread pool (a new task comes) according to need to create a new thread, if there is a idle thread will be repeated use, thread idle is recycled after 60 seconds.

Because ThreadPoolExecutor’s internal implementation calls the non-blocking offer method of the work queue (the implementation class of the BlockingQueue interface), using the SynchronousQueue as the work queue, When client code submits a task to the thread pool and there are no free threads in the pool to fetch a task from the SynchronousQueue instance, the corresponding offer method call fails (that is, the task is not queued to work).

At this point, ThreadPoolExecutor creates a new worker thread to process the failed task (assuming that the thread pool size has not reached its maximum thread pool size).

Is not the design of the thief wonderful. We can go inside and follow the code

Take a quick look at the source code

SynchronousQueue, like any other BlockingQueue, inherits AbstractQueue and implements the BlockingQueue interface:

SynchronousQueue provides two constructors:

public SynchronousQueue(boolean fair) {
        // Use the fair value to determine fairness and unfairness
        // In fairness, the TransferQueue is used. In non-fairness, the TransferStack is used
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
Copy the code

TransferQueue, TransferStack inherits Transferer, an internal class of SynchronousQueue, which provides a method transfer() that defines the specification for transferring data as follows:

abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }
Copy the code

Transfer () method is mainly used to complete the transfer of data, if E! If e == null, it is equivalent to receiving data from a producer and from a consumer.

Ok, so that’s all for today’s SynchronousQueue, so let’s briefly summarize

conclusion

SynchronousQueue is used to ensure that “submitted tasks are processed by idle threads, if any; Otherwise, create a new thread to handle the task.

SynchronousQueue is also an implementation of BlockingQueue, which uses the blocking primitive of ArrayBlockingQueue internally and is therefore functionally equivalent to ArrayBlockingQueue. However, SynchronousQueue is lightweight. SynchronousQueue does not have any internal capacity, not even one capacity, that can be used to safely exchange single elements between threads. So the function is relatively single, the advantage should be light weight bar ~

All right, see you next time about queues, come on!!

overtones

Thank you for reading, and if you feel like you’ve learned something, you can like it and follow it. Also welcome to have a question we comment below exchange

Come on! See you next time!

To share with you a few I wrote in front of a few SAO operation

Copy object, this operation is a little SAO!

Dry! SpringBoot uses listening events to implement asynchronous operations

You are also welcome to click on the queue topic below. We study together