Whether for interview preparation or the use of actual project, understand the principle of Java thread pool is very helpful, because the project found why do some of the parameters set or unreasonable setting, as a result of our pile of thread pool are not familiar with, may lead to the decline in the quality of the whole project, such as CPU usage soared, memory OOM what of, well, Bi is not installed, although it is for search engine writing, but it is also a summary, so, in this arrangement of a multi-threaded implementation principle, to a learning summary of this knowledge point.
What does execute commit task do
Here is a classic diagram to illustrate the execution flow of execute:
public void execute(Runnable command) {
int c = ctl.get();
// case1. Create a core thread
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// case2.
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
If the thread pool is not running during enlisting, remove the task and let the saturation policy handle it
if (! isRunning(recheck) && remove(command))
reject(command);
// If there are no running threads, create a new one to handle
else if (workerCountOf(recheck) == 0)
addWorker(null.false);
}
// case3. Create a non-core thread, and perform saturation processing logic if the thread exceeds the threshold
else if(! addWorker(command,false))
reject(command);
}
Copy the code
Reject is a call to a specific reject policy, creating the worker thread addWorker.
private boolean addWorker(Runnable firstTask, boolean core) {
// The main purpose of the for loop is to determine whether there is a limit on the number of threads. If so, change the number of worker threads in the thread pool, otherwise return
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null&&! workQueue.isEmpty()))return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if(runStateOf(c) ! = rs)continue retry;
// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Instantiate a worker thread
w = new Worker(firstTask);
final Thread t = w.thread;
if(t ! =null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// Join the work queue
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; }}finally {
mainLock.unlock();
}
// Add a new worker thread and start the internal thread
if (workerAdded) {
t.start();
workerStarted = true; }}}finally {
if (! workerStarted)
// If processing fails, modify the workcount count and remove the worker thread from the queue (if a worker thread has been created but not started properly)
addWorkerFailed(w);
}
return workerStarted;
}
Copy the code
The logic is also easy:
We need to know what the Worker is at that time. Why can we execute our tasks by calling the start method of its internal thread? NewThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable = newThread Runnable
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// Hold the task
this.firstTask = firstTask;
Instantiate the internal concrete action thread from the thread pool,
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run(a) {
runWorker(this);
}
Copy the code
The rest of the work is left to the runWorker method. In fact, the whole life cycle of a worker ends when the runWorker method ends. The main function of a worker is to read and execute tasks in the runWorker by blocking. It ends when the configured keepAliveTime arrives:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// Loop to get the execution task
while(task ! =null|| (task = getTask()) ! =null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
// Execute the front hook function
beforeExecute(wt, task);
Throwable thrown = null;
try {
// Call to execute our business logic
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// Execute the post-hook functionafterExecute(task, thrown); }}finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Handle post-processing, such as changing the number of workCount from worker threads, removing the current worker object from workers, and ensuring that the minimum number of available threads is 1, etcprocessWorkerExit(w, completedAbruptly); }}Copy the code
Ok, submit is basically a call to execute, and if you look at this, you probably know that, oh, you can get results from execute, because submit is basically a simple encapsulation of execute.
public Future<? > submit(Runnable task) {if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Copy the code
conclusion
It’s time to wrap things up, and we should know:
- Thread pool execution flow;
- The execution process of execute, the core method of thread;
- You know the relationship between execute and Submit and there’s still a lot to learn about thread pools like how to set parameters and how to gracefully close thread pools. See you in the next video.