1.3 Threads
1.3.1 Basic thread states in Java
Note: the WAITING and BLOCKED states correspond to different situations.
WAITING: the thread has called wait() (which must be done while holding the lock) and has entered the wait set of the heavyweight lock.
BLOCKED: the thread is queued to acquire the heavyweight lock, for example after being woken by notify(), when it must re-acquire the lock before it can continue.
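The two states can be observed with Thread.getState(). The following is a minimal sketch, not taken from the original article: the class and helper names are made up for illustration, and BLOCKED is shown through the simpler case of a thread contending for a monitor that another thread holds.

```java
import java.util.concurrent.TimeUnit;

class ThreadStateDemo {
    // Polls until the thread reaches the expected state (gives up after about 5 s).
    static Thread.State awaitState(Thread t, Thread.State expected) {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (t.getState() != expected && System.nanoTime() < deadline) {
            Thread.yield();
        }
        return t.getState();
    }

    static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {state observed inside wait(), state observed while contending for the monitor}.
    static Thread.State[] observe() {
        final Object lock = new Object();
        final boolean[] released = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!released[0]) {
                    try {
                        lock.wait(); // releases the monitor and joins its wait set
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        waiter.start();
        Thread.State waiting = awaitState(waiter, Thread.State.WAITING);

        Thread contender = new Thread(() -> {
            synchronized (lock) {
                // only needs to acquire the monitor once
            }
        });
        Thread.State blocked;
        synchronized (lock) {
            contender.start(); // cannot enter: this thread holds the monitor
            blocked = awaitState(contender, Thread.State.BLOCKED);
            released[0] = true;
            lock.notifyAll(); // the waiter now has to re-acquire the monitor too
        }
        joinQuietly(waiter);
        joinQuietly(contender);
        return new Thread.State[] {waiting, blocked};
    }

    public static void main(String[] args) {
        Thread.State[] states = observe();
        System.out.println("in wait(): " + states[0]);
        System.out.println("contending for the monitor: " + states[1]);
    }
}
```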
1.3.2 Deadlock conditions
- Mutual exclusion: a resource can be held by only one thread at any time.
- Hold and wait: a thread that blocks while requesting a resource keeps holding the resources it has already acquired.
- No preemption: a resource that a thread has acquired cannot be forcibly taken away before the thread is done with it; only the thread itself releases it, after use.
- Circular wait: several threads form a cycle in which each one waits for a resource held by the next.
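Breaking any one of the four conditions is enough to prevent deadlock. As an illustration, here is a small sketch that removes the circular-wait condition by always taking locks in a fixed order. The Account class and its id field are hypothetical, invented only for this example.

```java
class LockOrderingDemo {
    static final class Account {
        final int id;   // hypothetical unique id, used only to order lock acquisition
        long balance;

        Account(int id, long balance) {
            this.id = id;
            this.balance = balance;
        }
    }

    // Always locks the account with the smaller id first, so no cycle of waiting threads can form.
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Runs opposing transfers concurrently and returns {balance of a, balance of b}.
    static long[] run(int rounds) {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) transfer(a, b, 1);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < rounds; i++) transfer(b, a, 1);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {a.balance, b.balance};
    }

    public static void main(String[] args) {
        long[] balances = run(10_000);
        System.out.println(balances[0] + " " + balances[1]);
    }
}
```

If transfer() locked from first and to second instead, the two threads could each hold one account and wait forever for the other.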
1.3.3 ThreadLocal
public class Thread implements Runnable {
    // ...
    // The ThreadLocal values associated with this thread. Maintained by the ThreadLocal class
    ThreadLocal.ThreadLocalMap threadLocals = null;
    // The InheritableThreadLocal values associated with this thread. Maintained by the InheritableThreadLocal class
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    // ...
}
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}
The variable ends up in the current thread's ThreadLocalMap, not in the ThreadLocal itself. ThreadLocal can be understood as a thin wrapper around ThreadLocalMap that passes the value along: inside ThreadLocal, Thread.currentThread() obtains the current thread, and getMap(Thread t) then gives direct access to that thread's ThreadLocalMap.
Each entry in the map uses the ThreadLocal as its key and the stored variable as its value.
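Because the map lives in the Thread object, a value set through a ThreadLocal on one thread is invisible to every other thread. A minimal sketch of this (the class and field names are illustrative only):

```java
class ThreadLocalIsolationDemo {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Sets a value on the calling thread and returns what a second thread reads.
    static String valueSeenByOtherThread() {
        CONTEXT.set("main-value"); // stored in the calling thread's ThreadLocalMap
        final String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = CONTEXT.get()); // reads its own, empty map
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        try {
            System.out.println("other thread sees: " + valueSeenByOtherThread());
            System.out.println("calling thread sees: " + CONTEXT.get());
        } finally {
            CONTEXT.remove();
        }
    }
}
```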
ThreadLocal memory leak
The key used in ThreadLocalMap is a weak reference to the ThreadLocal, while the value is a strong reference.
If nothing outside the map holds a strong reference to the ThreadLocal, the key is cleared during garbage collection but the value is not. This leaves an Entry whose key is null in the ThreadLocalMap. If nothing is done, that value stays reachable for as long as the thread lives and cannot be collected by the GC, which may cause a memory leak.
The ThreadLocalMap implementation takes this into account: calls to set(), get(), and remove() clean up entries whose key is null.
Even so, it is best to call remove() manually once you have finished using a ThreadLocal.
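A common way to follow this advice is to call remove() in a finally block, which matters most on pooled threads because they are reused. The sketch below is an illustration under that assumption; the class name, the USER variable, and the cleanUp flag are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadLocalCleanupDemo {
    static final ThreadLocal<String> USER = new ThreadLocal<>();

    // Runs two tasks on the same pooled thread and returns what the second task reads.
    static String secondTaskSees(boolean cleanUp) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(4));
        try {
            Runnable first = () -> {
                USER.set("alice");
                try {
                    // ... work that reads USER would go here ...
                } finally {
                    if (cleanUp) {
                        USER.remove(); // drop the entry from this thread's map
                    }
                }
            };
            pool.submit(first).get(); // wait for the first task to finish

            Callable<String> second = () -> USER.get();
            return pool.submit(second).get(); // runs on the same, reused worker thread
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("with remove():    " + secondTaskSees(true));
        System.out.println("without remove(): " + secondTaskSees(false));
    }
}
```

Without remove(), the second task inherits the stale value left behind by the first one.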
Why weak references
If each entry held a strong reference to its ThreadLocal key, the ThreadLocal could never be reclaimed while the entry existed, which would be a memory leak. The entry would go away only when the thread terminates and the thread and its map are reclaimed, and with a thread pool the threads rarely terminate.
1.3.4 Creating threads
- Extend Thread
- Implement Runnable
- Implement Callable
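import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Thread2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // FutureTask adapts the Callable so that a Thread can run it
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new C());
        new Thread(futureTask).start();
        // get() blocks until call() has returned
        Integer integer = futureTask.get();
        System.out.println(integer);
    }
}

class C implements Callable<Integer> {
    public Integer call() throws Exception {
        return 1;
    }
}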
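For comparison, the three approaches from the list can be put side by side. This is only a sketch; all class names in it are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class CreateThreadsDemo {
    static final AtomicInteger RUNS = new AtomicInteger();

    // 1. Extend Thread and override run().
    static class CountingThread extends Thread {
        @Override
        public void run() {
            RUNS.incrementAndGet();
        }
    }

    // 2. Implement Runnable and hand it to a Thread.
    static class CountingTask implements Runnable {
        @Override
        public void run() {
            RUNS.incrementAndGet();
        }
    }

    // 3. Implement Callable; a FutureTask adapts it to Runnable and holds the result.
    static class Answer implements Callable<Integer> {
        @Override
        public Integer call() {
            return 42;
        }
    }

    // Starts one thread per approach and returns the Callable's result.
    static int runAll() {
        Thread t1 = new CountingThread();
        Thread t2 = new Thread(new CountingTask());
        FutureTask<Integer> future = new FutureTask<>(new Answer());
        Thread t3 = new Thread(future);
        t1.start();
        t2.start();
        t3.start();
        try {
            t1.join();
            t2.join();
            return future.get(); // blocks until call() has returned
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("callable result: " + runAll());
        System.out.println("run() invocations: " + RUNS.get());
    }
}
```

Only the Callable variant returns a value or propagates a checked exception to the caller; the other two communicate through shared state.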
1.3.5 Thread Pool Problems
(1) Creation of thread pool
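Do not create thread pools through Executors; construct a ThreadPoolExecutor directly instead. The Executors factory methods either use an unbounded queue or allow up to Integer.MAX_VALUE threads, both of which can exhaust memory under load.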
/** Creates a new ThreadPoolExecutor with the given initial parameters. */
public ThreadPoolExecutor(int corePoolSize,                  // number of core threads
                          int maximumPoolSize,               // maximum number of threads
                          long keepAliveTime,                // how long idle threads beyond the core are kept alive
                          TimeUnit unit,                     // time unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue, // blocking queue
                          ThreadFactory threadFactory,       // thread factory
                          RejectedExecutionHandler handler)  // rejection policy
{
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
(2) Rejection strategy
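The JDK ships four RejectedExecutionHandler implementations as nested classes of ThreadPoolExecutor: AbortPolicy (the default, throws RejectedExecutionException), CallerRunsPolicy (runs the task on the submitting thread), DiscardPolicy (silently drops the task), and DiscardOldestPolicy (drops the oldest queued task and retries). The sketch below saturates a tiny pool to show the first three; the demo class and its return strings are invented for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    // Saturates a 1-thread pool with a 1-slot queue, submits one more task,
    // and reports what the given handler did with it.
    static String submitToSaturatedPool(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        final Thread[] ranOn = new Thread[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            pool.execute(() -> ranOn[0] = Thread.currentThread()); // pool is saturated
            return ranOn[0] == Thread.currentThread() ? "ran in caller" : "not run by caller";
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:      "
                + submitToSaturatedPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy: "
                + submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardPolicy:    "
                + submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```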
(3) Source code analysis
Once the core threads are all in use, additional threads (up to maximumPoolSize) are created only when the work queue is full.
public void execute(Runnable command) {
    // (1) Throw an NPE if the task is null
    if (command == null)
        throw new NullPointerException();
    // (2) Read ctl, which packs the pool state and the worker count into one int
    int c = ctl.get();
    // (3) If the worker count is below corePoolSize, start a new worker to run the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // (4) If the pool is in the RUNNING state, add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        // (4.1) Re-check the state
        int recheck = ctl.get();
        // (4.2) If the pool is no longer RUNNING, remove the task from the queue and run the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // (4.3) Otherwise, if the pool has no worker threads, add one
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // (5) If the queue is full, try to create a new worker; if that fails, run the rejection policy
    else if (!addWorker(command, false))
        reject(command);
}
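The order of steps (3) to (5) can be observed from the outside. The following sketch assumes a pool with corePoolSize 1, maximumPoolSize 2, and a queue of capacity 1, and submits four tasks that all block; the demo class is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {
    // Submits four blocking tasks and records the pool size and queue length after each one.
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected"); // default AbortPolicy
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints:
        // task1: threads=1 queued=0   (new core thread)
        // task2: threads=1 queued=1   (queued)
        // task3: threads=2 queued=1   (queue full, new non-core thread)
        // task4: rejected             (queue full and maximumPoolSize reached)
        trace().forEach(System.out::println);
    }
}
```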
// addWorker creates and starts a new worker thread
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // (6) Check whether the queue is empty only when necessary
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;
        // (7) Loop, using CAS to increment the worker count
        for (;;) {
            int wc = workerCountOf(c);
            // (7.1) Return false if the worker count has reached its limit
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // (7.2) CAS-increment the worker count; only one thread succeeds at a time
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // (7.3) The CAS failed. If the pool state has changed, jump to the outer loop
            // and re-read the state; otherwise retry the CAS in the inner loop.
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    // (8) The CAS succeeded
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        // (8.1) Create the worker, wrapping the first task
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // (8.2) Take the exclusive lock to synchronize access to workers,
            // since several threads may be calling execute() on the pool at once
            mainLock.lock();
            try {
                // (8.3) Re-check the pool state in case shutdown was called before the lock was acquired
                int c = ctl.get();
                int rs = runStateOf(c);
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // (8.4) Add the worker to the set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // (8.5) If the worker was added, start its thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
t.start() runs the Worker's run method, which calls runWorker:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // If the worker was created with a first task, run it; otherwise (and afterwards)
        // fetch tasks from the blocking queue via getTask()
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If there are too many workers, or this worker's timed poll has expired,
        // one worker has to exit
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Decrement the worker count with CAS
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // A worker subject to timeout polls with keepAliveTime;
            // otherwise (worker count within corePoolSize) it blocks in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
When the worker's run loop calls getTask(), there are two possible outcomes:
1. It returns null: the loop in runWorker ends and the worker thread exits.
2. It returns a task: the worker runs it. While the queue is empty, the thread blocks on the blocking queue inside getTask().
The two key methods are addWorker and getTask.
(4) Thread pool state
- RUNNING (-1): accepts new tasks and processes tasks in the blocking queue.
- SHUTDOWN (0): accepts no new tasks but continues to process tasks in the blocking queue.
- STOP (1): accepts no new tasks, does not process tasks in the blocking queue, and interrupts tasks that are running.
- TIDYING (2): all tasks have terminated and the worker count recorded in ctl is 0. On entering this state the pool runs the hook method terminated(), which does nothing in ThreadPoolExecutor. This is the template method pattern: as with tryAcquire() in AQS, subclasses supply the implementation. To do something when the pool enters TIDYING, override terminated().
- TERMINATED (3): terminated() has completed; the pool is fully terminated and its resources have been released.
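The internal state values are not exposed, but the transitions can be followed through isShutdown(), isTerminated(), and an overridden terminated() hook. Here is a rough sketch; TrackingPool and its hookRan field are hypothetical names used only in this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolLifecycleDemo {
    // Hypothetical subclass that records when the terminated() hook fires.
    static class TrackingPool extends ThreadPoolExecutor {
        final CountDownLatch hookRan = new CountDownLatch(1);

        TrackingPool() {
            super(1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(4));
        }

        @Override
        protected void terminated() {
            super.terminated();
            hookRan.countDown(); // invoked in TIDYING, before the pool becomes TERMINATED
        }
    }

    // Returns {rejected after shutdown, isShutdown, isTerminated while a task still runs,
    //          terminated at the end, terminated() hook ran}.
    static boolean[] lifecycle() {
        TrackingPool pool = new TrackingPool();
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown(); // RUNNING -> SHUTDOWN: the running task is allowed to finish
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true; // no new tasks are accepted after shutdown()
        }
        boolean shutdown = pool.isShutdown();
        boolean terminatedWhileBusy = pool.isTerminated(); // a worker is still running

        release.countDown(); // let the last task finish: SHUTDOWN -> TIDYING -> TERMINATED
        boolean terminatedAtEnd = false;
        try {
            terminatedAtEnd = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean hook = pool.hookRan.getCount() == 0;
        return new boolean[] {rejected, shutdown, terminatedWhileBusy, terminatedAtEnd, hook};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(lifecycle()));
    }
}
```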
(5) Exceptions thrown in the thread pool
There are two ways to submit a task to a thread pool: execute() and submit().
With execute(), an exception thrown by the task propagates out of the worker thread and reaches its uncaught-exception handler, so it is visible (by default a stack trace is printed).
With submit(), the exception is captured in the returned Future and only surfaces when you call get() on it.
Solution:
- After each submit(), call get() on the returned Future to check whether the task completed normally.
- Override ThreadPoolExecutor.afterExecute(). Note that tasks passed to submit() need to be handled separately there:
class ExtendedExecutor extends ThreadPoolExecutor {
    // ...
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
            System.out.println(t);
    }
}
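The difference between the two submission methods can be reproduced with a small experiment. In this sketch, which is only an illustration, a custom ThreadFactory installs an uncaught-exception handler so that the exception from execute() can be captured instead of printed; the class name and messages are invented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolExceptionDemo {
    // Returns {message seen by the uncaught-exception handler, message recovered from the Future}.
    static String[] run() {
        final String[] seen = new String[2];
        final CountDownLatch handlerRan = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, e) -> {
                seen[0] = e.getMessage();
                handlerRan.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(4), factory);
        try {
            // execute(): the exception escapes the worker thread and reaches its handler.
            Runnable failsInExecute = () -> {
                throw new IllegalStateException("from execute");
            };
            pool.execute(failsInExecute);
            handlerRan.await(5, TimeUnit.SECONDS);

            // submit(): the exception is stored in the Future and only surfaces through get().
            Runnable failsInSubmit = () -> {
                throw new IllegalStateException("from submit");
            };
            Future<?> future = pool.submit(failsInSubmit);
            try {
                future.get();
            } catch (ExecutionException e) {
                seen[1] = e.getCause().getMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("handler saw: " + seen[0]);
        System.out.println("Future.get() reported: " + seen[1]);
    }
}
```

If get() is never called on the Future, the exception from the submitted task goes unnoticed.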
1.3.6 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor extends ThreadPoolExecutor and is a thread pool for running tasks after a delay or periodically. Compared with ThreadPoolExecutor it has the following features:
- It uses a dedicated task type, ScheduledFutureTask, for all tasks, including those that need no scheduling (tasks submitted through the ExecutorService methods are treated as delayed tasks with a delay of zero).
- It stores tasks in a dedicated queue, DelayedWorkQueue, a variant of the unbounded DelayQueue. Because the queue has no capacity limit, the execution mechanics are simpler than in ThreadPoolExecutor (see the delayedExecute method, discussed below).
- It supports optional run-after-shutdown parameters, which decide whether periodic or delayed tasks keep running after the pool has been shut down. The re-check logic also differs when a task (re)submission overlaps with a shutdown.
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
The constructors call the ThreadPoolExecutor constructor through super and pass a dedicated work queue, DelayedWorkQueue.
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
    // Wrap the callable in a ScheduledFutureTask
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
    // Main entry point for executing the task
    delayedExecute(t);
    return t;
}
schedule is used to run one-off (delayed) tasks. It works in two steps:
Wrapping the Callable/Runnable: triggerTime first computes the time at which the task should run; the ScheduledFutureTask constructor then wraps the Runnable/Callable into a task type that ScheduledThreadPoolExecutor can execute; finally decorateTask is called to apply user-defined logic. decorateTask is an extension hook that subclasses can override; by default it returns the wrapped RunnableScheduledFuture unchanged.
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
Task execution: Use delayedExecute. Let’s analyze it in detail.
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task); // the pool has been shut down
    else {
        super.getQueue().add(task); // enqueue the task
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) && // check the run-after-shutdown parameter
            remove(task)) // remove the task from the queue
            task.cancel(false);
        else
            ensurePrestart(); // make sure a worker thread is waiting for the task
    }
}
A: ensurePrestart is a method of the parent class ThreadPoolExecutor. It starts a new worker thread to wait for the task when fewer than corePoolSize workers are running, and it starts at least one worker even if corePoolSize is 0.
B: If the pool has been shut down and the run-after-shutdown parameter is false, the ThreadPoolExecutor method remove is called to take the task out of the queue. If the removal succeeds, ScheduledFutureTask.cancel is called to cancel the task.
Core methods: scheduleAtFixedRate and scheduleWithFixedDelay
/**
 * Creates a periodic task. The first run happens after initialDelay; later runs are
 * scheduled every period, measured from the start of the previous run rather than
 * from its completion.
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // Build the RunnableScheduledFuture task
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit), // time of the first run
                                      unit.toNanos(period));           // positive period: fixed rate
    RunnableScheduledFuture<Void> t = decorateTask(command, sft); // apply user-defined logic
    sft.outerTask = t; // the task that is re-enqueued for the next run
    delayedExecute(t); // execute the task
    return t;
}

/**
 * Creates a periodic task. The first run happens after initialDelay; each later run
 * starts delay after the previous run has completed.
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // Build the RunnableScheduledFuture task
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit), // time of the first run
                                      unit.toNanos(-delay));           // negative period: fixed delay
    RunnableScheduledFuture<Void> t = decorateTask(command, sft); // apply user-defined logic
    sft.outerTask = t; // the task that is re-enqueued for the next run
    delayedExecute(t); // execute the task
    return t;
}
scheduleAtFixedRate schedules runs at a fixed rate measured from each start time, while scheduleWithFixedDelay waits for the previous run to return before the delay starts counting.
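The difference shows up when a run takes longer than the interval. The sketch below is a rough experiment, not a benchmark: WORK_MS and INTERVAL_MS are arbitrary values chosen for the example, and the measured gaps depend on machine load. With a 100 ms task and a 50 ms interval, the fixed-rate gap between two starts is expected to be about 100 ms (the next run starts as soon as the previous one ends) and the fixed-delay gap about 150 ms.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedRateVsFixedDelayDemo {
    static final long WORK_MS = 100;    // how long each run takes
    static final long INTERVAL_MS = 50; // period or delay passed to the scheduler

    // Returns the gap in milliseconds between the starts of the first two runs.
    static long startToStartGapMillis(boolean fixedRate) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch twoRuns = new CountDownLatch(2);
        Runnable task = () -> {
            starts.add(System.nanoTime());
            twoRuns.countDown();
            try {
                Thread.sleep(WORK_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            if (fixedRate) {
                scheduler.scheduleAtFixedRate(task, 0, INTERVAL_MS, TimeUnit.MILLISECONDS);
            } else {
                scheduler.scheduleWithFixedDelay(task, 0, INTERVAL_MS, TimeUnit.MILLISECONDS);
            }
            if (!twoRuns.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run twice in time");
            }
            return TimeUnit.NANOSECONDS.toMillis(starts.get(1) - starts.get(0));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed rate gap (ms):  " + startToStartGapMillis(true));
        System.out.println("fixed delay gap (ms): " + startToStartGapMillis(false));
    }
}
```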