Blocking queue: BlockingQueue

1. A blocking queue supports blocking insert and remove operations: inserting into a full queue blocks the inserting thread, and taking from an empty queue blocks the taking thread.

2. A blocking queue coordinates producers and consumers and smooths out the imbalance between production speed and consumption speed: producers put data into the queue, and consumers take data out of it. This decouples the two sides and balances their processing capacity.

BlockingQueue defines three pairs of methods for inserting and removing data:

add and remove do not block; they throw an exception instead (IllegalStateException when the queue is full, NoSuchElementException when it is empty);

offer and poll return special values: offer returns false when the queue is full, and poll returns null when the queue is empty. Both also have timed variants that block for at most a given timeout;

put and take are the core blocking methods: they block indefinitely until the operation can proceed;
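A minimal sketch of the three method pairs, using an ArrayBlockingQueue of capacity 1 (the class name and values here are only illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertRemoveDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);

        // Pair 1: add/remove throw on failure.
        q.add("a");
        try {
            q.add("b");                       // queue is full
        } catch (IllegalStateException e) {
            System.out.println("add threw: " + e.getMessage()); // "Queue full"
        }

        // Pair 2: offer/poll return special values.
        System.out.println(q.offer("b"));     // false: queue still full
        System.out.println(q.poll());         // "a"
        System.out.println(q.poll());         // null: queue now empty

        // Timed variant blocks for at most the given timeout.
        System.out.println(q.poll(50, TimeUnit.MILLISECONDS)); // null after ~50 ms

        // Pair 3: put/take block indefinitely; here they succeed at once.
        q.put("c");
        System.out.println(q.take());         // "c"
    }
}
```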

Commonly used blocking queues

ArrayBlockingQueue: a bounded blocking queue composed of array structures

Like ArrayList and HashMap, ArrayBlockingQueue is backed by an array, but the three use it differently: ArrayList appends at index size++, while HashMap computes an index from the element's hash and, when two elements land on the same index (a hash collision), resolves the conflict with a linked list or red-black tree. ArrayBlockingQueue treats its array as a circular buffer: once the capacity is fixed it cannot be changed, and the queue maintains one index for storing data and another for fetching data.

A single ReentrantLock protects both operations. If the queue is empty, take blocks on notEmpty.await(); if the queue is full, put blocks on notFull.await(). After an element is successfully enqueued, notEmpty.signal() wakes a waiting taker (and after a successful take, notFull.signal() wakes a waiting putter). notEmpty and notFull are both Condition objects created from the same ReentrantLock;

The queue is first-in, first-out. By default, threads are not guaranteed fair access; passing true to the constructor makes the internal ReentrantLock a fair lock, so the thread that has been blocked longest gets access first. The lock is not separated: production and consumption share the same lock;

LinkedBlockingQueue: a bounded blocking queue with a linked list structure

The default capacity is Integer.MAX_VALUE, and ordering is also first-in, first-out. The locks in this queue are separated: putLock is used for production and takeLock for consumption, so a put and a take can proceed at the same time.

Blocking is also done with Condition:

put: if the queue is full, block on notFull.await(); otherwise link the element to the tail of the list and update count. If the queue is still not full after the insert, call notFull.signal() to wake another blocked putter. Finally, if count was 0 before the put, the list has just gone from empty to non-empty, so call notEmpty.signal() to wake a thread blocked in take;

take: if the queue is empty, block on notEmpty.await(); otherwise fetch the element from the head of the list and update count. If elements remain after the take, call notEmpty.signal() to wake another blocked taker. Finally, if count was equal to capacity before the take, the queue has just gone from full to not-full, so call notFull.signal() to wake a thread blocked in put;

So the blocking in put and take cooperates through these cascading signals. The timed offer and poll work on the same principle, with slightly modified timing logic;
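The two-lock scheme can be sketched as follows. This is a simplified, illustrative version, not the JDK source: error handling and internal optimizations are reduced to the essentials described above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified two-lock bounded queue in the style of LinkedBlockingQueue.
public class TwoLockQueue<E> {
    private static class Node<E> { E item; Node<E> next; Node(E item) { this.item = item; } }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null);   // dummy head node
    private Node<E> tail = head;

    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();

    public TwoLockQueue(int capacity) { this.capacity = capacity; }

    public void put(E e) throws InterruptedException {
        int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await();
            tail = tail.next = new Node<>(e);
            c = count.getAndIncrement();            // c = count BEFORE the put
            if (c + 1 < capacity) notFull.signal(); // still room: wake another putter
        } finally {
            putLock.unlock();
        }
        if (c == 0) signalNotEmpty();               // queue went empty -> non-empty
    }

    public E take() throws InterruptedException {
        E item;
        int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> first = head.next;              // real first element
            head = first;                           // old dummy is discarded
            item = first.item;
            first.item = null;
            c = count.getAndDecrement();            // c = count BEFORE the take
            if (c > 1) notEmpty.signal();           // elements remain: wake another taker
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signalNotFull();         // queue went full -> not full
        return item;
    }

    private void signalNotEmpty() {
        takeLock.lock();
        try { notEmpty.signal(); } finally { takeLock.unlock(); }
    }

    private void signalNotFull() {
        putLock.lock();
        try { notFull.signal(); } finally { putLock.unlock(); }
    }
}
```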

PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting

By default, elements are arranged in a natural ascending order. You can pass in a Comparator object in the constructor to sort the elements using a custom compare method.
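For example, with integers (a reversed Comparator is used here purely for illustration):

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
    public static void main(String[] args) {
        // Natural ascending order by default: smallest element comes out first.
        PriorityBlockingQueue<Integer> natural = new PriorityBlockingQueue<>();
        natural.add(3); natural.add(1); natural.add(2);
        System.out.println(natural.poll()); // 1

        // Custom order via a Comparator: largest element comes out first.
        PriorityBlockingQueue<Integer> reversed =
                new PriorityBlockingQueue<>(11, Comparator.reverseOrder());
        reversed.add(3); reversed.add(1); reversed.add(2);
        System.out.println(reversed.poll()); // 3
    }
}
```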

DelayQueue: An unbounded blocking queue implemented using priority queues

This queue is implemented on top of PriorityQueue. Elements must implement the Delayed interface and specify, at creation time, how long until they may be taken from the queue. It is often used in the design of cache systems (e.g. expiring entries);
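A small sketch of an element type implementing Delayed (the Task class and the delays are illustrative):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayDemo {
    // Illustrative element: becomes available once its deadline passes.
    static class Task implements Delayed {
        final String name;
        final long deadlineNanos;
        Task(String name, long delayMillis) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> q = new DelayQueue<>();
        q.put(new Task("late", 200));
        q.put(new Task("soon", 50));
        System.out.println(q.take().name);  // "soon": take blocks until its delay expires
        System.out.println(q.take().name);  // "late"
    }
}
```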

SynchronousQueue: A blocking queue that does not store elements

The queue itself holds no elements: each put must wait for a corresponding take (and vice versa) before it can proceed;
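A short hand-off demonstration (the thread setup and strings are illustrative):

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> q = new SynchronousQueue<>();

        // offer with no waiting consumer fails immediately: nothing is stored.
        System.out.println(q.offer("dropped"));   // false

        // A consumer thread that waits for a hand-off.
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got: " + q.take());
            } catch (InterruptedException ignored) { }
        });
        consumer.start();

        q.put("hello");          // blocks until the consumer takes the element
        consumer.join(1000);
    }
}
```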

LinkedTransferQueue: an unbounded blocking queue composed of linked lists, with transfer and tryTransfer methods

If a consumer is already waiting to receive an element, transfer hands the producer's element to it immediately; otherwise transfer enqueues the element at the tail and blocks until a consumer takes it. tryTransfer differs in that it returns immediately whether or not a consumer accepts the element: true if one did, false (without enqueueing anything) if no consumer was waiting;
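This behavior can be seen directly (the thread setup and strings are illustrative):

```java
import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();

        // No consumer is waiting: tryTransfer fails and enqueues nothing.
        System.out.println(q.tryTransfer("x")); // false
        System.out.println(q.size());           // 0

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got: " + q.take());
            } catch (InterruptedException ignored) { }
        });
        consumer.start();

        q.transfer("y");   // blocks until the consumer receives the element
        consumer.join(1000);
    }
}
```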

LinkedBlockingDeque: a double-ended blocking queue (deque) backed by a linked list

Elements can be inserted and removed at both ends; the extra entry point reduces contention when multiple threads enqueue and dequeue at once. Internally, the usual insert and remove methods are extended into First/Last variants such as addFirst, addLast, takeFirst, and takeLast;

Executor thread pools: ThreadPoolExecutor

1. Why use thread pools

Reduce resource consumption; improve response speed; improve thread manageability;

2. Inheritance structure

Executor–ExecutorService–AbstractExecutorService–ThreadPoolExecutor

ExecutorService–ScheduledExecutorService–ScheduledThreadPoolExecutor; ScheduledThreadPoolExecutor can run a task after a given delay or execute tasks periodically, and is more flexible and more powerful than Timer;

3. Description of parameters for creating a thread pool

corePoolSize: the number of core threads. If the prestartAllCoreThreads method is called, the pool creates and starts all core threads ahead of time;

maximumPoolSize: the maximum number of threads;

keepAliveTime: how long idle threads stay alive. By default it applies only to threads beyond corePoolSize;

unit: the TimeUnit of keepAliveTime;

workQueue: the blocking queue holding pending tasks. Prefer bounded blocking queues whenever possible;

threadFactory: a ThreadFactory used to set thread names, daemon status, etc.;

handler (RejectedExecutionHandler): the saturation policy, also called the rejection policy:

  • DiscardOldestPolicy: discards the oldest task in the queue, then retries submitting the current task;
  • AbortPolicy (the default): throws RejectedExecutionException;
  • CallerRunsPolicy: the thread that submitted the task runs it itself;
  • DiscardPolicy: silently discards the task.

4. How thread pools work

If the number of running threads is less than corePoolSize, a new thread is created to run the task. If the number of running threads is greater than or equal to corePoolSize, the submitted task is placed in the blocking queue. If the queue is full and the number of running threads is less than maximumPoolSize, a new (non-core) thread is created. If the queue is full and the thread count has reached maximumPoolSize, the rejection policy is applied to the submitted task.
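Putting the constructor parameters together (the pool sizes, queue capacity, and policy chosen here are only illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                // corePoolSize
                4,                                // maximumPoolSize
                60L, TimeUnit.SECONDS,            // keepAliveTime + unit for non-core threads
                new ArrayBlockingQueue<>(100),    // bounded work queue
                Executors.defaultThreadFactory(), // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy

        for (int i = 0; i < 10; i++) {
            final int n = i;
            pool.execute(() -> System.out.println(
                    Thread.currentThread().getName() + " ran task " + n));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```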

5. Submit tasks

The execute method is used for tasks that need no return value. submit is used for tasks that return a value: it returns a Future, and the return value can be obtained with its get method, which blocks the current thread until the task completes.
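A minimal sketch of both submission styles (the pool size and computed value are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // execute: fire-and-forget, no return value.
        pool.execute(() -> System.out.println("side effect only"));

        // submit: returns a Future; get() blocks until the result is ready.
        Future<Integer> future = pool.submit(() -> 21 * 2);
        System.out.println(future.get()); // 42

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```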

6. Close the thread pool

shutdown: stops accepting new tasks and interrupts only idle threads; tasks already running or queued are still completed;

shutdownNow: stops accepting new tasks, attempts to interrupt all threads (including those running tasks), and returns the tasks still waiting in the queue;

Both methods work by iterating over the pool's worker threads and calling interrupt on them, so tasks that do not respond to interruption may never terminate. After calling either method, isShutdown returns true; isTerminated returns true only after all tasks have completed and the pool has fully shut down.

7. Configure thread pools properly

CPU-intensive tasks: generally set the number of threads to the number of CPU cores + 1 (or + 2);

IO-intensive tasks: generally set it to twice the number of CPU cores;
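These rules of thumb can be computed from the runtime core count; the multipliers below simply follow the guidelines above and should be tuned by measurement:

```java
public class PoolSizing {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int cpuBound = cores + 1;   // CPU-intensive: cores + 1
        int ioBound = cores * 2;    // IO-intensive: cores * 2
        System.out.println("cores=" + cores
                + " cpuBound=" + cpuBound + " ioBound=" + ioBound);
    }
}
```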

8. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor extends ThreadPoolExecutor and adds support for delayed tasks and periodic tasks;

schedule: submits a Runnable or Callable task to be executed once after the given delay, and returns a ScheduledFuture. ScheduledFuture extends Future and can be used to obtain the task result or cancel the task.

scheduleAtFixedRate: submits a Runnable task that runs for the first time after the initial delay and then repeats at a fixed rate, measured from the start of one execution to the start of the next. The return value is a ScheduledFuture, which can be used to cancel the task;

scheduleWithFixedDelay: submits a Runnable task that runs for the first time after the initial delay and then repeats with a fixed delay, measured from the end of one execution to the start of the next. The return value is a ScheduledFuture; ScheduledFuture only extends the Future and Delayed interfaces and adds nothing else;
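A brief sketch of both styles (the delays, periods, and printed strings are illustrative):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // One-shot: a Callable runs once after a 100 ms delay.
        ScheduledFuture<String> once =
                scheduler.schedule(() -> "done", 100, TimeUnit.MILLISECONDS);
        System.out.println(once.get()); // blocks ~100 ms, then prints "done"

        // Periodic: starts after 0 ms, then every 50 ms (start to start).
        ScheduledFuture<?> periodic = scheduler.scheduleAtFixedRate(
                () -> System.out.println("tick"), 0, 50, TimeUnit.MILLISECONDS);

        Thread.sleep(180);
        periodic.cancel(false);  // periodic tasks run until cancelled
        scheduler.shutdown();
    }
}
```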