The official account is open for submission

Good writing is not afraid of going unread  Poking me contribute

Category: Java concurrency

After reading about

5

minutes

Author: Huang Junbin

Link: jianshu.com/p/dfeac8f3f9db

Source: Jane Book

Copyright belongs to the author, has authorized the original, prohibit unauthorized reprint.

Thread – Thread

01

Introduction to the

The smallest unit of scheduling in modern operating systems is threads, also known as lightweight processes. Multiple threads can be created within a process, each with its own counters, stacks, local variables and other attributes, and have access to shared memory variables. The processor switches between these threads at high speed, giving the user the impression that they are executing simultaneously.

02

Why multithreading

  • More processor cores

  • Faster response times

  • Better to the programming model

03

Thread state

A Java thread can be in six different states during its lifetime, and a thread can be in only one of these states at any given moment:

04

Common Methods

05

Wait and notice

One thread modifies the value of an object, and another thread senses the change and acts accordingly, starting in one thread and ending in another. The former is the producer, while the latter is the consumer. This mode isolates the “how” and “what”, achieves decoupling at the functional level, and has good scalability in architecture.

Principle of waiting party:

  1. Gets the lock of the object

  2. If the condition is not met, the lock’s wait() method is called so that the thread is in waiting and the condition is still checked after being notified

  3. If the conditions are met, the corresponding logic is executed

Pseudo code:

Synchronized (objects) {while{object. Wait (); } The corresponding logical processing}Copy the code

Principle of notifying Party:

  1. Gets the lock of the object

  2. Change the conditions

  3. Notifies all threads waiting on the object

Pseudo code:

Synchronized (object){change the condition object. NotifyAll (); }Copy the code

Implementation code:

public class WaitNotify{  static boolean flag = true;  static Object lock = new Object(a); public staticvoid main(String[] args) throws Exception{           Thread waitThread = new Thread(new Wait(),"WaitThread");           waitThread.start();           TimeUtil.SECONDS.sleep(1);           Thread notifyThread = new Thread(new Notify(),"NotifyThread");           notifyThread.start();    }static class  Wait implements Runnable{   public void run(){      / / lock      synchronized(lock){       // If the condition is not met, enter WAITTING state and release lock       while(flag){        System.out.println("flag is true ");          lock.wait();          }        // The condition is satisfied        System.out.println("doSomething");      }    }} static class  Notify implements Runnable{   public void run(){      / / lock      synchronized(lock){         // Get the lock of the lock, then notify, notification will not release the lock         // The WaitThread cannot return from the wait method until the notifying thread completes execution and releases the lock         lock.notifyAll();          System.out.println("flag is false now");         flag = false; }}}Copy the code

The output is as follows:

flag is trueflag is false nowdoSomething

ThreadPool – ThreadPool

01

Application scenarios of thread pools

Thread pools are the most widely used concurrency framework in Java, and can be used by almost any program that needs to perform tasks asynchronously or concurrently. There are three benefits to using thread pools properly during development:

  1. Reduce resource consumption. Reduce the cost of thread creation and destruction by reusing created threads.

  2. Improve response speed. When a task arrives, it can be executed immediately without waiting for the thread to be created.

  3. Improve thread manageability. Threads are scarce resources. If created without limit, they consume system resources and degrade system stability. Thread pools can be used for uniform allocation, tuning, and monitoring. However, to make good use of thread pools, you must have a thorough understanding of how they are implemented.

02

Implementation principle of thread pool

ThreadPoolExecutor

ThreadPoolExecutor performs the execute method in the following four cases:

  1. If fewer threads are currently running than corePoolSize, a new thread is created to perform the task (note that this step requires a global lock).

  2. If the running thread is equal to or more than corePoolSize, the task is added to BlockingQueue.

  3. If the task cannot be added to the BlockingQueue (the queue is full), a new thread is created to process the task (note: this step requires a global lock).

  4. If you create a new thread will make the currently running thread beyond maximumPoolSize, task will be rejected, and call the RejectedExecutionHandler. RejectedExecution () method.

The overall design idea for ThreadPoolExecutor to take these steps is to avoid acquiring global locks as much as possible when executing the execute() method (which would be a serious scalability bottleneck). After ThreadPoolExecutor has warmed up (the number of threads currently running is greater than or equal to corePoolSize), almost all execute() method calls execute Step 2, which does not require a global lock.

03

Executor Execution Flow (based on JDK7.0)

public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();     int c = ctl.get();//AtomicInteger     / / 1.     if (workerCountOf(c) < corePoolSize) {         if (addWorker(command, true))             return;         c = ctl.get();     }     / / 2.     if (isRunning(c) && workQueue.offer(command)) {         int recheck = ctl.get();         if (! isRunning(recheck) && remove(command))/ / (21)             reject(command);         else if (workerCountOf(recheck) == 0)/ / (22)             addWorker(null.false);// Why false     }     / / 3.     else if(! addWorker(command,false)) reject(command); }Copy the code
  1. We first check to see if the number of threads in the current thread pool is smaller than the number of core threads we specified. If so, we try to create a new thread, make command its first task, and add them to the thread pool. However, when we call addWorker(command, true) to add a thread to the thread pool after determining the number of valid threads, it will be concurrent with multiple threads and may result in joining failure. If it succeeds, it will return directly. If it fails, it will retrieve CLT again, because at this time CLT must have changed, otherwise it will not fail. Continue to perform (2).

  2. Using isRunning(c) to determine if the thread pool is still running, we try to add the current command to the blocking queue. The process of joining is also concurrent and may fail. If this fails, proceed with (3). After joining the blocking queue successfully, we need to check again to prevent threads from closing during joining or threads from running out of threads in the thread pool, all because the idle time exceeds our specified aliveTime. If the thread pool is no longer in the RUNNING state, discard it using our rejection policy (21). If there are no threads in the pool, create an empty thread and let it block the queue for task execution (22).

  3. If neither of the above steps succeeds, then we need to use the maximum thread pool we specified to process it, but this can also fail, can multiple threads execute, if failed, the thread is discarded with the rejection policy.

private boolean addWorker(Runnable firstTask, boolean core) {      // (1) loop CAS to increase the number of threads in the pool by 1.      retry:      for (;;) {          int c = ctl.get();          int rs = runStateOf(c);          // Check if queue empty only if necessary.          if (rs >= SHUTDOWN &&              ! (rs == SHUTDOWN &&                 firstTask == null&&! workQueue.isEmpty()))return false;          for (;;) {              int wc = workerCountOf(c);              //core true adds threads to the core thread pool false adds threads to the maximum thread pool              // The number of threads exceeds the limit, cannot be added, directly return              if (wc >= CAPACITY ||                  wc >= (core ? corePoolSize : maximumPoolSize))                  return false;              //CAS changes the CLT value to +1 and drains space from the pool for the thread to be added              if (compareAndIncrementWorkerCount(c))                  break retry;              c = ctl.get();  // Re-read ctl              Return to the Retry outer loop if the state of the thread pool changes              if(runStateOf(c) ! = rs)continue retry;              // else CAS failed due to workerCount change; retry inner loop}}//(2) create a new thread and add it to the thread pool workers.      boolean workerStarted = false;      boolean workerAdded = false;      Worker w = null;      try {          // Workers operations need to be locked          final ReentrantLock mainLock = this.mainLock;          w = new Worker(firstTask);          final Thread t = w.thread;          if(t ! =null) {             // Refine the strength of the lock to prevent critical sections from being too large and wasting time              mainLock.lock();              try {                  // Recheck while holding lock.                  // Back out on ThreadFactory failure or if                  // shut down before lock acquired.                  int c = ctl.get();                  int rs = runStateOf(c);                  // Determine the state of the thread pool                  if (rs < SHUTDOWN ||                      (rs == SHUTDOWN && firstTask == null)) {                      // Determine the status of the added task, if the exception is already thrown                      if (t.isAlive()) // precheck that t is startable                          throw new IllegalThreadStateException();                     // Add the new thread to the thread pool                      workers.add(w);                      int s = workers.size();                      // Fix largestPoolSize                      if (s > largestPoolSize)                          largestPoolSize = s;                      workerAdded = true; }}finally {                  mainLock.unlock();              }              // Thread adds the thread pool successfully, then starts the newly created thread              if (workerAdded) {                  t.start();/ / (3)                  workerStarted = true; }}}finally {          // If the thread fails to add the thread pool or start the thread fails, the addWorkerFailed function needs to be called. If the thread is added successfully, the addWorkerFailed function needs to be removed and the CLT value is restored          if (! workerStarted)              addWorkerFailed(w);      }      returnworkerStarted; }Copy the code

04

How to implement thread reuse

Thread pool, each thread is a Worker, the Worker is an inner class, inherited AbstractQueuedSynchronizer.

Worker’s run method calls runWorker’s method, and runWorker’s method circulates getTask to get the blocked queue task, so as to achieve the purpose of thread reoccurrence.

The main fields of Worker are the following three, and the code is relatively simple:

// The thread that is actually running in the pool. Created from the thread factory we specifiedfinal Thread thread;// Thread wrapped tasks. Thread mainly calls the run method of the task when runningRunnable firstTask;// Record the number of tasks completed by the current threadvolatile long completedTasks;Copy the code

Worker’s constructor

Worker(Runnable firstTask) {    setState(-1); // Inhibit interrupts until runWorker    this.firstTask = firstTask;    // Create a thread using the thread factory we specified. Note that the argument is this, which means that when thread.run is executed, the Woker class's run method is actually executed    this.thread = getThreadFactory().newThread(this); }Copy the code
final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        while(task ! =null|| (task = getTask()) ! =null) {            w.lock();            // The thread pool is stopped when it is stopped or when the current thread is interrupted. However, if the current thread is not interrupted, an interrupt request is issued            if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {            // Start the task before the Hook, similar to the callback function                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // Execute the task                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                // Hook after task execution, similar to callback functionafterExecute(task, thrown); }}finally {            // The task is reset after execution, completedTasks counter ++, and unlocked                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {       // The Worker exits destruction when the thread is idle and reaches the value we set.        processWorkerExit(w, completedAbruptly);    }}Copy the code
private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // if the thread pool is shutdown and the queue is empty, or if the thread pool is stopped or terminated, null is returned when the thread pool number is -1        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            decrementWorkerCount();            return null;        }        // Indicates whether the current thread should be timed out when it is idle        boolean timed;            for (;;) {            int wc = workerCountOf(c);            // If allowCoreThreadTimeOut is true or the number of current threads is greater than the number of core thread pools, timeout reclamation is required            timed = allowCoreThreadTimeOut || wc > corePoolSize;            / / (1)            // If the number of threads is less than the maximum number of threads, and no timeout collection is allowed or not timed out, then the loop is broken and the queue continues to block.            if (wc <= maximumPoolSize && ! (timedOut && timed))                break;            // If not true, the current thread count -1, return null, reclaim the thread            if (compareAndDecrementWorkerCount(c))                return null;            // If the above if is not true, the CAS loop attempts to modify the CTL again            c = ctl.get();  // Re-read ctl            if(runStateOf(c) ! = rs)continue retry;            // else CAS failed due to workerCount change; retry inner loop} (2)try {        // Call the poll of the blocking queue if idle collection is allowed, otherwise take until there is a redeeming task in the queue            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            TimedOut = true; timedOut = true; Enter the next loop, and will not be established at (1), and then enter the program of CAS to modify CTL            if(r ! =null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false; }}}Copy the code

05

How to Handle exceptions

The currently running thread beyond maximumPoolSize, the task will be rejected, and call the RejectedExecutionHandler. RejectedExecution () method, the default AbortPolicy exception handling.

  • ThreadPoolExecutor. AbortPolicy: discard task and throw RejectedExecutionException anomalies.

 /** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */public static class AbortPolicy implements RejectedExecutionHandler {    /** * Creates an {@code AbortPolicy}. */    public AbortPolicy() { }    /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always. */    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        throw new RejectedExecutionException("Task " + r.toString() +                                             " rejected from " +                                             e.toString());    }}Copy the code
  • ThreadPoolExecutor. DiscardPolicy: discard task too, but I don’t throw an exception.

/** * A handler for rejected tasks that silently discards the * rejected task. */public static class DiscardPolicy implements RejectedExecutionHandler {    /** * Creates a {@code DiscardPolicy}. */    public DiscardPolicy() { }    /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {    }}Copy the code
  • ThreadPoolExecutor. DiscardOldestPolicy: discard queue in front of the task, and then to try to perform a task (repeat)

/** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */public static class DiscardOldestPolicy implements RejectedExecutionHandler {    /** * Creates a {@code DiscardOldestPolicy} for the given executor. */    public DiscardOldestPolicy() { }    /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            e.getQueue().poll();            e.execute(r);        }    }}Copy the code
  • ThreadPoolExecutor. CallerRunsPolicy: handle the tasks by the calling thread

/** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */public static class AbortPolicy implements RejectedExecutionHandler {    /** * Creates an {@code AbortPolicy}. */    public AbortPolicy() { }    /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always. */    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        throw new RejectedExecutionException("Task " + r.toString() +                                             " rejected from " +                                             e.toString());    }}Copy the code

Executor Framework

01

FixedThreadPool

A FixedThreadPool is called a reusable thread pool with a fixed number of threads:

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(        nThreads,        nThreads,        0L,        TimeUnit.MILLISECONDS,        newLinkedBlockingQueue<Runnable>()); }Copy the code

FixedThreadPool is suitable for scenarios where the current number of threads needs to be limited to meet resource management requirements. It is suitable for heavily loaded servers.

02

SingleThreadExecutor

Create singlethread-executor using a SingleThread by following the Executors API:

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {    return new FinalizableDelegatedExecutorService(        new ThreadPoolExecutor(        1.1.0L,        TimeUnit.MILLISECONDS,        newLinkedBlockingQueue<Runnable>(), threadFactory)); }Copy the code

SingleThreadExecutor is for tasks that need to be executed sequentially; And no more than one thread is active at any one point in time.

03

CachedThreadPool

Create a CachedThreadPool that creates new threads as required by the Executors API:

public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(        0,        Integer.MAX_VALUE,        60L,        TimeUnit.SECONDS,        newSynchronousQueue<Runnable>()); }Copy the code

CachedThreadPool is an unbounded thread pool suitable for small programs that perform many short asynchronous tasks, or for lightly loaded servers.

04

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor usually use the factory class Executors to create. Executors can create two types of ScheduledThreadPoolExecutor, as follows:

  • ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor: contains a number of threads.

  • ScheduledThreadPoolExecutor SingleThreadScheduledExecutor: contains only a single thread.

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); }public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return newScheduledThreadPoolExecutor(corePoolSize); }Copy the code

ScheduledThreadPoolExecutor applies to require multiple background threads execute cycle task, at the same time in order to meet the demand for resources management need to limit the number of background thread application scenario.

END

Have help? Click on the top right

Share it with friends, or forward it to moments

1. The public account is open for submission, which has many benefits.  Poking me contribute

2. Age: 43

How to use Git to manage porn?
Graphic Android Studio skills | play TODO and custom TODO
Summary and reflection of a 6-year software engineer

Thank you for liking or sharing

Don’t forget to scan for attention


Click “Read the original article” to see more