First of all, here’s my penguin colony:
Those interested in springboot for rapid development can add the penguin colony below.
The common thread
- 1. Implementation: Inherit Thread or implement Runnable interface
- 1. Inherit threads only
- 2. Implement the Runnable interface (which can realize internal resource sharing) and implement multiple interfaces
- 3. Classic problem: Window selling tickets
- 2. Instantiate the object
- 3. Perform tasks
- 4. Destroy the thread to reclaim resources
Think about:
What do we do when multiple resources need to be threaded? Has the following process been repeated?
create -> run -> destroy
Copy the code
We know that every time a computer runs, it consumes a lot of resources. Five threads may have no impact. How about 5W? 50,000 creates and destroys only 50,000 executions? The execution task may spend a significant amount of time dealing with the creation and destruction.
The thread pool
The characteristics of
- 1. Solve the execution problem of multiple threads in the processor unit
- 2. Reduce the idle time of processor units
- 3. Increased throughput of the processor unit during working hours (why say so? We reduce the creation and destruction waste of multiple tasks per thread and improve task execution efficiency.
composition
- 1. ThreadPool manager: responsible for creating, managing, destroying thread pools, and adding tasks
- 2. PoolWorker: If no task exists, the PoolWorker waits and executes the task repeatedly
- 3. Task interface: Each Task must implement the interface. The worker thread is responsible for scheduling the execution of the Task, specifying the entry of the Task, the finishing work after the Task is completed, and the execution status of the Task, etc
- 4. TaskQueue: Stores unprocessed tasks and provides a task buffer mechanism
Eg: Supermarket checkout: cashier service group, single cashier, cashier work, waiting to be collected by the crowd
The JDK thread pool class: Java. Util. Concurrent, Executors and JDK thread pool actuator interface: Java. Util. Concurrent. The Executor
Under Executors, the JDK provides the following thread pool:
A static method | The type of thread pool created | The actual implementation of the return value |
---|---|---|
newFixedThreadPool(int) | Fixed thread pool | ThreadPoolExecutor |
newWorkStealingPool() | Processor core number of parallel thread pool | ForkJoinPool |
newSingleThreadExecutor() | A single thread pool for one thread | FinalizableDelegatedExecutorService |
newCachedThreadPool() | Cache thread pool | ThreadPoolExecutor |
newSingleThreadScheduledExecutor() | Single thread timing thread pool | DelegatedScheduledExecutorService |
newScheduledThreadPool(int) | Timed thread pool | ScheduledThreadPoolExecutor |
NewSingleThreadExecutor () Specifies the thread pool of a thread
Why am I using a thread pool here? In fact, we take the simple ones and we take the complicated ones. On the first code:
public static ExecutorService newSingleThreadExecutor(a) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
Copy the code
We can see that the above method is the return value is the ExecutorService, but actually it is instantiated FinalizableDelegatedExecutorService, we went in to see the source code, as follows:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
// constructor
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
// Called when the object is destroyed
protected void finalize(a) {
super.shutdown(); }}Copy the code
The code above, we can see obviously FinalizableDelegatedExecutorService merely to DelegatedExecutorService encapsulation, only implementation is at the end of the object is destroyed when will the ExecutorService.
At this point we should return to analyze the DelegatedExecutorService and the specific code in the method above.
Let’s look at the default single-threaded thread pool implementation as follows:
new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1.1.0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
ExecutorService the code here implements an ExecutorService, with several parameters. Why?
//
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }}// We can see the literal meanings of several parameters:
CorePoolSize Number of core threads, including idle threads
//maximumPoolSize Specifies the maximum number of threads
KeepAliveTime Is the maximum number of threads that can wait for a new task before terminating when the number of threads is larger than the core.
//unit keepAliveTime Unit of the keepAliveTime parameter
//workQueue Queue used to hold tasks before execution. This queue only holds Runnable tasks submitted by the execute method
/ / Executors. DefaultThreadFactory () the default thread factory
//defaultHandler is a handler for tasks that are out of the thread and out of the task queue, implemented as new AbortPolicy()
// Here, we look at the default thread factory. After all, the core of the thread pool is to need threads to perform tasks, so we look at the thread source first.
static class DefaultThreadFactory implements ThreadFactory {
// Number of pools, specifying atomic operations
private static final AtomicInteger poolNumber = new AtomicInteger(1);
/ / thread group
private final ThreadGroup group;
// Number of threads, specifying atomic operations
private final AtomicInteger threadNumber = new AtomicInteger(1);
// Thread name prefix
private final String namePrefix;
DefaultThreadFactory() {
// Get the system security manager
SecurityManager s = System.getSecurityManager();
// Create a thread group, depending on whether the system security manager is obtainedgroup = (s ! =null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();// Construct the thread name
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// Create a thread
public Thread newThread(Runnable r) {
// Set the thread group, the Runnable interface (the code block that the thread actually executes), the thread name, and the stack size required by the thread to 0
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// If the thread is daemons, the setDaemon method must be called before the thread executes
if (t.isDaemon())
t.setDaemon(false);
// The default task priority is 5
if(t.getPriority() ! = Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);returnt; }}// The default thread factory above gives us a thread that is not a daemon thread, which is kept unique by atomic operations, and a default task priority (minimum 1, maximum 10, default 5, where priority is 5).
Copy the code
Single thread pool, default is only one thread and one thread pool, wait time for new tasks is 0, added atomic operations to bind threads.
Is that where it ends? Of course not, we now need to look at a more specific ThreadPoolExecutor to understand thread pools more deeply.
public class ThreadPoolExecutor extends AbstractExecutorService {
/** * all constructors point to this, so it's enough to look at this */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
The maximum number of threads in the thread pool cannot be less than or equal to 0. The maximum number of threads in the thread pool cannot be less than or equal to the number of core threads. The maximum waiting time for a task cannot be less than or equal to 0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// Wait task queue, thread factory, supertask queue handler
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// The above judgment can be seen as a defensive type of programming, all the problems are handled up front, do not need to worry about similar problems later
// Construct thread pool related threshold
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler; }}AbstractExecutorService = AbstractExecutorService (AbstractExecutorService);
// Abstract execution service
public abstract class AbstractExecutorService implements ExecutorService {
// Execute method
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
// Get the number of tasks
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
// Task set
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// Perform the completion service
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// Record an exception
ExecutionException ee = null;
// Timeout time line
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// Use iterators to get tasks
Iterator<? extends Callable<T>> it = tasks.iterator();
// Decide to start a task
futures.add(ecs.submit(it.next()));
// The number of tasks is reduced
--ntasks;
// The task flag is being executed
int active = 1;
// Execute tasks in a loop
for (;;) {
// Get the first task in the task queue
Future<T> f = ecs.poll();
// Task is empty, if there are still tasks, the task will be executed (number of tasks minus 1, submit tasks to the execution queue, number of tasks in progress +1)
// If the number of executing tasks is 0, the task is completed and the task cycle is interrupted
// If there is a timeout check, the timeout check mechanism is implemented
// If none of the above conditions are met, the task queue header is fetched and removed from the queue
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
// The task is not empty
if(f ! =null) {
// Executing flag -1
--active;
try {
// Returns the execution result
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = newExecutionException(rex); }}}if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// Cancel all tasks
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true); }}// Execute method
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
// Create a task queue
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
// Iterate to perform tasks
try {
// Create a task and add it to the task queue
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
// Set the timeout flag
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
Alternate execution of time checks and calls in cases where the executor does not have much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
// The task times out and returns to the task queue
if (nanos <= 0L)
return futures;
}
// Iterate over the task and return the task execution result
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if(! f.isDone()) {/ / timeout
if (nanos <= 0L)
return futures;
try {
// Given execution time to wait for the task to complete and return the result
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
// Cancel execution if not completed
if(! done)for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true); }}/** * Create a task queue */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/** * Submit the task to the execution queue */
publicFuture<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
returnftask; }}Copy the code
The above code has given us a basic understanding of how a thread pool creates and executes tasks, so we will only need to focus on some key ThreadPoolExecutor methods to understand how a thread pool works, and the corresponding thread pool patterns can be derived.
First of all, before we look at the source code, we will do some random thinking, sort out the thread pool execution process:
The main parameters of ThreadPoolExecutor have been briefly described, but we can summarize them in more detail:
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler; }}Copy the code
- CorePoolSize: maximum number of core threads in the thread pool
Core thread: When a thread pool creates a new thread, if the current total number of threads is smaller than corePoolSize, the new thread is a core thread. If it exceeds corePoolSize, the new thread is not a core thread. A core thread is always alive in the thread pool by default, even if the core thread is idle. If the allowCoreThreadTimeOut property of ThreadPoolExecutor is set to true, the core thread will be destroyed if it is idle for more than a certain amount of time.
- MaximumPoolSize: specifies the maximum number of threads in the thread pool
Total threads = number of core threads + number of non-core threads.
- KeepAliveTime: Specifies the idle timeout duration of non-core threads in the thread pool
AllowCoreThreadTimeOut = True A non-core thread that has been inactive for longer than this parameter is destroyed. AllowCoreThreadTimeOut = true
- Unit: keepAliveTime unit
TimeUnit is an enumeration that includes: NANOSECONDS: 1 microsecond = 1 microsecond / 1000 MICROSECONDS: 1 microsecond = 1 millisecond / 1000 MILLISECONDS: 1 millisecond = 1 second /1000 SECONDS MINUTES: HOURS DAYS: DAYS
- WorkQueue: Queue of tasks in a thread pool: Runnable objects that are maintained waiting to be executed
When all the core threads are working, new tasks are added to the queue for processing, and if the queue is full, a new non-core thread is created to execute the task.
-
ThreadFactory: How threads are created.
-
Handler: Indicates the exception handler.
Now that we know about task execution, how do tasks queue up?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * 1. If the number of threads running is smaller than corepoolSize, the new task will directly start the new thread to execute. * Calls to addWorker atomically check the running state and workerCount, preventing false alarms by returning false alarms that add threads when they shouldn't. * * 2. If a task is successfully queued, we need to check again to see if we need to start a new thread to execute it. * Possible causes are: an existing task has been completed, or the thread pool has been terminated. * * * 3. If the task cannot be queued, try adding a new task thread. * If it fails, we know we are closed or saturated so reject the task. * /
// Run status tag
int c = ctl.get();
// If the number of executing threads is less than the number of core threads, the task is executed immediately
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// Check again to see if it is running and does not exceed the task queue capacity
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// If the task to be executed is not in the running state and not in the waiting queue, the task will be thrown exception
if (! isRunning(recheck) && remove(command))
reject(command);
// If the number of running tasks is empty, thread tasks outside the core thread are empty
else if (workerCountOf(recheck) == 0)
addWorker(null.false);
}
// The number of core threads exceeds and other threads fail to add tasks, raising an exception
else if(! addWorker(command,false))
reject(command);
}
Copy the code
See here we have a vague understanding of the task queue execution, all the task queue is the same queue execution, so what is our task queue?
-
LinkedBlockingQueue: Linear blocking queue. The task is received, and if it does not exceed corePoolSize, a new thread is created to execute it, otherwise the task is queued and blocked
-
ArrayBlockingQueue: ArrayBlockingQueue. The characteristic of an array is that it’s fixed in length, so this queue is fixed in length. A new task is received, and if it does not exceed corePoolSize, a new thread is created to execute it, or if it does, a new thread (total number of threads
-
SynchronousQueue: SynchronousQueue. Since it is a synchronous queue, it means that new tasks are executed as they come in. The number of core threads is infinite.
-
DelayQueue: indicates a DelayQueue. When receiving a task, the queue joins the queue first and executes the task only after the specified delay time is reached.
That said, we have basically analyzed the core of thread pools: JDK native thread pool types, thread factories in thread pools (for producing threads), thread pool task execution, thread pool task queuing, and thread pool queue types. We conclude this article with a diagram, but for other types of thread pool implementations, please check the source code.
Think: What else in Java development does this?