Thread pools and custom thread pools

In order to prevent resource abuse and properly control the number of connections, we use pooling.

Database connection pool: Do not randomly new connections

You can’t just create new threads to ensure uniform thread processing.

The thread pool

Java provides four thread pools through Excutors:

  • newCachedThreadPool
  • newFixedThreadPool
  • newScheduledThreadPool
  • newSingleThreadPool

The core parameters of the thread pool

We determine what thread pool we want by passing different values to these parameters.

int corePoolSize, // The number of core threads, whether busy or not, there will be this many threads in the pool, as long as new divides the pool, there will be this many threads
int maximumPoolSize,// Maximum number of threads. If the number of tasks exceeds the number of core threads, new threads will be created, but the upper limit is maximumPoolSize
long keepAliveTime,// How long does it take to reclaim idle time when threads exceed core threads
TimeUnit unit,// The unit of time above
BlockingQueue<Runnable> workQueue,// Block the queue (stuck there waiting to be fetched), where redundant tasks that cannot be processed by the thread pool are queued
ThreadFactory threadFactory,// Thread factory, responsible for creating threads
RejectedExecutionHandler handler// Reject the policy
    
Copy the code

Blocking queue

You put 10 numbers in, you take 12 numbers, and the program doesn’t end, it waits for someone to send two more numbers in.

Note that BlockingQueue is an interface, and the specific implementations are ArrayBlockingQueue and LinkedBlockingQueue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/ * * *@author SJ
 * @date2021/4/2 * /
public class TestBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        // Create a blocking queue with capacity 5
        BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(3);

        // Create two tasks, one for drop and one for fetch
        // Put three numbers
        Runnable t1=()->{
        for (int i = 0; i < 3; i++) {
            System.out.println("Into"+i);
            try {
                blockingQueue.put(i);
            } catch(InterruptedException e) { e.printStackTrace(); }}};// Take 5 numbers
        Runnable t2=()->{
            for (int i = 0; i < 5; i++) {
                System.out.println("Take out"+i);
                try {
                    blockingQueue.take();
                } catch(InterruptedException e) { e.printStackTrace(); }}};// Create a thread pool to drop tasks into
        ExecutorService pools= Executors.newFixedThreadPool(2); pools.submit(t2); pools.submit(t1); }}Copy the code

It doesn’t stop because it needs to get five numbers, but there’s only three numbers in the queue, and it’s not done yet, so the block is waiting here for someone to put the numbers in.

Rejection policies

If you use ArrayBlockingQueue with blocking queues, tasks are queued in an orderly manner. LinkedBlockingQueue can be an unbounded queue.

When the thread is full and the queue is full, the task is rejected.

Rejection strategy:

There are four

AbortPolicy: Throws an exception

CallerRunsPolicy: Try to run before dropping

DiscardOldestPolicy: Discards the queue with the longest waiting time

DiscardPolicy: directly discard the card

newFixedThreadPool

Number of core threads: nThreads

Maximum number of threads: nThreads

If the number of core threads is the same as the maximum number of threads, there is no active time. Active time refers to the active time of the portion beyond the number of core threads

Unit: millisecond

The wait queue is a LinkedBlockingQueue, an unbounded queue that accepts all tasks without a rejection policy

newSingleThreadPool

Number of core threads: 1

Maximum number of threads: 1

Unbounded queue

newCachedThreadPool

Number of core threads: 0

Maximum number of threads: Integer range last, equivalent to unlimited number of threads

Active time: 60

Unit: second

Wait queue: Synchronization queue

Synchronous queue

SynchronousQueueSimple to use

The classic producer-consumer model runs like this:

You have multiple producers, you can produce products at the same time, put products in a queue, and if the queue is full, the producers will block;

There are multiple consumers that concurrently fetch products from the queue, and if the queue is empty, the consumer blocks

SynchronousQueue is also a queue, but it has no internal container. A production thread must block when it produces a product (when put) if no one currently wants to consume the product (when no thread is currently taking a take). The process of waiting for a consumer thread to call the take operation, which will wake up the production thread, and at the same time the consumer thread will take the production thread’s product (i.e., data transfer), which is called a single pairing process (of course, it can also take before put, the principle is the same). The put thread blocks after queue.put(1) and can only return if the take thread consumes it. You can think of this as a model of one-to-one messaging between threads.

Realize the principle of

Unlike blocking queues such as ArrayBlockingQueue and LinkedBlockingQueque, which rely on AQS for concurrent operations, SynchronousQueue uses CAS directly for secure access to threads.

The implementation strategies of queues are usually divided into fair mode and unfair mode.

** Fair mode: ** The underlying implementation uses TransferQueue, an internal queue, which has head and tail Pointers to the thread nodes currently waiting for a match.

The tail of the team matches the head of the team, first in first out, reflecting the principle of fairness

** The bottom implementation uses TransferStack, a stack, with the head pointer pointing to the top of the stack.

Last come first match.

newScheduledThreadPool

Number of core threads: corePoolSize

Maximum number of threads: no limit

Timeout period: 0

Wait queue: delay queue

Delay queue

This queue is a custom priority queue and can only be used to store RunnableScheduledFutures tasks. The bottom layer is a heap structure

We know that when thread pools are running, they’re constantly fetching tasks from the task queue and then executing them. If we want to implement delayed or timed task execution, an important point is that the task queue will be sorted according to the different delay time of the task. The shorter the delay time is, the task queue will be ranked in front of the queue and the task will be executed first.

Custom thread pools

Creating a thread pool manually is even better.

1. Create a thread pool by using ThreadPoolExecutor instead of using Executors.

* If the thread pool object returns by Executors, it has the following disadvantages:

  • FixedThreadPool and SingleThreadPool

    The allowed request queue length is integer. MaxValue, which may cause a large number of requests to pile up, resulting in memory overflow

  • CachedThreadPool:

    The number of threads allowed to be created is integer. MaxValue, which may create a large number of threads, resulting in memory overflow

The instance

Let’s write our own thread pool:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/ * * *@author SJ
 * @date2021/4/3 * /
public class MyExecutor {
    public static void main(String[] args) {

        // Use the ThreadPoolExecutor class to construct a thread pool
        ThreadPoolExecutor MyThreadPool = new ThreadPoolExecutor(
                4.// Number of core threads
                8.// Maximum number of threads
                5.// Active time
                TimeUnit.SECONDS,/ / unit
                new ArrayBlockingQueue<>(300),// Block queue, up to 300 queues
                new MyThreadFactory(new ThreadGroup("sunjie"),"goods-thread"),// Thread factory
                new ThreadPoolExecutor.AbortPolicy()// Reject policy uses throw exception
        );

    }

    // Create your own thread factory instead of writing the source code to change
    static class MyThreadFactory implements ThreadFactory{
        private static final AtomicInteger poolNumber = new AtomicInteger(0);
        private ThreadGroup group;
        private String namePrefix;

        // Constructor, passing in the group name and prefix
        public MyThreadFactory(ThreadGroup group, String namePrefix) {
            this.group = group;
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            // Create a new thread, specifying the group name and thread name
            Thread t = new Thread(group, r,
                    namePrefix +"-"+ poolNumber.getAndIncrement());
            // Set it to a non-daemon thread
            if (t.isDaemon())
                t.setDaemon(false);
            // Set thread priority to regular priority
            if(t.getPriority() ! = Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);returnt; }}}Copy the code

use

Use custom thread pools

import java.util.concurrent.ThreadPoolExecutor;

/ * * *@author SJ
 * @date2021/4/3 * /
public class TestMyExecutor {
    public static void main(String[] args) {
        // Create a thread pool
        MyExecutor myExecutor = new MyExecutor();
        // Call the thread pool you created
        ThreadPoolExecutor myThreadPool = myExecutor.MyThreadPool;

        // Add new tasks to the thread pool
        while (true){

            myThreadPool.execute(()->{
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(123); }); }}}Copy the code

Enter jConsole on the command line to select your class and view the active threads

At this point, there are four active threads, and since the task is output and is executed very fast, four core threads are sufficient.

If we change the task to

   // Add new tasks to the thread pool
        while (true){

            myThreadPool.execute(()->{
                try {
                    Thread.sleep(10000);// The thread runs for 10s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(123);
            });
        }
Copy the code

The console threw an exception, because the maximum number of queued tasks was 300, while(true) would frantically throw tasks in, so the rejection policy was implemented.

There are not enough core threads, so new threads will be created, and the maximum number of threads is 8, so there are 8 threads.