Preface

Multithreading with asynchronous execution can make the most of a multi-core machine, but uncontrolled thread creation puts a burden on the system. Every thread takes up memory, so a large number of threads can exhaust memory and cause an OutOfMemoryError. Even when it does not, creating and discarding many threads puts a lot of pressure on the GC.

To avoid creating threads repeatedly, thread pools are available to allow threads to be reused. In layman’s terms, when work comes in, a thread is taken from the thread pool, and when the work is done, instead of closing the thread, it is returned to the thread pool for use by other tasks.

Let’s explore thread pools in general and in detail.

1. Overall architecture

Look at the Executor framework:

Interfaces: Executor, CompletionService, ExecutorService, ScheduledExecutorService

Abstract class: AbstractExecutorService

Implementation classes: ExecutorCompletionService, ThreadPoolExecutor, ScheduledThreadPoolExecutor

You can see the main methods in the figure. This article focuses on ThreadPoolExecutor.

2. Study ThreadPoolExecutor

Take a look at the constructor for this class:

    public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit,
            BlockingQueue<Runnable> paramBlockingQueue, ThreadFactory paramThreadFactory,
            RejectedExecutionHandler paramRejectedExecutionHandler) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0)); // RUNNING state, zero workers
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L))
            throw new IllegalArgumentException();
        if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null))
            throw new NullPointerException();
        this.corePoolSize = paramInt1;
        this.maximumPoolSize = paramInt2;
        this.workQueue = paramBlockingQueue;
        this.keepAliveTime = paramTimeUnit.toNanos(paramLong);
        this.threadFactory = paramThreadFactory;
        this.handler = paramRejectedExecutionHandler;
    }

corePoolSize: the size of the core pool. After the thread pool is created, it contains no threads by default.

Threads are created to execute tasks as they arrive. In other words, the number of threads starts at zero, and each incoming task gets a new thread until the number of threads reaches corePoolSize; after that, incoming tasks are placed in the queue. To put it more succinctly: corePoolSize is the number of threads the pool keeps alive even when they are idle, not the maximum number of threads that can run at once (that is maximumPoolSize).

If the thread pool’s prestartAllCoreThreads() method is executed, the thread pool creates and starts all core threads ahead of time.
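The lazy creation of core threads, and the effect of prestarting them, can be observed with getPoolSize(). The following is a minimal sketch; the class name PrestartDemo and the pool size of 4 are chosen only for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PrestartDemo {
    /** Returns {pool size right after construction, pool size after prestarting}. */
    static int[] poolSizes(int corePoolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();   // no thread has been created yet
            pool.prestartAllCoreThreads();     // create and start all core threads now
            int after = pool.getPoolSize();
            return new int[] {before, after};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes(4);
        System.out.println("before=" + sizes[0] + ", after=" + sizes[1]); // before=0, after=4
    }
}
```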

maximumPoolSize: the maximum number of threads the thread pool is allowed to create. maximumPoolSize must be greater than or equal to corePoolSize.

keepAliveTime: how long an idle thread waits for a task before it terminates. By default, keepAliveTime applies only when the number of threads is greater than corePoolSize: a thread whose idle time reaches keepAliveTime is terminated until the pool is back down to corePoolSize. Calling allowCoreThreadTimeOut(true) applies the same timeout to core threads.

unit: the time unit of keepAliveTime.

workQueue: a blocking queue that stores tasks waiting to be executed. Once the number of threads has reached corePoolSize, new tasks are placed in this queue and wait there until a thread is free to take them.

threadFactory: the factory used to create new threads.

handler: the policy applied when the pool refuses a task.

2.1 Task cache Queue

We mentioned several times earlier that the task cache queue, or workQueue, is used to store tasks waiting to be executed.

The workQueue type is BlockingQueue<Runnable>, which is usually one of the following:

1) ArrayBlockingQueue: an array-based first-in, first-out queue whose size must be specified when it is created;

2) LinkedBlockingQueue: a first-in, first-out queue based on a linked list. If the queue size is not specified when it is created, it defaults to Integer.MAX_VALUE, which makes it effectively unbounded;

3) SynchronousQueue: a special queue that does not hold submitted tasks. Each task is handed directly to a waiting thread; if no thread is waiting, the hand-off fails and the pool creates a new thread for the task (or rejects it once maximumPoolSize is reached).
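The thread pool adds tasks with the non-blocking offer() method, so the difference between the three queues shows up in how many offers they accept when nobody is consuming. The sketch below is only an illustration; QueueOfferDemo and accepted are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueOfferDemo {
    /** Counts how many of n non-blocking offer() calls the queue accepts. */
    static int accepted(BlockingQueue<Runnable> queue, int n) {
        int count = 0;
        for (int i = 0; i < n; i++) {
            if (queue.offer(() -> { })) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Bounded queue: accepts tasks until its capacity is used up.
        System.out.println(accepted(new ArrayBlockingQueue<Runnable>(2), 5)); // 2
        // Default capacity Integer.MAX_VALUE: accepts everything.
        System.out.println(accepted(new LinkedBlockingQueue<Runnable>(), 5)); // 5
        // No consumer is waiting, so every hand-off fails.
        System.out.println(accepted(new SynchronousQueue<Runnable>(), 5));    // 0
    }
}
```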

2.2 Rejection Policy

AbortPolicy: rejects the task and throws RejectedExecutionException. This is the default policy.

CallerRunsPolicy: runs the rejected task directly in the caller's thread, as long as the thread pool has not been shut down. The task is not actually dropped, but the thread that submits tasks is likely to slow down considerably.

DiscardOldestPolicy: discards the oldest request in the queue, i.e. the task that would have been executed next, and then tries to resubmit the current task.

DiscardPolicy: silently discards the task without any further processing.
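To see three of these policies in action, one can saturate a tiny pool (one thread, one queue slot) and watch what happens to a third task. This is a sketch under those assumed sizes; RejectionDemo and thirdTaskOutcome are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class RejectionDemo {
    /** Saturates a 1-thread, 1-slot pool and reports what happens to the third task. */
    static String thirdTaskOutcome(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread caller = Thread.currentThread();
        AtomicBoolean ranOnCaller = new AtomicBoolean(false);
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            // The pool is now saturated, so this task goes to the handler.
            pool.execute(() -> ranOnCaller.set(Thread.currentThread() == caller));
            return ranOnCaller.get() ? "ran on caller thread" : "silently dropped";
        } catch (RejectedExecutionException e) {
            return "rejected with exception";
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```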

2.3 Task processing strategy of thread Pool:

If the number of threads in the pool is less than corePoolSize, a new thread is created to execute each incoming task;

If the number of threads in the pool is >= corePoolSize, each incoming task is added to the task cache queue. If the add succeeds, the task waits for an idle thread to take it out and execute it. If the add fails (typically because the task cache queue is full), the pool tries to create a new thread to execute the task. If the number of threads has already reached maximumPoolSize, the rejection policy is applied;

If the number of threads in the pool is greater than corePoolSize, any thread that stays idle for longer than keepAliveTime is terminated, until the number of threads is no greater than corePoolSize. If core threads are also allowed to time out (allowCoreThreadTimeOut(true)), a core thread that stays idle for longer than keepAliveTime is terminated as well.
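The order "core thread, then queue, then extra thread" can be watched directly. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2 and a queue with a single slot, and records the pool size after each submission; PoolGrowthDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolGrowthDemo {
    /** Returns the pool size observed after each of three submissions. */
    static int[] poolSizeAfterEachSubmit() {
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[3];
        try {
            for (int i = 0; i < sizes.length; i++) {
                pool.execute(blocker);
                sizes[i] = pool.getPoolSize();
            }
            return sizes;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = poolSizeAfterEachSubmit();
        // Task 1 starts a core thread, task 2 is queued, task 3 forces a second thread.
        System.out.println(s[0] + " " + s[1] + " " + s[2]); // 1 1 2
    }
}
```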

2.4 Closing the thread pool

ThreadPoolExecutor provides two methods for shutting down the thread pool, shutdown() and shutdownNow(), where:

shutdown(): the thread pool does not terminate immediately. It stops accepting new tasks and terminates only after the running tasks and all tasks in the task cache queue have completed.

shutdownNow(): stops the thread pool immediately, attempts to interrupt the tasks in progress, and empties the task cache queue, returning the tasks that have not yet been executed.
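A small sketch of the difference, using a single-thread executor so that the queueing is easy to reason about. ShutdownDemo and its method names are hypothetical, and the 10-second sleep simply stands in for a long-running task.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDemo {
    /** shutdown(): already-submitted tasks still run; returns how many completed. */
    static int completedAfterShutdown(int tasks) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown(); // stop accepting new tasks, keep draining the queue
        awaitQuietly(pool);
        return completed.get();
    }

    /** shutdownNow(): queued tasks are handed back unexecuted; returns how many. */
    static int returnedByShutdownNow(int queuedTasks) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            try {
                Thread.sleep(10_000); // long-running task, interrupted by shutdownNow()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < queuedTasks; i++) {
            pool.execute(() -> { }); // these wait in the queue behind the sleeper
        }
        List<Runnable> neverRun = pool.shutdownNow();
        awaitQuietly(pool);
        return neverRun.size();
    }

    private static void awaitQuietly(ExecutorService pool) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(completedAfterShutdown(3)); // 3
        System.out.println(returnedByShutdownNow(2));  // 2
    }
}
```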

2.5 Source Code Analysis

The execute method: AbstractExecutorService, the parent class of ThreadPoolExecutor, does not implement execute; the implementation lives in ThreadPoolExecutor.

submit(), invokeAll(), and invokeAny() of ExecutorService all end up calling execute, so execute is the core of the core, and the source code analysis below proceeds step by step around it.

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task. The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread. If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        // 1. If the current number of threads is smaller than corePoolSize, create and start a thread.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return; // the worker was started, we are done
            c = ctl.get();
        }
        // 2. If step 1 fails, try to put the task into the blocking queue.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // If the pool is no longer RUNNING and the task is removed successfully, reject the task.
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // If the current number of workers is 0, call addWorker(null, false) to create a thread with no first task.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3. If steps 1 and 2 fail, try to grow the pool from corePoolSize toward maximumPoolSize; if that fails too, reject the task.
        else if (!addWorker(command, false))
            reject(command);
    }

The code can look confusing at first sight, so here is a flow chart of what it does:

Following the flow chart line by line, the method starts with a null check.

The workerCountOf() method returns the number of threads currently in the pool, which is compared with the core pool size.

  • If it is smaller, the task is scheduled through the addWorker() method.
  • If it is greater than or equal to the core pool size, the task is submitted to the wait queue.
  • If the task cannot enter the wait queue, the pool tries to create a new thread to run it directly.
  • If the maximum number of threads has been reached, the submission fails and the rejection policy is executed.

The execute() method hands tasks to threads through the addWorker() method. Take a look at its source code.

    private boolean addWorker(Runnable firstTask, boolean core) {
        // The outer loop checks the state of the thread pool
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                  firstTask == null &&
                  !workQueue.isEmpty()))
                return false;

            // The inner loop increases the worker count by 1
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // The worker count has been incremented; next, add the worker to the HashSet<Worker> and start it
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // If adding to the HashSet succeeds, the thread is started
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

The main task of addWorker(Runnable firstTask, boolean core) is to create and start a thread.

It decides whether a thread can be created based on the current state of the thread pool and the given bound (corePoolSize or maximumPoolSize).
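Both pieces of information that addWorker checks, the run state and the worker count, are packed into the single ctl integer. The sketch below re-implements that packing outside the JDK as a standalone illustration. The helper names follow the private helpers visible in the source above, the constants reflect the JDK 8 layout as I understand it (3 high bits for the state, 29 low bits for the count), and the class CtlDemo itself is hypothetical.

```java
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 low bits hold the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // mask for the worker count

    // Run states are stored in the 3 high bits.
    static final int RUNNING = -1 << COUNT_BITS;         // -536870912, as seen in the constructor
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 3);
        System.out.println(RUNNING);                   // -536870912
        System.out.println(workerCountOf(c));          // 3
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(runStateOf(c) < SHUTDOWN);  // true: RUNNING is the only state below SHUTDOWN
    }
}
```

Because the states are ordered numerically, a check such as rs >= SHUTDOWN in addWorker means "the pool is no longer RUNNING".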

addWorker can be called with four combinations of arguments. execute uses three of them:

1. addWorker(command, true)

Used when the number of threads is smaller than corePoolSize: a worker with the task as its first task is added to the workers set. Returns false if the number of workers has already reached corePoolSize.

2. addWorker(null, false)

Adds a worker with no initial task to the workers set, subject to the maximumPoolSize limit. Such a worker takes its tasks from the task queue once its thread runs, which is equivalent to creating a new thread without assigning it a task right away.

3. addWorker(command, false)

Used when the queue is full: tries to add a worker for the new task to the workers set, subject to the maximumPoolSize limit. Returns false if the thread pool is also full.

The fourth combination is not used by execute():

addWorker(null, true)

Adds a worker with no initial task to the workers set, subject to the corePoolSize limit, and returns false if the number of workers has already reached corePoolSize. It is used by prestartAllCoreThreads(), which starts corePoolSize workers ahead of time to wait for tasks from the workQueue.

Execution process:

1. Check whether the thread pool is in a state where a worker thread can be added. If it is, go on to the next step; if not, return false:
   a) if the state is RUNNING, a worker can be added;
   b) if the state is STOP, TIDYING or TERMINATED, no worker can be added;
   c) if the state is SHUTDOWN and firstTask != null, no worker can be added, because a pool that is shutting down accepts no new tasks;
   d) if the state is SHUTDOWN, firstTask == null and the workQueue is empty, no worker can be added, because a worker with an empty firstTask exists only to take tasks from the workQueue, and the workQueue is empty.
2. If the number of threads in the pool has reached the upper limit (corePoolSize or maximumPoolSize), return false; otherwise increment the worker count with a CAS operation.
3. While holding the thread pool's ReentrantLock, add the newly created worker to the workers set; after unlocking, start the worker thread. If all of this succeeds, return true. If adding the worker to the set fails or the thread fails to start, the addWorkerFailed() logic is called.

3. Four common thread pools

newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), var1);
    }

A fixed-size thread pool. You specify the pool size, corePoolSize equals maximumPoolSize, and the blocking queue is a LinkedBlockingQueue whose capacity is Integer.MAX_VALUE. Once the pool has filled up, the number of threads stays the same: when a new task is submitted, it is executed immediately if there is an idle thread; otherwise it is stored in the blocking queue. Because the queue is effectively unbounded, it can grow rapidly and deplete system resources when tasks are submitted faster than they are processed. In addition, when the pool is idle, i.e. there are no tasks to run, it does not release its worker threads, so it keeps occupying some system resources until shutdown() is called.

newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), var0));
    }

A single-thread pool contains only one thread. The blocking queue is a LinkedBlockingQueue: any additional tasks submitted to the pool are stored in the queue until the thread is idle. Tasks are executed in first-in, first-out order.

newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        // 2147483647 is Integer.MAX_VALUE
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), var0);
    }

A cached thread pool. Idle threads are kept for 60 seconds by default. The corePoolSize is 0, the maximumPoolSize is Integer.MAX_VALUE, and the blocking queue is a SynchronousQueue, a direct hand-off queue that forces the pool to add a new thread whenever no idle thread is available to take a new task. When there is nothing to execute and a thread has been idle for longer than keepAliveTime (60 seconds), the worker thread terminates and is reclaimed. When a new task is submitted and there are no idle threads, a new thread is created to execute it, which incurs some system overhead. If a large number of tasks are submitted at the same time and they do not finish quickly, the pool creates an equal number of new threads, which can quickly exhaust the system's resources.
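The contrast between the fixed and the cached configuration can be made concrete by submitting a handful of tasks that all block. The sketch below builds the pools directly with the same constructor arguments shown in the factory methods above, so that getPoolSize() and getQueue() are available; FixedVsCachedDemo and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FixedVsCachedDemo {
    /** Submits blocking tasks and returns {threads created, tasks waiting in the queue}. */
    static int[] submitBlocking(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // every task stays busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    static ThreadPoolExecutor fixed(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    static ThreadPoolExecutor cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        int[] f = submitBlocking(fixed(2), 5);
        int[] c = submitBlocking(cached(), 5);
        System.out.println("fixed:  threads=" + f[0] + ", queued=" + f[1]); // threads=2, queued=3
        System.out.println("cached: threads=" + c[0] + ", queued=" + c[1]); // threads=5, queued=0
    }
}
```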

newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int var0) {
        return new ScheduledThreadPoolExecutor(var0);
    }

    public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
        return new ScheduledThreadPoolExecutor(var0, var1);
    }

A scheduled thread pool that can run tasks periodically, typically used to synchronize data at regular intervals.

scheduleAtFixedRate: executes the task at a fixed rate. The period is the interval between the start times of successive executions; if an execution takes longer than the period, the next one starts late rather than running concurrently.

scheduleWithFixedDelay: executes the task with a fixed delay. The delay is the time between the end of one execution and the start of the next.
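The difference between the two methods becomes visible when the task takes longer than the interval. The following sketch measures the gap between the first two start times for a task that sleeps 100 ms with a 50 ms period or delay. The numbers are assumptions chosen for the demonstration, timings on a real machine are only approximate, and SchedulingGapDemo is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SchedulingGapDemo {
    /** Returns the time in milliseconds between the starts of the first two runs. */
    static long gapMillis(boolean fixedRate) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch twoRuns = new CountDownLatch(2);
        Runnable task = () -> {
            starts.add(System.nanoTime());
            try {
                Thread.sleep(100); // the task takes longer than the 50 ms interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            twoRuns.countDown();
        };
        if (fixedRate) {
            ses.scheduleAtFixedRate(task, 0, 50, TimeUnit.MILLISECONDS);
        } else {
            ses.scheduleWithFixedDelay(task, 0, 50, TimeUnit.MILLISECONDS);
        }
        try {
            twoRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ses.shutdownNow();
        return TimeUnit.NANOSECONDS.toMillis(starts.get(1) - starts.get(0));
    }

    public static void main(String[] args) {
        // Fixed rate: the next run starts as soon as the overrunning one ends (about 100 ms).
        System.out.println("fixed rate gap:  " + gapMillis(true) + " ms");
        // Fixed delay: 100 ms of work plus 50 ms of delay (about 150 ms).
        System.out.println("fixed delay gap: " + gapMillis(false) + " ms");
    }
}
```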

4. Example

newFixedThreadPool example:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FixPoolDemo {

        private static Runnable getThread(final int i) {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(i);
                }
            };
        }

        public static void main(String args[]) {
            ExecutorService fixPool = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 10; i++) {
                fixPool.execute(getThread(i));
            }
            fixPool.shutdown();
        }
    }

newCachedThreadPool example:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CachePool {
        private static Runnable getThread(final int i) {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        // ignored in this demo
                    }
                    System.out.println(i);
                }
            };
        }

        public static void main(String args[]) {
            ExecutorService cachePool = Executors.newCachedThreadPool();
            for (int i = 1; i <= 10; i++) {
                cachePool.execute(getThread(i));
            }
        }
    }

Although shutdown() is never called here, you can see that the idle threads time out after 60 seconds and their resources are released automatically.

newSingleThreadExecutor example:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SingPoolDemo {
        private static Runnable getThread(final int i) {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(i);
                }
            };
        }

        public static void main(String args[]) throws InterruptedException {
            ExecutorService singPool = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                singPool.execute(getThread(i));
            }
            singPool.shutdown();
        }
    }

Note that newSingleThreadExecutor, like newFixedThreadPool, does not release system resources when there are no tasks in the thread pool, so shutdown() is required.

newScheduledThreadPool example:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ScheduledExecutorServiceDemo {
        public static void main(String args[]) {

            ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
            ses.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(4000);
                        System.out.println(Thread.currentThread().getId() + " executed.");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, 0, 2, TimeUnit.SECONDS);
        }
    }

5. There are several caveats to manually creating a thread pool

  1. **Task independence.** Deadlocks can occur if tasks depend on other tasks. For example, if a task waits for the return value or result of another task submitted to the same pool, a thread starvation deadlock can occur unless the thread pool is large enough.
  2. **Handle long-blocking tasks properly.** If tasks block for too long, thread pool performance deteriorates even without deadlocks. Most blocking methods in Java come in both an untimed and a timed version, for example Thread.join, BlockingQueue.put, and CountDownLatch.await. If a wait times out, mark the task as failed, then abort it or put it back in the task queue for later execution. That way, whatever the outcome of each task, the pool as a whole always keeps making progress.
  3. **Set a reasonable thread pool size.** Thread pool size = NCPU × UCPU × (1 + W/C).
  4. **Select the appropriate blocking queue.** newFixedThreadPool and newSingleThreadExecutor both use unbounded blocking queues, which can consume a lot of memory. A bounded blocking queue avoids that problem, but what happens to new tasks when the queue is full? With bounded queues you need to choose an appropriate rejection policy, and the queue size and thread pool size must be tuned together. For very large or unbounded thread pools, a SynchronousQueue can be used to avoid queueing tasks and hand them directly from the producer to a worker thread.
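The advice in item 2 about timed waits can be sketched with Future.get(timeout, unit), which is one of the timed blocking methods in the same family. In this illustration a task that does not finish in time is cancelled and reported as timed out; TimedWaitDemo and runWithTimeout are hypothetical names, and what to do after a timeout (abort or requeue) is left to the caller.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedWaitDemo {
    /** Runs a task that sleeps for taskMillis and waits at most timeoutMillis for it. */
    static String runWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> {
            Thread.sleep(taskMillis); // stands in for a slow, blocking operation
            return "done";
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so the thread is freed
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(10, 2000));  // done
        System.out.println(runWithTimeout(2000, 50));  // timed out
    }
}
```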

Here is the thread pool the Thrift framework uses to handle socket tasks; it shows how Facebook's engineers customized the thread pool.

    private static ExecutorService createDefaultExecutorService(Args args) {
        SynchronousQueue executorQueue = new SynchronousQueue();

        return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
                executorQueue);
    }

6. Summary

6.1 How Do I Select the Number of thread pools

The size of the thread pool affects system performance: a pool that is too large or too small cannot deliver optimal performance.

The size does not need to be very precise, though; it is enough to avoid the extremes of too large and too small. In general, determining the size of a thread pool takes into account the number of CPUs, the amount of memory, and whether the tasks are compute-intensive or I/O-intensive.

NCPU = number of CPUs

UCPU = target CPU utilization, 0 ≤ UCPU ≤ 1

W/C = ratio of wait time to compute time

To keep the processors at the desired utilization, the optimal size of the thread pool is:

Thread pool size = NCPU × UCPU × (1 + W/C)

In Java, use

    int ncpus = Runtime.getRuntime().availableProcessors();

to get the number of CPUs.
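As a worked example of the sizing formula, the sketch below plugs numbers into NCPU × UCPU × (1 + W/C). The rounding and the lower bound of one thread are my own assumptions, not part of the formula, and PoolSizing is a hypothetical helper.

```java
final class PoolSizing {
    /**
     * Thread pool size = NCPU * UCPU * (1 + W/C), rounded to the nearest
     * integer and never less than 1 (both are choices made for this sketch).
     */
    static int poolSize(int nCpu, double uCpu, double waitToComputeRatio) {
        if (nCpu < 1 || uCpu < 0 || uCpu > 1 || waitToComputeRatio < 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        return Math.max(1, (int) Math.round(nCpu * uCpu * (1 + waitToComputeRatio)));
    }

    public static void main(String[] args) {
        // 8 CPUs, 50% target utilization, tasks wait 3x as long as they compute:
        System.out.println(poolSize(8, 0.5, 3.0)); // 16
        // Pure computation (W/C = 0) at full utilization: one thread per CPU.
        System.out.println(poolSize(4, 1.0, 0.0)); // 4
        // Sizing for the current machine with assumed utilization and ratio:
        int ncpus = Runtime.getRuntime().availableProcessors();
        System.out.println(poolSize(ncpus, 0.8, 2.0));
    }
}
```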

6.2 Thread pool Factory

If no thread factory is specified for the thread pool, the DefaultThreadFactory in Executors is used. By default, threads created by this factory are non-daemon threads.

A custom thread factory lets you do many things, such as tracking how many threads the pool has created and when, or customizing thread names and priorities. You can even set all new threads as daemon threads, so that the thread pool is forcibly destroyed when the main thread exits.

The following example logs each thread creation and sets all threads as daemon threads.

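    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ThreadFactoryDemo {
        public static class MyTask1 implements Runnable {

            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + " Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            MyTask1 task = new MyTask1();
            ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS,
                    new SynchronousQueue<Runnable>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    System.out.println("Create thread " + t);
                    return t;
                }
            });
            for (int i = 0; i <= 4; i++) {
                es.submit(task);
            }
            // Daemon threads die with the JVM, so keep main alive long enough for the tasks to finish.
            Thread.sleep(2000);
        }
    }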

6.3 Extending thread pools

ThreadPoolExecutor is extensible: it provides several methods that subclasses can override, namely beforeExecute, afterExecute and terminated.

beforeExecute and afterExecute are called in the thread that executes the task, so logging, timing, monitoring, or statistics collection can be added in these methods. You can also output useful debugging information to help diagnose faults.
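A minimal sketch of such an extension: a subclass that overrides beforeExecute, afterExecute and terminated to time tasks and count how many have finished. TimingThreadPool and its members are hypothetical names for this illustration, and the fixed pool of two threads in runTasks is an arbitrary choice.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicInteger finished = new AtomicInteger();

    TimingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs in the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            // Same worker thread as beforeExecute, so the ThreadLocal holds this task's start time.
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finished.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            System.out.println("finished " + finished.get() + " tasks, total task time "
                    + TimeUnit.NANOSECONDS.toMillis(totalNanos.get()) + " ms");
        } finally {
            super.terminated();
        }
    }

    int finishedCount() {
        return finished.get();
    }

    /** Runs n short tasks and returns how many afterExecute() calls were counted. */
    static int runTasks(int n) {
        TimingThreadPool pool = new TimingThreadPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.finishedCount();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(5)); // 5
    }
}
```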

Proper use of thread pools

The Alibaba Java coding guidelines contain the following passage:

Do not use Executors to create thread pools. Use ThreadPoolExecutor directly instead, so that the running rules of the thread pool are explicit and the risk of resource exhaustion is avoided. The drawbacks of the Executors methods are:

  1. newFixedThreadPool and newSingleThreadExecutor:

    The main problem is that the accumulated request queue can consume a very large amount of memory, or even cause an OOM.

  2. newCachedThreadPool and newScheduledThreadPool:

    The main problem is that the maximum number of threads is Integer.MAX_VALUE, so a very large number of threads may be created, or even cause an OOM.