1.3 the thread

1.3.1 Basic status of Threads in Java

Note: Waiting and Blocked correspond differently

Waiting corresponds to a thread calling the wait method (which must be inside the lock) entering a blocking queue for a heavyweight lock

Blocked corresponds to a thread that has been called by notify and is enqueued for a heavyweight lock

1.3.2 Deadlock conditions

  1. Mutually exclusive condition: This resource can only be occupied by one thread at any time.

  2. Request and hold conditions: when a process is blocked by requesting resources, it holds on to acquired resources.

  3. Non-deprivation condition: a thread cannot forcibly deprivate a resource it has acquired before using it up. The resource can be released only after the thread uses it up.

  4. Circular waiting condition: a circular waiting resource relationship is formed between several processes.

1.3.3 ThreadLocal

public class Thread implements Runnable {...// The ThreadLocal value associated with this thread. Maintained by the ThreadLocal class
	ThreadLocal.ThreadLocalMap threadLocals = null;
	// The InheritableThreadLocal value associated with this thread. Maintained by the InheritableThreadLocal class
	ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; . }Copy the code
public void set(T value) {
 	Thread t = Thread.currentThread();
	 ThreadLocalMap map = getMap(t);
	 if(map ! =null)
		 map.set(this, value);
	 else
		 createMap(t, value);
 }
 ThreadLocalMap getMap(Thread t) {
	 return t.threadLocals;
 }
Copy the code

The final variable is placed in the current thread’s ThreadLocalMap, not in a ThreadLocal, which can be understood as a wrapper around a ThreadLocalMap, passing the value of the variable. ThreadLocalMap (thread.currentThread ()); ThreadLocalMap (Thread t); ThreadLocalMap (Thread t)

ThreadLocal as a key variable is stored as a Value

ThreadLocal memory leak

The key used in ThreadLocalMap is a weak reference to ThreadLocal and the value is a strong reference.

If ThreadLocal is not strongly referenced externally, the key will be cleaned up during garbage collection, but the value will not be. This will result in an Entry with a null key in ThreadLocalMap. If we do nothing, the value will never be collected by the GC, which could cause a memory leak.

The ThreadLocalMap implementation already takes this into account by calling the set(), get(), and remove() methods to clean up records with a null key.

It is best to call the remove() method manually after using ThreadLocal

Why weak references

If each key has a strong reference to a threadLocal, that threadLocal cannot be reclaimed because of a strong reference to an entry! Cause a memory leak, unless the thread terminates, the thread is reclaimed, the map is reclaimed, and in the case of a thread pool, it’s very difficult to terminate the thread.


1.3.4 Creating threads

  1. Through the extend Thread

  2. Through implement Runnable

  3. Through implement Callabele

    public class Thread2 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask<Integer> futureTask = new FutureTask<Integer>(new C());
            newThread(futureTask).start(); Integer integer = futureTask.get(); System.out.println(integer); }}class C implements Callable<Integer> {
    
        public Integer call(a) throws Exception {
            return 1; }}Copy the code

1.3.5 Thread Pool Problems

(1) Creation of thread pool

Do not allow Executors to create by ThreadPoolExecutor instead

/** * Creates a new ThreadPoolExecutor with the given initial parameters. * /
public ThreadPoolExecutor(
    	intCorePoolSize, // Number of core threadsintMaximumPoolSize, // Maximum number of threadslongBlockingQueue<Runnable> workQueue, // Block queue ThreadFactory ThreadFactory, // RejectedExecutionHandler handler)	// Reject the policy
{
        if(corePoolSize< 0||
        maximumPoolSize<=0||
        maximumPoolSize<corePoolSize ||
        keepAliveTime< 0)
        throw new IllegalArgumentException();
        if(workQueue==null||threadFactory==null||handler==null)
        throw new NullPointerException();
        this.corePoolSize=corePoolSize;
        this.maximumPoolSize=maximumPoolSize;
        this.workQueue=workQueue;
        this.keepAliveTime=unit.toNanos(keepAliveTime);
        this.threadFactory=threadFactory;
        this.handler=handler;
        }
Copy the code

(2) Rejection strategy

(3) Source code analysis

Threads will continue to be created only when the wait queue is full

public void execute(Runnable command) {

    //(1) If the task is null, an NPE exception is thrown
    if (command == null)
        throw new NullPointerException();

    // (2) get the combination of the current thread pool state and the number of threads variable
    int c = ctl.get();

    // (3) Whether the number of threads in the current thread pool is smaller than corePoolSize, or start a new thread
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // if the thread pool is in the RUNNING state, add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {

        // (4.1) Secondary check
        int recheck = ctl.get();
        // (4.2) If the current thread pool state is not RUNNING, the task is removed from the queue and the reject policy is executed
        if (! isRunning(recheck) && remove(command))
            reject(command);

        // (4.3) No If the current thread pool is empty, add a thread
        else if (workerCountOf(recheck) == 0)
            addWorker(null.false);
    }
    // (5) If the queue is full, a new thread will be created. If the new thread fails, the denial policy will be executed
    else if(! addWorker(command,false))
        reject(command);
}
Copy the code
// start a new thread after 12 lines
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // (6) Check whether the queue is empty only when necessary
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null&&! workQueue.isEmpty()))return false;

        // (7) loop cas to increase the number of threads
        for (;;) {
            int wc = workerCountOf(c);

            // (7.1) Return false if the number of threads exceeds the upper limit
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // (7.2) Cas increases the number of threads, and only one thread succeeds
            if (compareAndIncrementWorkerCount(c))
                break retry;
            If the cas fails, it will check whether the thread pool status changes. If the thread pool status changes, it will jump to the outer loop and retry to obtain the thread pool status. If the thread pool status changes, the inner loop will retry to obtain the CAS.
            c = ctl.get();  // Re-read ctl
            if(runStateOf(c) ! = rs)continueretry; }}// (8) Cas succeeded
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // (8.1) Create worker
        final ReentrantLock mainLock = this.mainLock;
        // Submit the task
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if(t ! =null) {

            (8.2) add an exclusive lock for workers synchronization, since multiple threads may have called the execute method of the thread pool.
            mainLock.lock();
            try {

                // (8.3) Recheck the thread pool state to avoid calling the shutdown interface before acquiring the lock
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // (8.4) Add a task
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; }}finally {
                mainLock.unlock();
            }
            // (8.5) If the task is added successfully, the task starts
            if (workerAdded) {
                // Start the task
                t.start();
                workerStarted = true; }}}finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Copy the code
T.start calls the work's run method, which calls the runWorker methodCopy the code
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // If the submitted task is not empty, the submitted task is processed, if the submitted task is empty, the task is fetched from the blocking queue via getTask()
        while(task ! =null|| (task = getTask()) ! =null) {
            w.lock();
            if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally{ afterExecute(task, thrown); }}finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally{ processWorkerExit(w, completedAbruptly); }}Copy the code
private Runnable getTask(a) {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            If the number of threads is greater than the number of core threads, a thread must be killed
            && (wc > 1 || workQueue.isEmpty())) {
            // Perform operations through cas
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            // If the number of threads is smaller than the number of core threads, block
                workQueue.take();
            if(r ! =null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; }}}Copy the code

When the thread’s run method executes the getTask () method, there are two types of return values

If null, the task continues until the thread dies. 2. The task is returned and the thread blocks in the blocking queueCopy the code

Two key methods are addwork and getTask


(4) Thread pool state

  • RUNNING (RUNNING, -1) : Can receive new tasks or process tasks in a blocking queue.

  • SHUTDOWN (to be closed, 0) : No new tasks can be accepted and the tasks in the blocking queue continue to be processed.

  • STOP (STOP, 1) : does not receive new tasks, does not process tasks in the blocking queue, and interrupts ongoing tasks.

  • TIDYING (TIDYING, 2) : All tasks have terminated, the “number of working threads” recorded by CTL is 0, and the thread pool changes to TIDYING state. The hook method terminated() is executed when the thread pool is in TIDYING state. For its implementation, nothing is done in ThreadPoolExecutor. The template method pattern is used and, like AQS tryAquire(), requires a subclass implementation. If you want to do something after entering TIDYING, you can reload it.

  • TERMINATED (TERMINATED, 3) : TERMINATED completely and all resources are TERMINATED.

(5) The thread pool throws an exception

There are two submission methods in the thread pool: submit using excute () and submit using submit ().

The excute () method catches all exceptions that your code might throw.

Using submit () requires getting an exception using the get () method of the returned Future object.

Solution:

  1. After each submit (), the exception get () method is called to see if the task executes properly
  2. ThreadPoolExecutor again. AfterExecute () method, to note here, handled separately
class ExtendedExecutor extends ThreadPoolExecutor {
   // ...
   protected void afterExecute(Runnable r, Throwable t) {
     super.afterExecute(r, t);
     if (t == null && r instanceofFuture<? >) {try{ Object result = ((Future<? >) r).get(); }catch (CancellationException ce) {
           t = ce;
       } catch (ExecutionException ee) {
           t = ee.getCause();
       } catch (InterruptedException ie) {
           Thread.currentThread().interrupt(); // ignore/reset}}if(t ! =null) System.out.println(t); }}Copy the code

1.3.6 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor inherited from ThreadPoolExecutor, delay or cycle for task execution, belongs to a kind of thread pool. It also has the following features compared to ThreadPoolExecutor:

Use a specialized task type, ScheduledFutureTask, to perform periodic tasks, or to receive tasks that do not require time scheduling (these tasks are performed through the ExecutorService).

A dedicated storage queue – DelayedWorkQueue – is used to store tasks. DelayedWorkQueue is a kind of unbounded DelayQueue. It also simplifies the execution mechanism compared to ThreadPoolExecutor (the delayedExecute method, discussed separately below).

Optional run-after-shutdown parameter is supported, and optional logic is supported to decide whether to continue running cycles or delay tasks after the pool has been shutdown. And when the task (re) submit operation overlaps with the shutdown operation, the review logic is different.


public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
  
Copy the code

The constructors call the construction of ThreadPoolExecutor through super and use a specific wait queue, DelayedWorkQueue.

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));// Construct the ScheduledFutureTask task
    delayedExecute(t);// The task executes the main method
    return t;
}

Copy the code

Schedule is used to execute one-off (delayed) tasks. The function executes logic in two steps:

Encapsulation Callable/Runnable: First by triggerTime computing tasks, delay the time, and then through the Runnable ScheduledFutureTask constructor/Callable task structure for ScheduledThreadPoolExecutor can perform the task of type, Finally, call the decorateTask method to execute the user-defined logic; DecorateTask is a user-customizable extension method that returns the wrapped RunnableScheduledFuture task by default.

protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture
       
         task)
        {
    return task;
}
Copy the code

Task execution: Use delayedExecute. Let’s analyze it in detail.

private void delayedExecute(RunnableScheduledFuture
        task) {
    if (isShutdown())
        reject(task);// The pool is closed
    else {
        super.getQueue().add(task);// Join the team
        if(isShutdown() && ! canRunInCurrentRunState(task.isPeriodic()) &&// Determine the run-after-shutdown parameter
            remove(task))// Remove the task
            task.cancel(false);
        else
            ensurePrestart();// Start a new thread waiting task}}Copy the code

EnsurePrestart is a method of parent ThreadPoolExecutor that starts a new worker thread waiting to execute the task, even if corePoolSize is 0. B: If the pool is closed and the run-after-shutdown parameter is false, the ThreadPoolExecutor method remove is executed to remove the specified task from the queue. After successfully removed call ScheduledFutureTask. Cancel to cancel the task

Core methods :scheduleAtFixedRate and scheduleWithFixedDelay

/** * Create a periodic execution of the task, the first execution of the delay time is initialDelay, * every period after the execution of the task, not waiting for the completion of the first execution to start the timer */
publicScheduledFuture<? > scheduleAtFixedRate(Runnable command,long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // Build RunnableScheduledFuture task type
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),// Calculate the delay time of the task
                                      unit.toNanos(period));// Calculate the execution period of the task
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);// Execute user-defined logic
    sft.outerTask = t;// Assigns a value to the outerTask and prepares to rejoin the queue for the next execution
    delayedExecute(t);// Execute the task
    return t;
}

/** * Create a periodic execution of the task, the first execution of the delay time is initialDelay, * after the first execution of the delay to start the next execution */
publicScheduledFuture<? > scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // Build RunnableScheduledFuture task type
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),// Calculate the delay time of the task
                                      unit.toNanos(-delay));// Calculate the execution period of the task
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);// Execute user-defined logic
    sft.outerTask = t;// Assigns a value to the outerTask and prepares to rejoin the queue for the next execution
    delayedExecute(t);// Execute the task
    return t;
}

Copy the code

One is a fixed delay, and the other is that you have to wait for the return before starting the delay

reference

That’s a good explanation

This also line