preface
JDK 1.8 source code to see about, plan to record a little difficult to understand the source code.
My JDK 1.8 source code comment github address
https://github.com/zhangpanqin/fly-jdk8
Copy the code
Depending on the source code, look at the source code can really learn a lot, both in theory and practice. Don’t look at the source code is not necessarily what all don’t understand.
Skill level is not enough, you see the source code harvest will not be much, some ideas you can not understand.
Threads and thread pools
In Linux, a child process can be created by calling fork. By passing different arguments to fork, the child process can share the memory of the parent process.
In The Linux system, the Java Thread Thread is actually a lightweight child process generated by the system call fork, which shares the memory area of the parent process, so as to achieve the purpose of multi-threading.
System calls require CPU to switch from user mode to kernel mode, which takes a long time compared to CPU execution time and consumes system resources. So you have a thread pool, where the thread is actually created and not destroyed, and the run method has an infinite loop that takes Runable from the blocking queue.
// Thread pool simplified version of the principle, just to understand thread pools
public class ThreadPoolExecutor2 {
private static final BlockingQueue<Runnable> QUEUE = new LinkedBlockingQueue();
public boolean execute(Runnable task) {
return QUEUE.offer(task);
}
static {
new Thread(() -> {
try {
Runnable take;
while (true) {
take = QUEUE.take();
if(Objects.nonNull(take)) { take.run(); }}}catch(Throwable e) { } }).start(); }}Copy the code
Thread pool usage
The JDK provides a thread pool implementation, ThreadPoolExecutor, that we use most in our daily development.
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}Copy the code
corePoolSize
Number of core threads in the thread pool
Core threads are the number of threads that are not recycled when they are idle for a period of time. Also can let the core configuration parameters, thread free don’t recycle ThreadPoolExecutor. AllowCoreThreadTimeOut
maximumPoolSize
Maximum number of threads in a thread pool
After the number of core threads is exceeded, threads are reclaimed when idle for a period of time
long keepAliveTime,TimeUnit unit
How long a thread is idle is reclaimedworkQueue
Block queue, where received tasks are stored. To avoid oom, be sure to set queue sizethreadFactory
Create a factory for threads
// We can define the prefix of the thread name in the thread factory to determine which business thread pool has the problem
Threads in the thread pool default to worker threads. You can set the threads created by the thread factory to daemon threads
private static ThreadFactory getThreadFactory(a) {
final ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
threadFactoryBuilder.setNameFormat("order-thread-poll-%s");
// Sets whether the thread in the thread pool is a daemon thread
threadFactoryBuilder.setDaemon(true);
/ / when the Thread execution exception happens, the JVM will call Thread. DispatchUncaughtException, then call UncaughtExceptionHandler Settings
threadFactoryBuilder.setUncaughtExceptionHandler((thread, throwable) -> {
System.out.println(StrUtil.format("Thread execution exception: {}", thread.getName()));
System.out.println(StrUtil.format("Thread execution is abnormal. Exception message: {}", throwable.getMessage()));
});
return threadFactoryBuilder.build();
}
Copy the code
- Handler Rejection policy for tasks that cannot be accepted by the thread pool
The tasks in the queue need memory. Due to the limited memory, we cannot accept tasks without limit. When a task cannot be accepted by the thread pool, we need to execute the policy according to how to reject the task or execute the task.
AbortPolicy: throws an exception when calling execute CallerRunsPolicy: executes this task in the caller thread. That's the run method that actually executes the Runable when you call execute synchronously. DiscardOldestPolicy: Discards the longest task in the queue and then calls the execute(Runable) DiscardPolicy of the thread pool again. DiscardOldestPolicy: Discards the task without processing it. The caller is not aware of itCopy the code
Thread pool source code
Threads have the state of threads. Thread pools also have thread pool states.
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 表示线程池的状态和线程池中线程数量
* int 占四个字节,32 bit
* 高三位表示线程池的状态,后 29 表示线程的数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/ / COUNT_BITS to 29
private static final int COUNT_BITS = Integer.SIZE - 3;
/** * can accept new tasks or process tasks in a blocking queue * the first three digits are 111 */
private static final int RUNNING = -1 << COUNT_BITS;
/** * does not accept new tasks, but can process tasks in the blocking queue * the first three digits are 000 */
private static final int SHUTDOWN = 0 << COUNT_BITS;
/** * does not accept new tasks, does not process tasks that block queue columns, interrupts tasks being processed * the first three digits are 001 */
private static final int STOP = 1 << COUNT_BITS;
/** * transitive state, which means that all tasks are finished and there are no valid threads in the thread pool, the thread pool state will be TIDYING and the terminated method will be called with the first three digits being 010 */
private static final int TIDYING = 2 << COUNT_BITS;
/** * The thread pool is calling the terminated method and the resource is terminated * the first three digits are 011 */
private static final int TERMINATED = 3 << COUNT_BITS;
/** * get the thread pool state */
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
/** * gets the number of worker threads */
private static int workerCountOf(int c) {
returnc & CAPACITY; }}Copy the code
Interrupt () returns true if thread.isinterrupted () isInterrupted, but the interrupted Thread is marked as interrupted. InterruptedException is thrown when a thread is interrupted while blocking and waiting. If caught and handled in the thread’s RUN method, the thread exits.
// Threads cannot be stopped, and therefore threads cannot be stopped.
public static void main1(String[] args) {
THREAD_POOL_EXECUTOR.execute(() -> {
while (true) {}}); THREAD_POOL_EXECUTOR.shutdownNow(); }// When an interrupt exception is caught and the thread does not handle the exception, the thread exits and the thread pool exits
public static void main2(String[] args) {
THREAD_POOL_EXECUTOR.execute(() -> {
while (true) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
throw newRuntimeException(e); }}}); THREAD_POOL_EXECUTOR.shutdownNow(); }Copy the code
execute
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
int c = ctl.get();
If the number of threads in the thread pool is less than the number of core threads, create a new thread to execute the task, return. * /
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
/** * The number of threads in the thread pool exceeds the number of core threads, and the task is added to the queue, waiting to be executed. * If the task fails to be queued, start a new thread to execute the task if the maximum number of threads has not been reached; The maximum number of threads is reached, and the reject policy is executed. * /
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// Check the thread pool status again and remove the task from the queue if the thread is closed
if(! isRunning(recheck) && remove(command)) { reject(command);// If the thread pool is running but there are no worker processes. Add a worker thread that will execute the task from the queue
} else if (workerCountOf(recheck) == 0) {
addWorker(null.false); }}else if(! addWorker(command,false)) { reject(command); }}Copy the code
addWorker
// Create a new thread and call the thread's start method, returning true
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/** * Double-layer for loop to determine whether the thread pool state is running and whether the number of threads meets the definition */
for(; ;) {int c = ctl.get();
/** * RS indicates the thread pool running state */
int rs = runStateOf(c);
/** * 1. When the thread pool is shutdown, tasks cannot be added. Return false */ when there is a task. After the thread pool has been shutdown, false */ when the task queue is empty
if(rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask ==null && !workQueue.isEmpty())) {
return false;
}
for(; ;) {// Determine whether the number of threads in the thread pool meets the definition
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
// How does CAS work on thread count
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
c = ctl.get(); // Re-read ctl
if(runStateOf(c) ! = rs) {continue retry;
}
// else CAS failed due to workerCount change; retry inner loop}}// Whether the thread in the worker calls the start method
boolean workerStarted = false;
// Add the worker to the workers HashSet
boolean workerAdded = false;
Worker w = null;
try {
Create a new thread
w = new Worker(firstTask);
final Thread t = w.thread;
if(t ! =null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
workerAdded = true; }}finally {
mainLock.unlock();
}
// Add worker to workers, indicating that the worker is used for the first time. To start this thread, start
if (workerAdded) {
t.start();
workerStarted = true; }}}finally {
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
Copy the code
Worker.run
// Actually call runWorker
public void run(a) {
runWorker(this);
}
Copy the code
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// Whether thread execution is caused by an exception,true means that the exception exits
boolean completedAbruptly = true;
try {
// The thread keeps getting the queue head task to execute
// getTask actually calls the workqueue.poll (keepAliveTime, timeunit.nanoseconds) of the blocking queue.
GetTask returns null thread exit due to timeout when the number of threads in the thread pool is greater than the number of core threads. The thread is freed
while(task ! =null|| (task = getTask()) ! =null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) { wt.interrupt(); }try {
// The hook function before the task executes
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// The hook function after the task is executedafterExecute(task, thrown); }}finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally{ processWorkerExit(w, completedAbruptly); }}Copy the code
tryTerminate
TryTerminate attempts to close the thread pool.
/** * When removing work, try to determine whether the thread pool can exit */
final void tryTerminate(a) {
for(; ;) {int c = ctl.get();
if(isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) {return;
}
/** * If the worker thread is not 0, break a thread */
if(workerCountOf(c) ! =0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
/** * The thread is terminated() and the task in the queue is terminated(). Set the thread pool state to TIDYING. Set the thread pool state to terminated */ after calling the terminated() method
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return; }}finally{ mainLock.unlock(); }}}Copy the code
This article was created by Zhang Panqin on his blog www.mflyyou.cn/. It can be reproduced and quoted freely, but the author must be signed and indicate the source of the article.
If reprinted to wechat official account, please add the author’s official qr code at the end of the article. Wechat official account name: Mflyyou