About the author

Guo Xiaoxing, programmer and guitarist, is mainly engaged in the infrastructure of Android platform. Welcome to exchange technical questions. You can go to my Github to raise an issue or send an email to [email protected] to communicate with me.

The article directories

  • One thread principle
    • 1.1 Thread Creation
    • 1.2 Thread Scheduling
  • Two-thread synchronization
    • 2.1 volatile
    • 2.2 synchronized
  • The three thread pool
    • 3.1 Thread pool scheduling
    • 3.2 Thread pool configuration
    • 3.1 Thread pool monitoring
  • Four-thread pool applications
    • 4.1 AsyncTask
    • 4.2 Okhttp

This article is intended to discuss the principles and practical experience of multi-threaded concurrency in Java. It is not intended to be a tutorial. You can refer to other articles on the web for details.

One thread principle

1.1 Thread Creation

Thread is a more lightweight scheduling unit than process. The introduction of thread can separate the resource allocation and execution scheduling of process. Each thread can share process resources and schedule independently.

The difference between a process and a thread is usually explained in this way. In article 01Android Process Framework: Process Startup creation, Startup, and Scheduling, we dissected the nature of a process. Let’s briefly recall it here.

A description of the nature of the process:

As we know, the code is static, and the system composed of code and resources needs a dynamic existence if it wants to run. The process is the dynamic execution process of the program. What is process? A process is a collection of code and related resources that handle execution state, including code snippets, files, signals, CPU state, memory address space, and so on.

Processes are described using the task_struct structure, as follows:

  • Code snippet: instructions formed after compilation
  • Data segment: Data required by the program to run
    • Read-only data segment: constant
    • Initialized data segment: global variable, static variable
    • Uninitialized Data segment (BSS) : Uninitialized global and static variables
  • Stack segment: Some memory dynamically allocated while a program is running
  • PCB: process information, status identification, etc

Let’s take a look at the Java thread creation sequence diagram as follows:

As you can see, you end up calling the pthread_create() method of the pThread library to create a new thread, which is also described using the task_struct structure, but which does not have its own independent address space, but instead shares address space and resources with the process it is running on.

So you can see that there is no essential difference between a process and a thread for a virtual machine, other than whether it has a separate address space.

Let’s look at how threads are scheduled.

1.2 Thread Scheduling

Thread state flow diagram

  • NEW: Creation state, after the thread is created, but not yet started.
  • RUNNABLE: Running state. A thread in the running state may be in a waiting state, such as waiting for CPU or I/O.
  • WAITING: a state in which wait(), join(), locksupport.spark () are called.
  • TIMED WAITING: indicates the TIMED wait state. Wait (time), join(time), locksupport.sparknanos (), locksupport.sparkUnit () and other methods are called.
  • BLOCKED: a state that is BLOCKED waiting for a lock to be released, for example by invoking synchronized to increase the lock.
  • TERMINATED: State of termination. Normally, the thread exits after completing a task or terminates abnormally.

NEW, WAITING, and TIMED_WAITING are easy to understand, but we’ll focus on RUNNABLE and BLOCKED.

There are five situations when a thread enters a RUNNABLE state:

  • The thread finds the sleep time after calling sleep(time)
  • The blocking IO called by the thread has returned, and the blocking method has completed execution
  • The thread successfully acquired the resource lock
  • A thread is waiting for a notification and has successfully received a notification from another thread
  • The thread is suspended, and the resume() method is called to unsuspend the thread.

When a thread enters a BLOCKED state, there are generally five cases:

  • The thread calls the sleep() method to give up the occupied resource
  • The thread calls a method of blocking IO and blocks before the method returns.
  • The thread view acquires a resource lock, but the resource lock is being held by another thread lock.
  • The thread is waiting for a notification
  • The thread scheduler calls the suspend() method to suspend the thread

Let’s take a look at some of the methods associated with thread state.

  • The sleep() method suspends the currently executing Thread for a specified amount of time, which can be retrieved from thread.currentThread ().
  • The yield() method gives up CPU resources held by the thread, leaving them to other tasks to occupy CPU execution time. However, the time of giving up is not certain. It is possible to give up and get the CPU time slice immediately.
  • The wait() method waits for the thread currently executing code, places the current thread into a pre-execution queue, and stops execution at the code waiting () until notified or interrupted. This method causes the thread calling the method to release the lock on the shared resource, exit from the run state, and wait until it is woken up again. This method can only be invoked in the synchronized code block, otherwise you will be thrown IllegalMonitorStateException anomalies.
  • The wait(long millis) method waits for a certain amount of time for a thread to wake up the lock, and then wakes up automatically.
  • The notify() method is used to notify other threads that may be waiting for the object lock on the object. This method can randomly wake up a thread in the waiting queue for the same shared resource, and cause the thread to exit the waiting queue and enter the runnable state.
  • NotifyAll () method notifyAll() is a method that exits from the wait state and enters the runnable state of all threads waiting for the same shared resource in the wait queue. The notifyAll() method is usually executed by the thread with higher priority, but may be executed randomly depending on the implementation of the VIRTUAL machine.
  • The join() method can queue the code after the calling thread completes its normal execution.

Two-thread synchronization

Thread-safe, by thread-safe we mean relatively thread-safe, it means that the individual operations on this object are thread-safe, and we don’t have to do extra safeguards when we call them.

What do you mean relatively safe? 🤔

🤞 for example

We know that a Vector in Java is thread-safe. It is safe to insert, delete, and read from a multithreaded environment, but only if it is operated on by one thread at a time. If multiple threads operate on a Vector at the same time, it is no longer thread-safe.

    final Vector<String> vector = new Vector<>();

    while (true) {
        for (int i = 0; i < 10; i++) {
            vector.add("项:" + i);
        }

        Thread removeThread = new Thread(new Runnable() {
            @Override
            public void run(a) {
                for (int i = 0; i < vector.size(); i++) { vector.remove(i); }}}); Thread printThread =new Thread(new Runnable() {
            @Override
            public void run(a) {
                for (int i = 0; i < vector.size(); i++) { Log.d(TAG, vector.get(i)); }}}); removeThread.start(); printThread.start();if (Thread.activeCount() >= 20) {
            return; }}Copy the code

But the program crashed

The correct way to do this would be to add a synchronization lock to the vector as follows:

        final Vector<String> vector = new Vector<>();

        while (true) {
            for (int i = 0; i < 10; i++) {
                vector.add("项:" + i);
            }

            Thread removeThread = new Thread(new Runnable() {
                @Override
                public void run(a) {
                    synchronized (vector){
                        for (int i = 0; i < vector.size(); i++) { vector.remove(i); }}}}); Thread printThread =new Thread(new Runnable() {
                @Override
                public void run(a) {
                    synchronized (vector){
                        for (int i = 0; i < vector.size(); i++) { Log.d(TAG, vector.get(i)); }}}}); removeThread.start(); printThread.start();if (Thread.activeCount() >= 20) {
                return; }}Copy the code

2.1 volatile

Volatile is also an implementation of mutex synchronization, but it is very lightweight.

Volatile has two key semantics:

  • Ensure that variables that are volatile are visible to all threads
  • Command reordering is disabled

To understand the volatile keyword, we need to start with Java’s threading model. As shown in the figure:

Java memory model provides all of the fields (these fields including instance fields, static field, etc., not including local variables, such as method parameters, because these are private, thread does not exist competition) are present in the main memory, each thread has its own working memory, working memory preserved the thread that variables used in main memory copies of copies, Threads can only operate on variables in working memory, not directly read or write to main memory. Of course, different memory cannot directly access each other’s working memory, that is, main memory is the medium through which threads transfer values.

Let’s understand the first sentence:

Ensure that variables that are volatile are visible to all threads

How do you guarantee visibility? 🤔

Volatile variables are forced to be written back to main memory after working memory changes, and flushed from main memory when used by other threads, thus ensuring consistency.

There is a common misconception about ensuring that volatile variables are visible to all threads:

Because volatile variables are consistent across threads, operations based on volatile variables are safe in multithreaded concurrency.

The first part of the statement is true, but the second part is false, so it forgets to consider whether the operation of a variable is atomic.

Point_up for example


    private volatile int start = 0;

    private void volatileKeyword(a) {

        Runnable runnable = new Runnable() {
            @Override
            public void run(a) {
                for (int i = 0; i < 10; i++) { start++; }}};for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
        Log.d(TAG, "start = " + start);
    }

Copy the code

This code starts 10 threads, incrementing each time 10 times, which logically should result in 100, but it doesn’t.

Why is that? :thinking:

If you take a closer look at start++, it’s not really an atomic operation. In a nutshell, it has two steps:

  1. Fetch the value of start, which is correct because of the volatile modifier.
  2. Increment, but at the time of increment, another thread may have already increased start, in which case it is possible to write the smaller start back into main memory.

Therefore, volatile only guarantees visibility, and locks are still required to guarantee atomicity in cases where:

  • The result of the operation does not depend on the current value of the variable, or only a single thread changes the value of the variable. (Either the result is independent of the current value, the operation is atomic, or just one thread changes the value of the variable.)
  • Variables do not need to participate in invariant constraints with other state variables

For example, if we add a Boolean variable to a thread to determine whether it has stopped, volatile is a good place to use it.

Let’s understand the second sentence.

  • Command reordering is disabled

What is instruction reorder? 🤔

Instruction reordering is the out-of-order execution of value instructions, that is, if conditions permit, it directly runs the subsequent instructions that are capable of immediate execution, avoiding the wait caused by obtaining the data required by the next instruction, and providing execution efficiency through out-of-order execution technology.

Instruction reordering adds a memory barrier before rendering assignments to volatile variables. Instructions reordering cannot reorder subsequent instructions from the barrier.

Instruction reordering is not the focus of this article, see instruction reordering for more details.

2.2 synchronized

Synchronized is an implementation of mutually exclusive synchronization.

Synchronized: When a thread access to be synchronized method or block of code, the thread got a lock of the object and another thread temporarily unable to access this method, only waiting for the method or code block is performed, the thread will release the object lock, other threads to execute the method or code block.

The use of volatile and synchronized is used as an example.

Point_up for example

public class Singleton {

    // Volatile ensures (1) instance visibility in multithreaded concurrency and (2) disallows instruction reordering while instance is in operation
    private volatile static Singleton instance;

    public static Singleton getInstance(a) {
        // The first call is null to ensure unnecessary synchronization
        if (instance == null) {
            Synchronized globalizes Singleton to ensure that only one thread creates an instance at a time
            synchronized (Singleton.class) {
                // The second nulltime is used to create an instance in case of null
                if (instance == null) {
                    instance = newSingleton(); }}}returninstance; }}Copy the code

This is a classic DSL singleton.

Its bytecode is as follows:

As you can see, synchronized blocks of code are preceded by Monitorenter and Monitorexit, respectively. Both bytecodes need to specify objects to lock and unlock.

About locking and unlocking objects:

  • Synchronized code block: a synchronized code block whose scope is the entire code block and whose object is the object that calls the code block.
  • Synchronized method: a synchronized method whose scope is the entire method and whose object is the object that calls the method.
  • Synchronized: a synchronized static method whose scope is the entire static method and whose objects are all objects that call the class.
  • Synchronized (this) : Scope is all variables, methods, or code blocks in the object marked by synchronized, and object is the object itself.
  • Synchronized (classname. class) : scopes are static methods or variables and objects are class objects.

Synchronized (this) adds an object lock, synchronized(classname.class) adds a class lock, they differ as follows:

Object locks: All Java objects contain a mutex, which is automatically acquired and released by the JVM. When a thread enters the synchronized method, it acquires the lock of the object. Of course, if a thread has acquired the lock of the object, the current thread waits. The JVM automatically releases the object lock if the synchronized method returns normally or terminates with an exception. The benefits of synchronized are also reflected in the fact that the lock can still be automatically released by the JVM when the method throws an exception.

Class locks: Object locks are used to control synchronization between instance methods, and class locks are used to control synchronization between static methods (or static variable mutex). Class locking is a concept, not a real thing. It is used to help us understand the difference between locking instance methods and static methods. As we all know, a Java Class can have many objects, but only one Class object, which means that different instances of a Class share the Class object of that Class. A Class object is really just a Java object, just a little special. Since every Java object has a mutex, static methods of classes require Class objects. So a Class lock is just a lock on a Class object. There are several ways to get a Class’s Class object, the simplest of which is myclass.class. A Class lock and an object lock are not the same thing. One is the lock on the Class object of a Class, and one is the lock on an instance of a Class. That is, when one thread accesses static synchronized, another thread is allowed to access an instance of an object’s synchronized method. The reverse is also true, because they require different locks.

There is also ReentrantLock. Compared with synchronized, eentrantLockR has more functions such as wait interruptable and fair lock. This is limited by space and will not be expanded.

The three thread pool

We know that threads are expensive to create, switch, and destroy, so it’s natural to use thread pools to reuse and manage threads. Thread pooling in Java is usually implemented through ThreadPoolExecutor. Let’s take a look at how ThreadPoolExecutor works and its application AsyncTask on Android.

3.1 Thread pool scheduling

Thread pools have five running states, as shown below:

Thread pool state diagram

  • RUNNING: Can accept new tasks or process tasks in a waiting queue.
  • SHUTDOWN: does not accept new tasks, but can process tasks in the waiting queue.
  • STOP: does not accept new tasks and does not process tasks in the waiting queue. Interrupts the task in progress.
  • TIDYING: All tasks are finished, there are no valid threads in the current thread pool, and the terminated() method is about to be called.
  • TERMINATED: The thread pool TERMINATED when the TERMINATED () method is called.

In addition, ThreadPoolExecutor uses an AtomicInteger to record the state of the thread pool and the number of threads in the pool, as shown below:

  • Lower 29 bits: Used to store the number of threads
  • High 3 bits: Used to store thread pool state
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;/ / 111
private static final int SHUTDOWN   =  0 << COUNT_BITS;/ / 000
private static final int STOP       =  1 << COUNT_BITS;/ / 001
private static final int TIDYING    =  2 << COUNT_BITS;/ / 010
private static final int TERMINATED =  3 << COUNT_BITS;/ / 110

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }// Thread pool status
private static int workerCountOf(int c)  { return c & CAPACITY; }// The current number of threads in the pool
private static int ctlOf(int rs, int wc) { return rs | wc; }
Copy the code

Before we introduce thread pool scheduling, let’s recall the two interfaces that Java uses to implement tasks:

  • Runnable: Completes the task in the run() method with no return value and no exception thrown.
  • Callable: Completes a task in the call() method, returns a value and may throw an exception.

In addition, there is a Future interface, which can determine whether the task performed by Runnable and Callable is complete, interrupt the task, and obtain the result of the task. We usually use its implementation class, FutureTask, which is a wrapper class for Future, Runnable, and Callable. It makes it easy to perform operations defined by the Future interface. Thread blocking within FutureTask is implemented based on LockSupport.

Let’s look at thread pools and tasks.

ThreadPoolExecutor scheduling flow chart

execute(Runnable command)

public class ThreadPoolExecutor extends AbstractExecutorService {
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //1. If the thread pool status is RUNNING and the size of the thread pool is smaller than the configured number of core threads, a new thread can be created in the thread pool to execute a new task.
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //2. If the status of the thread pool is RUNNING and the size of the thread pool is larger than the configured number of core threads, try to insert the task into the blocking queue for waiting
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // If the thread pool is successfully inserted, the status of the thread pool will be checked to see if it is RUNNING. If not, the current task will be removed and the policy will be rejected.
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // if the number of threads in the thread pool is 0, that is, all threads in the thread pool are SHUTDOWN, then a null task is added
                // New tasks are no longer accepted because of the SHUTDOWN state.
                else if (workerCountOf(recheck) == 0)
                    addWorker(null.false);
            }
            //3. If the blocking queue cannot be inserted, create a new thread. If the creation fails, enter the denial policy.
            else if(! addWorker(command,false)) reject(command); }}Copy the code
  1. If the thread pool size is smaller than the configured number of core threads, a new thread can be created in the thread pool to perform a new task.
  2. If the thread pool status is RUNNING and the thread pool size is greater than the configured number of core threads, an attempt is made to insert the task into the blocking queue for waiting. If the insert is successful, for robustness, the thread pool is checked to see if the state is RUNNING. If not, the current task is removed and the reject policy is entered. If the number of threads in the thread pool is 0, that is, all threads in the thread pool are in the SHUTDOWN state, then a NULL task is added (because the SHUTDOWN state does not accept new tasks).
  3. If the blocking queue cannot be inserted, try to create a new thread, or enter the reject policy if the creation fails.

This is actually quite understandable, for example. A team from our company came to do the task,

  • If the number of tasks is smaller than the number of team members (the number of core threads), the task is assigned to people in the team.
  • If the number of tasks is greater than the number of team members, new people are recruited to complete the task, and the task is added to the waiting list (blocking queue).
  • If there is no schedule, try to recruit new people to complete the task (maximum number of threads), if you can’t recruit new people to complete the task, it is not human work, then go to the product manager to cut requirements (rejection strategy).

addWorker(Runnable firstTask, boolean core)

AddWorker (Runnable firstTask, Boolean core) addWorker(Runnable firstTask, Boolean core) addWorker(Runnable firstTask, Boolean core) addWorker(Runnable firstTask, Boolean core) RunWorker () is called to start the thread.

public class ThreadPoolExecutor extends AbstractExecutorService {
    
     private boolean addWorker(Runnable firstTask, boolean core) {
            // Retry the label
            retry:
            for (;;) {
                int c = ctl.get();
                // Get the current thread pool state
                int rs = runStateOf(c);
    
                No new tasks are accepted if: 1 the thread pool is not in the RUNNING state. 2 The task to be executed is empty. 3 the blocking queue is full
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null&&! workQueue.isEmpty()))return false;
    
                for (;;) {
                    // Get the current number of threads in the thread pool
                    int wc = workerCountOf(c);
                    // If the capacity is exceeded, no new tasks are accepted. Core indicates whether corePoolSize is used as the comparison standard
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // Increase the number of threads
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    // If the thread pool state changes, the loop is restarted
                    if(runStateOf(c) ! = rs)continue retry;
                    // else CAS failed due to workerCount change; retry inner loop}}// The number of threads has increased successfully, and a new Thread has been added. Worker is the encapsulation class of Thread
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if(t ! =null) {
                    final ReentrantLock mainLock = this.mainLock;
                    / / lock
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // Add the newly started thread to the thread pool
                            workers.add(w);
                            // Update the number of threads in the thread pool. Note that the number cannot exceed largestPoolSize
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true; }}finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        // Call the runWorker() method to start the execution thread
                        t.start();
                        workerStarted = true; }}}finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            returnworkerStarted; }}Copy the code

runWorker(Worker w)

The runWorker() method is the core loop of the entire blocking queue, in which the thread pool is constantly fetching and executing new tasks from the blocking queue workerQueue.

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    final void runWorker(Worker w) {
           Thread wt = Thread.currentThread();
           Runnable task = w.firstTask;
           w.firstTask = null;
           w.unlock(); // allow interrupts
           boolean completedAbruptly = true;
           try {
               // Keep fetching tasks from the blocking queue. If the tasks are empty, the loop terminates
               while(task ! =null|| (task = getTask()) ! =null) {
                   w.lock();
                   // If pool is stopping, ensure thread is interrupted;
                   // if not, ensure thread is not interrupted. This
                   // requires a recheck in second case to deal with
                   // shutdownNow race while clearing interrupt
                   if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
                       // This method is empty, you can redo the method to do some processing before the task execution begins
                       beforeExecute(wt, task);
                       Throwable thrown = null;
                       try {
                           task.run();
                       } catch (RuntimeException x) {
                           thrown = x; throw x;
                       } catch (Error x) {
                           thrown = x; throw x;
                       } catch (Throwable x) {
                           thrown = x; throw new Error(x);
                       } finally {
                           // This method is empty, you can redo the method and do some processing after the task is finishedafterExecute(task, thrown); }}finally {
                       task = null;
                       w.completedTasks++;
                       w.unlock();
                   }
               }
               completedAbruptly = false;
           } finally{ processWorkerExit(w, completedAbruptly); }}// Remove the Task from the blocking queue workerQueue
        private Runnable getTask(a) {
               boolean timedOut = false; // Did the last poll() time out?
               / / loop
               for (;;) {
                   int c = ctl.get();
                   // Get the thread pool state
                   int rs = runStateOf(c);
       
                   // STOP the loop if: 1 The thread pool is not RUNNING (>= SHUTDOWN) 2 the thread pool is >= STOP or the blocking queue is empty
                   if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                       / / degressive workCount
                       decrementWorkerCount();
                       return null;
                   }
       
                   int wc = workerCountOf(c);
       
                   1 allowCoreThreadTimeOut = true this can be done manually
                   // Set the number of current threads to 2 greater than the number of core threads
                   boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
       
                   if ((wc > maximumPoolSize || (timed && timedOut))
                       && (wc > 1 || workQueue.isEmpty())) {
                       if (compareAndDecrementWorkerCount(c))
                           return null;
                       continue;
                   }
       
                   try {
                       Timed determines whether a task is fetched from a blocking queue in a poll timeout wait or a take() blocking wait
                       Runnable r = timed ?
                           workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                           workQueue.take();
                       if(r ! =null)
                           return r;
                       timedOut = true;
                   } catch (InterruptedException retry) {
                       timedOut = false; }}}}Copy the code

So you can understand that the runWorker() method is in the run() method of the newly created thread, and runWorker() constantly calls the getTask() method to get the tasks in the blocking queue, thus achieving thread reuse.

3.2 Thread pool configuration

Let’s first look at the constructor of ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
Copy the code
  • Int corePoolSize: core thread pool size
  • Int maximumPoolSize: specifies the maximum capacity of the thread pool
  • Long keepAliveTime: The duration of a thread that is inactive
  • TimeUnit unit: TimeUnit
  • BlockingQueue workQueue: Task queue
  • ThreadFactory ThreadFactory: Thread project
  • RejectedExecutionHandler Handler: reject the policy of a thread

So how do we configure these parameters? To properly configure thread pools, we need to understand our task characteristics, generally speaking:

  • Task nature: CPU intensive, IO intensive, and hybrid
  • Task priority: Low, medium, and high
  • Task execution time: short, medium and long
  • Task dependence: Whether to rely on other resources, database, network

We analyze the configuration of these parameters one by one based on these properties.

The first is corePoolSize and maximumPoolSize. The configuration of this usually takes into account the CPU’s simultaneous thread threshold. Once this threshold is exceeded, the CPU takes a lot of time to complete thread switching and scheduling, which can lead to a significant decline in performance.

/** * The number of CPU cores, note that this method is not reliable, it may not return the true number of CPU cores, because the CPU will in some cases sleep on some cores, in which case it returns the number of active CPU cores. * /
private static final int NUMBER_OF_CPU = Runtime.getRuntime().availableProcessors();

/** * Number of core threads */
private static final int corePoolSize = Math.max(2, Math.min(NUMBER_OF_CPU - 1.4));

/** * Maximum number of threads */
private static final int maximumPoolSize = NUMBER_OF_CPU * 2 + 1;
Copy the code

In the case of keepAliveTime, this parameter describes how long a thread can live when it is inactive, set to a smaller time for CPU intensive tasks and a longer time for IO intensive or database connection tasks.

Let’s take a look at the configuration of the BlockingQueue parameter. BlockingQueue is used to describe a BlockingQueue. Its approach comes in four forms to meet different needs.

An exception is thrown Special values blocking timeout
add(e) offer(e) put(e) offer(e, time, unit)
remove() poll() take() poll(time, unit)
element() peek() Do not use Do not use

It has the following characteristics:

  • Null elements are not supported
  • Thread safety

Its implementation classes are:

  • ArrayBlockingQueue: An array implementation of a bounded blocking queue that sorts elements according to FIFO and supports fair access queues (ReenttrantLock implementation).
  • LinkedBlockingQueue: A bounded blocking queue of linked list structures with default and maximum length integer.max_value that sorts elements according to FIFO.
  • PriorityBlockingQueue: An unbounded blocking queue that supports priority sorting. By default, natural order is used, and a Comparator can be specified.
  • DelayQueue: an unbounded blocking queue that supports delayed fetching of elements. When creating an element, you can specify how long it will take to fetch the current element from the queue.
  • SynchronousQueue: A blocking queue that does not store elements. The save operation must wait for the fetch operation and vice versa, which acts as a passer and is well suited for transitive scenarios.
  • LinkedTransferQueue: An unbounded blocking queue made up of LinkedBlockingQueue with transfer and tryTranfer methods that immediately pass elements to consumers waiting to receive them.
  • LinkedBlockingDeque: A double-ended blocking queue of linked list structures that can insert and delete elements from both ends of the queue. Because there are two entrances and two exits, competition can be cut in half. This applies to job theft scenarios.

Job theft: For example, there are two queue of A and B, each do their own work, but A higher efficiency, and put his own job done quickly, and often will go to steal B task to do, this is A and B can access the same queue, in order to reduce the competition A, B, required to steal A only take the task from the end of the deque, stolen is A B from A deque head only.

Let’s finally look at the configuration of the RejectedExecutionHandler parameter.

RejectedExecutionHandler RejectedExecutionHandler RejectedExecutionHandler RejectedExecutionHandler RejectedExecutionHandler RejectedExecutionHandler RejectedExecutionHandler RejectedExecutionHandler

  • ThreadPoolExecutor. AbortPolicy: the default policy, when the number of threads in thread pool is greater than or equal to the maximum number of threads, throw RejectedExecutionException anomalies.
  • ThreadPoolExecutor. DiscardPolicy: when the number of threads in thread pool is greater than or equal to the maximum number of threads, silently discarded cannot perform a new task, don’t quote any exceptions.
  • ThreadPoolExecutor. CallerRunsPolicy: when the number of threads in thread pool is greater than or equal to the maximum number of threads, if the thread pool is not closed, directly in the caller’s thread to perform the task.
  • ThreadPoolExecutor. DiscardOldestPolicy: when the number of threads in thread pool is greater than or equal to the maximum number of threads, discarding the blocking queue head of mission (i.e., waiting for the recent task), and then to add the current task.

In addition, Executors provides a series of factory methods for creating thread pools. These threads are suitable for different scenarios.

  • NewCachedThreadPool () : unbounded automatically recycles the thread pool to check whether there are any previously created threads in the pool. If there are no existing threads in the pool, a new thread will be created and added to the pool. If the thread in the pool is inactive for 60 seconds, it will terminate automatically. This method is applicable to asynchronous tasks with a short life cycle.
  • NewFixedThreadPool (int nThreads) : A fixed-size thread pool is similar to newCachedThreadPool(), but the pool holds a fixed number of threads. Threads cannot be created at any time. If new threads are created, they are queued until a thread is removed from the pool. Suitable for very stable, very regular concurrent threads, mostly used in the server.
  • NewScheduledThreadPool (int corePoolSize) : periodic task thread pool. The threads in this thread pool can execute the threads in order of delay or periodically.
  • NewSingleThreadExecutor () : singleton thread pool, with only one thread in the pool at any time.

3.3 Thread pool monitoring

ThreadPoolExecutor provides empty methods that can be overridden by inheriting ThreadPoolExecutor.

public class ThreadPoolExecutor extends AbstractExecutorService {
       
    protected void beforeExecute(Thread t, Runnable r) {}protected void afterExecute(Runnable r, Throwable t) {}}Copy the code

Common monitoring indicators are as follows:

  • TaskCount: Indicates the number of tasks that need to be executed by the thread pool.
  • CompletedTaskCount: The number of tasks completed by the thread pool during the run, less than or equal to taskCount.
  • LargestPoolSize: maximum number of threads that have ever been created in the thread pool. This data tells you if the thread pool has ever been full. If the value is equal to the maximum size of the thread pool, it indicates that the thread pool has been full.
  • GetPoolSize: specifies the number of threads in the thread pool. Threads in the thread pool are not automatically destroyed if the thread pool is not destroyed, so the size only increases.
  • GetActiveCount: Gets the number of active threads.

Four-thread pool applications

4.1 AsyncTask

AsyncTask is implemented based on ThreadPoolExecutor and encapsulates Thread+Handler. It is mainly used to execute short tasks.

A simple AsyncTask example

public class AsyncTaskDemo extends AsyncTask<String.Integer.String> {

    /** * called before background tasks start to execute, used to perform some interface initialization operations, such as displaying a dialog box, UI thread. * /
    @Override
    protected void onPreExecute(a) {
        super.onPreExecute();
    }

    /** ** The worker thread **@param strings params
     * @return result
     */
    @Override
    protected String doInBackground(String... strings) {
        return null;
    }

    /** * Update progress, UI thread **@param values progress
     */
    @Override
    protected void onProgressUpdate(Integer... values) {
        super.onProgressUpdate(values);
    }


    /** * the UI thread calls this method after the background task completes and returns with a return statement. * *@param result result
     */
    @Override
    protected void onPostExecute(String result) {
        super.onPostExecute(result);
    }

    /** * callback ** after canceling the background task@param reason reason
     */
    @Override
    protected void onCancelled(String reason) {
        super.onCancelled(reason);
    }

    /** * callback */
    @Override
    protected void onCancelled(a) {
        super.onCancelled(); }}Copy the code

The use of AsyncTask is very simple, next we will analyze the source code of AsyncTask implementation.

The flow chart of AsyncTask

The AsyncTask source code begins as a process for creating a thread pool.

public abstract class AsyncTask<Params.Progress.Result> {
    
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        // Number of core threads, minimum 2, maximum 4
        private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1.4));
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        // The thread lives for 30 seconds when it is inactive
        private static final int KEEP_ALIVE_SECONDS = 30;
    
        // The thread build factory specifies the name of the thread
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #"+ mCount.getAndIncrement()); }};// An unbounded blocking queue consisting of a linked list structure
        private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
    
        public static final Executor THREAD_POOL_EXECUTOR;
    
        // Build a thread pool
        static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true); THREAD_POOL_EXECUTOR = threadPoolExecutor; }}Copy the code

In addition, we can use AsyncTask. ExecuteOnExecutor (Executor exec, Params… Params) from define thread pool.

Let’s look at the constructor.

public abstract class AsyncTask<Params.Progress.Result> {
    
      The constructor needs to be called in the UI thread
      public AsyncTask(a) {
          WorkerRunnable implements the Callable interface
          mWorker = new WorkerRunnable<Params, Result>() {
              public Result call(a) throws Exception {
                  mTaskInvoked.set(true);
  
                  Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                  //noinspection unchecked
                  Result result = doInBackground(mParams);
                  Binder.flushPendingCommands();
                  returnpostResult(result); }};// Create a FutureTask object to receive mWorker's results
          mFuture = new FutureTask<Result>(mWorker) {
              @Override
              protected void done(a) {
                  try {
                      // Pass the result of the execution to Handler. Note that FutureTask's get() method blocks until the result is returned
                      postResultIfNotInvoked(get());
                  } catch (InterruptedException e) {
                      android.util.Log.w(LOG_TAG, e);
                  } catch (ExecutionException e) {
                      throw new RuntimeException("An error occurred while executing doInBackground()",
                              e.getCause());
                  } catch (CancellationException e) {
                      postResultIfNotInvoked(null); }}}; }private void postResultIfNotInvoked(Result result) {
          final boolean wasTaskInvoked = mTaskInvoked.get();
          if (!wasTaskInvoked) {
              postResult(result);
          }
      }
  
      private Result postResult(Result result) {
          @SuppressWarnings("unchecked")
          Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                  new AsyncTaskResult<Result>(this, result));
          message.sendToTarget();
          return result;
      }
      
     // Internal Handler
     private static class InternalHandler extends Handler {
        public InternalHandler(a) {
            // The UI thread Looper
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked"."RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) { AsyncTaskResult<? > result = (AsyncTaskResult<? >) msg.obj;switch (msg.what) {
                // Return the result
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                // Return to progress
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break; }}}}Copy the code

As you can see, when we call the AsyncTask constructor, we create a FutureTask object that wraps the Callable object (the task we want to perform) inside and sends the result to the Handler in the Done () method of the FutureTask object.

Then look at the execution method execute().

public abstract class AsyncTask<Params.Progress.Result> {
    
        // need to be called in the UI thread
        @MainThread
        public final AsyncTask<Params, Progress, Result> execute(Params... params) {
            return executeOnExecutor(sDefaultExecutor, params);
        }

        @MainThread
        public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
            if(mStatus ! = Status.PENDING) {switch (mStatus) {
                    case RUNNING:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task is already running.");
                    case FINISHED:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task has already been executed "
                                + "(a task can be executed only once)");
                }
            }
    
            mStatus = Status.RUNNING;
            // We can duplicate the method before the task is executed
            onPreExecute();
    
            mWorker.mParams = params;
            // Execute the task, exec is sDefaultExecutor
            exec.execute(mFuture);
    
            return this; }}Copy the code

And then look at this sDefaultExecutor.

You can see that sDefaultExecutor is a SerialExecutor object, which implements the Executor interface.

public abstract class AsyncTask<Params.Progress.Result> {
    
        public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
        private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
        
        private static class SerialExecutor implements Executor {
            // Task queue
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            // The task currently executed
            Runnable mActive;
    
            public synchronized void execute(final Runnable r) {
                
                mTasks.offer(new Runnable() {
                    public void run(a) {
                        try {
                            r.run();
                        } finally{ scheduleNext(); }}});if (mActive == null) {
                    // Start the taskscheduleNext(); }}protected synchronized void scheduleNext(a) {
                // The task to fetch the queue header is executed
                if((mActive = mTasks.poll()) ! =null) { THREAD_POOL_EXECUTOR.execute(mActive); }}}}Copy the code

So we add the FutureTask object to the tail of the queue without calling the asynctask.execute () method once, and then we pull the task from the head of the queue into the thread pool for execution, so you can see that this is a serial executor.

4.2 Okhttp

There is configuration for thread pools in Okhttp’s task scheduler Dispatcher

public final class Dispatcher {
    
      public synchronized ExecutorService executorService(a) {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher".false));
        }
        returnexecutorService; }}Copy the code

You can see its configuration:

  • The number of core threads is 0, and the maximum number of threads is integer. MAX_VALUE. There is no limit on the number of core threads. This is also more consistent with the nature of network requests.
  • The blocking queue is SynchronousQueue. This queue does not store tasks, but only delivers them, so adding tasks to it will execute.

This is actually Excutors. NewCachedThreadPool () the realization of the buffer pool. In summary, a new task enters the SynchronousQueue, which is a simplex queue that only delivers tasks, but does not store them, and then creates a new thread to execute the task for 60 seconds.

Okhttp request flow chart

When making a network request, client.dispatcher().finished(this) is called after each request execution is complete.

final class RealCall implements Call {
    
  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host(a) {
      return originalRequest.url().host();
    }

    Request request(a) {
      return originalRequest;
    }

    RealCall get(a) {
      return RealCall.this;
    }

    @Override protected void execute(a) {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this.new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response); }}catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e); }}finally {
        // Asynchronous request
        client.dispatcher().finished(this); }}}}Copy the code

Let’s look at the client.dispatcher().finished(this) method.

public final class Dispatcher {
    
  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      // Remove the finished call from the running queue calls
      if(! calls.remove(call))throw new AssertionError("Call wasn't in-flight!");
      // The asynchronous request promoteCalls is true
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0&& idleCallback ! =null) { idleCallback.run(); }}private void promoteCalls(a) {
      // The current number of asynchronous requests is greater than the maximum number of requests
      if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
      // The asynchronous wait queue is empty and the execution will not continue
      if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
  
      // Iterate over the asynchronous wait queue
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall call = i.next();
  
        // If the maximum number of requests for the same host is not exceeded, the current thread is reused
        if (runningCallsForHost(call) < maxRequestsPerHost) {
          i.remove();
          runningAsyncCalls.add(call);
          executorService().execute(call);
        }
  
        // The run queue reaches its limit and is no longer executed
        if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.}}}Copy the code

So you can see that Okhttp doesn’t control the number of threads with a thread pool, the threads in the thread pool do all the running requests, the Dispatcher controls the threads, The dispatcher.promotecalls () method limits asynchronous requests to two maximum values, maxRequests and maxRequestsPerHost. Continuously add requests from readyAsyncCalls to the runningAsyncCalls queue within the value range.