
Preface

At work and in books we often hear the same advice about threads: don't create threads directly in your code; use a thread pool instead. The reasons for using thread pools can be roughly summarized as follows.

  • 1.Reduce resource consumption. Threads are a valuable operating-system resource. When many people work on the same project and each of them casually creates threads in code, the number of threads can grow out of control. Creating and destroying a thread requires the operating system to switch from user mode to kernel mode, which is an expensive operation. A thread pool avoids frequent creation and destruction because its threads can be reused.
  • 2.Improve response speed. When a request arrives, the time needed to create a thread is saved, because the threads in the pool have already been created.
  • 3.Improve thread manageability. Threads are a scarce resource; creating too many of them degrades system performance. With a thread pool, threads can be allocated, tuned, and monitored in a uniform way.

Thread pools are simple to use, but using them is not the same as using them well. In an interview you will rarely be asked how to use a thread pool; you are far more likely to be asked what problems a poorly used thread pool can cause. Understanding how thread pools are implemented is therefore well worth the effort: it helps in interviews, and it helps us avoid many problems in daily work.

Implementation principle

A thread pool involves several concepts: the core thread count, the maximum thread count, and the task queue. The core thread count is the base size of the pool; the maximum thread count is the number of threads the pool is never allowed to exceed; the task queue stores submitted tasks once the number of threads has reached the core count and tasks keep coming in. Unlike other pooling techniques, a thread pool is built on the producer-consumer pattern: the submitter of a task is the producer and the thread pool is the consumer. When we need to run a task, we simply hand it to the thread pool. The process of executing a task in the pool is as follows.

  • 1.First, check whether the number of threads in the pool has reached the core thread count. If not, create a new thread to execute the task; if it has, move on to the next step.
  • 2.Check whether the task queue is full. If not, add the task to the queue; if it is full, move on to the next step.
  • 3.Check whether creating one more thread would exceed the maximum thread count. If not, create a new thread to execute the task; if it would, move on to the next step.
  • 4.Execute the rejection policy (a simplified sketch of the whole flow follows this list).
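
The decision flow can be summarized with a simplified, self-contained sketch. This is not the JDK implementation (the real logic in execute() is analyzed later in this article); the class and field names here are made up for illustration.

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Single-threaded sketch of the dispatch decision only; the real pool also
// re-checks its state and handles concurrency, as shown later.
class DispatchSketch {
    private final int corePoolSize = 2;
    private final int maximumPoolSize = 4;
    private final Queue<Runnable> workQueue = new ArrayBlockingQueue<>(8);
    private int workerCount = 0;

    void submit(Runnable task) {
        if (workerCount < corePoolSize) {
            workerCount++;                               // step 1: below the core size, start a new worker
        } else if (workQueue.offer(task)) {
            // step 2: core size reached, the task was queued successfully
        } else if (workerCount < maximumPoolSize) {
            workerCount++;                               // step 3: queue full, grow toward the maximum
        } else {
            throw new IllegalStateException("rejected"); // step 4: at the maximum and the queue is full
        }
    }
}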

Before I read the thread pool implementation, I kept wondering why the pool first checks whether the task queue is full and only then whether the maximum thread count would be exceeded. Wouldn't it be more natural to create as many threads as allowed first, let them handle tasks, and only queue tasks once the threads can no longer keep up? I didn't know the answer until I looked at the source code. The answer is at the end of the article.

ThreadPoolExecutor

Under the JUC package (java.util.concurrent), a concrete thread pool implementation is provided: ThreadPoolExecutor. ThreadPoolExecutor offers several constructors, the most complete of which takes seven parameters.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}
  • corePoolSize: the number of core threads in the pool. When a task is submitted and the number of threads in the pool has not yet reached corePoolSize, a new thread is created to execute it; otherwise the task is added to the task queue.

  • maximumPoolSize: the maximum number of threads the pool allows. When the task queue is full and a new task arrives, the pool checks whether creating another thread would exceed maximumPoolSize. If it would, no thread is created and the rejection policy is executed; if not, a new thread is created to execute the task.

  • keepAliveTime: when the number of threads in the pool exceeds corePoolSize, the extra threads become idle once they have no work to do. They are only allowed to stay idle for a limited time before being terminated; that time is keepAliveTime, and its unit is given by the unit parameter.

  • unit: the time unit of keepAliveTime. TimeUnit is an enum whose values range from nanoseconds, microseconds, and milliseconds up to seconds, minutes, hours, and days.

  • workQueue: the task queue used to hold submitted tasks. It must be a blocking queue; common choices are ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, and PriorityBlockingQueue. ArrayBlockingQueue is an array-based blocking queue with first-in, first-out (FIFO) ordering; because the underlying array size must be fixed at construction time, it is a bounded queue. LinkedBlockingQueue is a linked-list-based blocking queue, also FIFO; since a linked list only maintains pointers between nodes, it does not need a fixed size, and if no capacity is specified the default is Integer.MAX_VALUE, which is so large that LinkedBlockingQueue is usually described as an unbounded queue (if a capacity is given at construction, it becomes bounded). SynchronousQueue is a blocking queue that stores no elements: every insert must wait for another thread to perform a corresponding remove, otherwise it blocks; its throughput is usually higher than that of LinkedBlockingQueue. PriorityBlockingQueue is an unbounded blocking queue that orders elements by priority, with higher-priority elements closer to the head.

  • threadFactory: the factory used to create the pool's threads. In real projects, threads should be given meaningful names to make troubleshooting easier; a thread factory makes it easy to give every created thread a name that reflects its business purpose.

  • handler: the rejection policy. When the task queue is full and the number of threads has reached maximumPoolSize, the pool stops accepting new tasks, and the rejection policy decides how those tasks are handled. The default is AbortPolicy, which refuses the new task and throws an exception. ThreadPoolExecutor defines four inner classes representing four rejection policies, and a custom policy can be supplied by implementing the RejectedExecutionHandler interface.

  • AbortPolicy: rejects the new task and throws an exception directly.
  • CallerRunsPolicy: the thread that submitted the task runs it itself.
  • DiscardPolicy: silently discards the task without processing it.
  • DiscardOldestPolicy: discards the task at the head of the queue and then executes the current task. (The task at the head is not necessarily the one that has waited longest, because the queue may be ordered by priority.)

A configuration example combining these parameters is shown below.
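
Putting the seven parameters together, a pool might be configured as follows. This is only an illustrative sketch: the pool sizes, queue capacity, thread-name prefix, and the choice of CallerRunsPolicy are arbitrary examples, not recommendations.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolConfigExample {
    public static void main(String[] args) {
        // A thread factory that gives every thread a business-meaningful name
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "order-service-pool-" + seq.getAndIncrement());
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                          // corePoolSize
                8,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),              // bounded task queue
                namedFactory,                               // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy

        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " is running"));
        pool.shutdown();
    }
}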

When using ThreadPoolExecutor, you can submit a task with the execute(Runnable task) method, which has no return value, or with the submit() method, which returns a Future object. submit() has three overloads (a short example of all three follows the list below).

  • submit(Runnable task): the return type is Future, but Future.get() returns null.
  • submit(Runnable task, T result): the return type is Future<T>; Future.get() returns the second argument, result.
  • submit(Callable<T> task): the argument is a Callable, which itself produces a value; Future.get() returns that value.
  • If you are interested in how Future works internally, you can explore it on your own; a later article will cover it specifically.
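
A short example of the three submit() overloads is below. The task bodies are arbitrary, and the pool is created with Executors.newFixedThreadPool() only to keep the example small.

import java.util.concurrent.*;

public class SubmitExample {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // submit(Runnable): get() returns null
        Future<?> f1 = pool.submit(() -> System.out.println("plain runnable"));
        System.out.println(f1.get());    // null

        // submit(Runnable, T result): get() returns the given result
        Future<String> f2 = pool.submit(() -> System.out.println("runnable with result"), "done");
        System.out.println(f2.get());    // done

        // submit(Callable<T>): get() returns the Callable's return value
        Future<Integer> f3 = pool.submit(() -> 1 + 1);
        System.out.println(f3.get());    // 2

        pool.shutdown();
    }
}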

Source code analysis

Now that we know the basic usage of ThreadPoolExecutor, let's look at how the thread pool is implemented, with the source code in hand. From the principle described above, the idea behind a thread pool is fairly simple and would not be hard to code; the point of reading the source is mainly to learn from excellent code written by others, especially by the concurrency master Doug Lea.

Besides the important attributes described above, a thread pool also needs a variable representing its state: the pool can be running, shut down, and so on. If we were to implement a thread pool ourselves, our first instinct would be to use one variable for the pool state and another for the number of threads. That would work, but it is not what Doug Lea did: he packed both values into a single variable. So how can one variable hold two values? Readers who have seen the ReadWriteLock source code may immediately think of the answer (for an introduction, see the article on the implementation principle of ReadWriteLock): in the read-write lock implementation, an int is split into its high and low bits, with the high bits representing one number and the low bits another. Doug Lea uses the same bit-splitting technique in the thread pool. A single AtomicInteger field holds both the pool state and the thread count: the value occupies 4 bytes, i.e. 32 bits, with the high 3 bits representing the pool state and the low 29 bits representing the number of threads. The pool has 5 states in total, and 3 bits can represent up to 8 values, so the high 3 bits are more than enough. In short: high 3 bits for the state, low 29 bits for the worker count.

Before looking at the core logic of the thread pool, let's take a quick look at the relevant field and method definitions in ThreadPoolExecutor. They are used constantly in the core logic and make heavy use of bitwise operations, which are not particularly intuitive, so knowing what they do in advance makes the core logic easier to follow. The related source code and comments are as follows.

// The high 3 bits hold the state of the thread pool, the low 29 bits hold the number of threads, i.e. the number of workers
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of bits used for the worker count: 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// The maximum worker count, 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// The states of the thread pool; the high 3 bits represent the running state
// RUNNING: 111
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN: 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP: 001
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING: 010
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED: 011
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// Extract the pool state: the low 29 bits are masked to 0, leaving only the state bits
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Extract the worker count: the high 3 bits are masked to 0, leaving only the count
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Combine the pool state and the worker count into a single ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */

// Is the pool state less than s?
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// Is the pool state greater than or equal to s?
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// Is the thread pool in the RUNNING state?
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
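
To make the packing concrete, the following standalone snippet re-implements the same bit arithmetic (the constants mirror the JDK source; the worker count of 5 is just an example value).

public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;            // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;       // 2^29 - 1
    static final int RUNNING    = -1 << COUNT_BITS;            // high 3 bits are 111

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);   // RUNNING state with 5 worker threads
        System.out.println(Integer.toBinaryString(ctl));   // 11100000000000000000000000000101
        System.out.println(runStateOf(ctl) == RUNNING);    // true
        System.out.println(workerCountOf(ctl));            // 5
    }
}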

With that in mind, let's look at the core implementation. We submit a task to a thread pool with execute(Runnable task), so we start from that core entry point. The source code of the execute() method is as follows; I have added some comments for reference.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // workerCountOf(c) counts the number of worker threads
    // 1. If the worker count is below the core pool size, create a new thread to execute the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // Adding the worker failed; re-read ctl for the logic below
        c = ctl.get();
    }
    // 2. The worker count has reached the core size: if the pool is running and the task is queued successfully...
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool is no longer RUNNING and the task can be removed from the queue, execute the rejection policy
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // If there are no worker threads at all, create one via addWorker()
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. The pool is not running, or queuing the task failed: try to create a new worker thread
    else if (!addWorker(command, false))
        // 4. Creating a new worker thread failed: execute the rejection policy
        reject(command);
}

The logic of the execute() method is roughly divided into four parts, corresponding to the four steps described in the thread pool principles section.

  • First, workerCountOf(c) computes the number of threads currently in the pool and compares it with the core pool size specified when the pool was created. If it is smaller, addWorker() is called, which essentially creates a new thread to execute the task. The addWorker() method is analyzed further below.
  • If the current thread count is greater than or equal to the core pool size, workQueue.offer(command) tries to add the task to the task queue. If the queue is not full, the offer succeeds and returns true, and execution enters the if block for some additional checks. If the queue is full, workQueue.offer(command) returns false and steps 3 and 4 below are executed instead.
  • When the task queue is full, addWorker() is called again. Before creating a new thread, addWorker() checks whether doing so would exceed the maximum pool size; if it would, addWorker() returns false, otherwise a new thread is created to execute the task.
  • If addWorker() returns false, meaning no new thread could be created, step 4 is reached: the reject() method applies the rejection policy. reject() is simple; it just calls the rejectedExecution() method of the handler we configured. Its source code is as follows.
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

From the analysis above, addWorker() is called in several places, which shows how important it is. As it turns out, the method is not only important, its implementation is also the most involved. The first argument is the Runnable task to run; the second is a boolean: true means the current thread count has not yet reached the core pool size, and false means it is greater than or equal to the core pool size. The source code of addWorker() is quite long, so I have split it into two parts.

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1. If the pool state is beyond SHUTDOWN, return false: the pool is shutting down and accepts no new work
        // 2. If the state is SHUTDOWN, only allow adding a worker with a null firstTask while the queue still has tasks;
        //    otherwise return false
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 1. If the thread count has reached the theoretical maximum (2^29 - 1), return false: no more threads can be added
            // 2. Otherwise compare against the bound selected by core: return false when the count has reached
            //    the core pool size (core == true) or the maximum pool size (core == false).
            //    (At or above the core size, tasks should go to the queue instead; above the maximum size,
            //    no thread may ever be created, otherwise maximumPoolSize would be meaningless.)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the thread count; if the CAS succeeds, break out of the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // Re-read the pool state and compare it with the previous value:
            // if it is unchanged, continue with the inner loop;
            // if it has changed in the meantime, jump back to the outer loop and start over
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // The second half of the method is omitted here
    // ...
}

In this code we meet an unfamiliar bit of syntax: retry: ... break retry ... continue retry. It is rarely seen; the first time I saw it I thought I had accidentally touched the keyboard and changed the source. It is a labeled break/continue, similar in spirit to goto in C (Java reserves the goto keyword but does not support it): a label named retry is placed just before the outer for loop. break retry jumps out of the labeled (outer) loop entirely, so the loop is not entered again; continue retry abandons the current iteration and immediately starts the next iteration of the labeled loop. Because two nested for loops are involved, the label makes it convenient to break out of, or restart, the outer loop from inside the inner one. retry is not a Java keyword; it is an arbitrary identifier and could just as well be a, b, or c, as long as the break and continue use the same name. A standalone example of the syntax follows.

As the code shows, the core logic of this first half is to use two infinite for loops plus a CAS operation to update the thread count of the pool. If the count is updated successfully, the loop is exited and the second half of the method runs; if the CAS fails, the loop tries again. The benefit is thread safety without locks, which improves efficiency. The many checks in this part mainly validate the pool state and the thread count; they are not the most important detail, so we focus on the core logic. Once the thread count has been updated successfully, the second half of addWorker() executes; its source appears after the example below.
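
Here is a standalone example of the labeled break/continue syntax; it is unrelated to the thread pool itself and only demonstrates the language feature.

public class LabelDemo {
    public static void main(String[] args) {
        outer:
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 3; j++) {
                if (j == 2)
                    continue outer;   // skip the rest of the inner loop, move on to the next i
                if (i == 2)
                    break outer;      // leave both loops entirely
                System.out.println("i=" + i + ", j=" + j);
            }
        }
        // prints: i=0, j=0   i=0, j=1   i=1, j=0   i=1, j=1
    }
}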

private boolean addWorker(Runnable firstTask, boolean core) {
    // The first half of the method is omitted here
    // ...

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a new worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // Re-read the pool state, because it may have changed while we were acquiring the lock
                int rs = runStateOf(ctl.get());

                // If the pool is RUNNING, or it is SHUTDOWN and firstTask is null, add the worker to the pool
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // If the worker's thread has somehow already been started, throw an exception
                    // (in my view this check is redundant, because the thread was just created and is not started anywhere yet)
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the worker thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread failed to start, remove the worker from the pool and decrement the thread count
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

In this part of the code, a Worker object is created via new Worker(firstTask). Worker extends AQS (AbstractQueuedSynchronizer) and implements the Runnable interface; it is the real worker inside the thread pool, and the tasks we submit are ultimately executed by Worker objects. Take a quick look at the Worker constructor, shown below. It first sets the synchronization state to -1 and then creates a thread through the ThreadFactory. Note that when the thread is created, the Worker passes in itself (this), which means the target field of the created Thread object points to the Worker.

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable {

    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Pass this in so that the created thread's target field is the current Worker object
        this.thread = getThreadFactory().newThread(this);
    }
}

When the Worker object is created via new Worker(firstTask), its thread has already been created. Before starting it, t.isAlive() is used to check whether the thread is already running; if not, the thread's start() method is called. Note that mainLock.lock() is called after the Worker object is created to guarantee thread safety: this step can be reached concurrently, so workers.add(w) must be performed while holding the lock. When a thread's start() method is called and the thread gets CPU time, its run() method executes, and Thread.run() in turn invokes the run() method of the thread's target. Here the thread's target is the Worker we created, so ultimately the Worker's run() method is executed.
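
The delegation from Thread.run() to the target's run() can be seen with a plain Thread; this mirrors what Worker does when it passes itself (this) to the ThreadFactory.

public class TargetDemo {
    public static void main(String[] args) {
        // The lambda becomes the thread's target
        Runnable target = () -> System.out.println("target.run() executed on " + Thread.currentThread().getName());
        Thread t = new Thread(target, "demo-thread");
        // start() schedules the thread; when it runs, Thread.run() calls target.run()
        t.start();
    }
}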

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable {

    public void run() {
        runWorker(this);
    }
}

In run() of the Worker class, the runWorker() method is called directly. So the core logic for Worker tasks is implemented in the runWorker() method. The source code is as follows.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Keep running while the worker's firstTask is not null or a task can be fetched from the task queue;
        // otherwise block inside the getTask() method
        while (task != null || (task = getTask()) != null) {
            // Ensure the worker executes tasks serially
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // beforeExecute() is an empty hook method that subclasses can implement
                // to do extra work before the task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // afterExecute() is an empty hook method that subclasses can implement
                    // to do extra work after the task runs
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
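
The beforeExecute() and afterExecute() calls in the code above are empty protected hook methods. A subclass can override them, for example to time every task; the sketch below is illustrative only and is not part of the JDK.

import java.util.concurrent.*;

public class MonitoredPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    public MonitoredPool(int core, int max, long keepAlive, TimeUnit unit,
                         BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime());   // runs in the worker thread, right before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long elapsedMicros = (System.nanoTime() - startTime.get()) / 1_000;
        System.out.println("task took " + elapsedMicros + " us, thrown=" + t);
        super.afterExecute(r, t);
    }
}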

The source of runWorker() also looks long, but the core logic is a single line: task.run(), which finally executes the Runnable we submitted. runWorker() uses a while loop to keep the Worker executing tasks: as long as the initially passed-in task is not null, or a task can be fetched from the task queue via getTask(), the worker keeps working. Otherwise the finally block calls processWorkerExit() to let the worker exit, after which the thread can be interrupted and eventually destroyed.

getTask() is a blocking method: it returns a task as soon as one can be taken from the queue, and blocks otherwise. It also supports timeouts; when a thread has been idle longer than the keep-alive time specified when the pool was created, getTask() returns null, the worker exits the while loop, and the thread is finally destroyed.

That concludes the analysis of the thread pool's execute() method. Finally, let's briefly look at the shutdown() and shutdownNow() methods. When shutdown() is called, the pool state is set to SHUTDOWN and only idle threads are interrupted, not threads that are currently executing tasks. When shutdownNow() is called, the pool state is set to STOP and all threads are interrupted, including those executing tasks.

  • The source code for the shutdown() method is shown below.
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check permissions
        checkShutdownAccess();
        // Set the thread pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle worker threads
        interruptIdleWorkers();
        // Hook method; the concrete behavior is supplied by subclasses such as ScheduledThreadPoolExecutor
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to set the thread pool state to TERMINATED
    tryTerminate();
}
  • The source code for the shutdownNow() method is shown below.
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Set the thread pool state to STOP
        advanceRunState(STOP);
        // Interrupts all threads, including those that are executing tasks
        interruptWorkers();
        // Drain the remaining tasks from the queue into the returned list
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

 

  • In practice, which of the two methods to use should be decided by the nature of the tasks: if submitted tasks must be allowed to finish, use shutdown(). shutdown() is also the generally recommended, more graceful choice. A common shutdown pattern is sketched below.
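
A common, graceful way to stop a pool combines the two methods: call shutdown() first, wait a bounded amount of time for tasks to finish, and fall back to shutdownNow() if they do not. This is a sketch of that pattern; the timeout value is arbitrary.

import java.util.concurrent.*;

public class ShutdownPattern {
    static void stopPool(ExecutorService pool) {
        pool.shutdown();                     // stop accepting new tasks, let queued tasks finish
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();          // took too long: interrupt running tasks; queued tasks are returned
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();              // interrupted while waiting: force shutdown
            Thread.currentThread().interrupt();
        }
    }
}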

Conclusion

  • This article introduced the implementation principle of thread pools. The principle consists of four core steps: first check whether the thread count has reached the core pool size, then check whether the task queue is full, then check whether the thread count would exceed the maximum pool size, and finally execute the rejection policy. The article then explained the core parameters of a thread pool, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, and handler, and what each of them means, and finally walked through the source code to analyze in detail how tasks are executed.
  • In both the read-write lock implementation and the thread pool implementation, Doug Lea splits a single int variable into high and low bits. This idea is worth learning from, not only because the design is clever but also because bit operations are efficient on computers.
  • Finally, back to the confusion mentioned at the beginning of the article: why does the pool first check whether the task queue is full and only then whether the thread count would exceed the maximum, rather than checking the maximum first and the queue second?
  • The answer lies in the source code. Whenever a thread needs to be created, addWorker() is called, and the second half of addWorker() calls mainLock.lock() to acquire a global lock, which causes contention. If the pool checked the maximum thread count before checking the queue, then of the four steps in the principle, step 1 (check the core pool size) would acquire the global lock and step 2 (check the maximum pool size) would acquire it again, so compared with checking the queue first, checking the maximum first could mean one extra acquisition of the global lock. To avoid contention on the global lock as much as possible, the thread pool is designed to check whether the task queue is full before checking the maximum thread count.
  • One more question: the throughput of LinkedBlockingQueue is higher than that of ArrayBlockingQueue, yet the former is based on a linked list and the latter on an array. Shouldn't an array normally outperform a linked list?
  • After reading the source code of both blocking queues, I found the reason: LinkedBlockingQueue uses two separate locks, takeLock and putLock, for reads and writes, so reads and writes do not contend with each other, whereas ArrayBlockingQueue uses a single lock for both, so reads and writes do contend. That is why LinkedBlockingQueue has higher throughput than ArrayBlockingQueue.

Recommended reading

  • Pipe program: the cornerstone of concurrent programming
  • Learn the implementation principle of CAS
  • Unsafe class source code interpretation and usage scenarios
  • Design principle of queue synchronizer (AQS)
  • Queue synchronizer (AQS) source code analysis
  • ReentrantLock source code analysis
  • Fair locks versus unfair locks
  • Condition source code analysis
  • The implementation principle of ReadWriteLock
  • Semaphore source code analysis and use scenarios
  • Concurrency tool class CountDownLatch source analysis and usage scenarios
  • CyclicBarrier concurrency tool class source analysis and usage scenarios
  • Do wait() and notify() always come in pairs? Thread.join()