In concurrency, not only do we rely on the various lock or queue operations described earlier – the multithreaded foundation of JAVA concurrency (5), but we also need to consider resource consumption. . This is where thread pools are introduced.

For the familiar Executors, we used the thread pool regularly. However, it is not recommended that we create a thread pool directly from this class. Instead, we need to create a thread pool from ThreadPoolExecutor. This will allow us to understand the state of the threads in the pool at any time and prevent thread exceptions in the thread pool.

Executors

  • newFixedThreadPool(int nThreads)Create a thread pool of fixed size.
  • newSingleThreadExecutor()Create a single thread pool
  • newCachedThreadPool()Create a cached thread pool in which threads will persist for a period of time after execution and then be released without execution.
  • newScheduledThreadPool(int corePoolSize)Create a thread pool for scheduled tasks

The first three implementations all use ThreadPoolExecutor, but pass in different parameters to create different thread pools. Let’s look at the implications of the parameters in ThreadPoolExecutor.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
Copy the code
  • corePoolSizeCore thread pool size
  • maximumPoolSizeMaximum capacity of the thread pool
  • keepAliveTimeThread lifetime
  • unitA unit of survival time
  • workQueueBlocking queues are stored to execute tasks

Two parameters that will fill in for us by default:

  • threadFactoryThread factory, used to generate threads to be added to the thread pool
  • handlerRejection policy handler, used to reject redundant tasks in the current thread pool, etc. The default isAbortPolicyRejection policies

The thread factory will help us generate threads to add to the thread pool:

public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())// Whether it is a daemon thread
                t.setDaemon(false);
            if(t.getPriority() ! = Thread.NORM_PRIORITY)// Whether it is the default priority
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
Copy the code

The execute(Runnable Command) method in the thread pool is used to help us perform the required tasks:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();// CTL is a wrapped atomic class that contains the number and status of threads
        if (workerCountOf(c) < corePoolSize) {// The number of worker threads is smaller than the current number of core threads
            if (addWorker(command, true))// Join the queue, and true means to use the core thread
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {// Determine whether the thread pool is running and add tasks to the queue
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))If the thread pool is not running, reject the task
                reject(command);
            else if (workerCountOf(recheck) == 0)// If the number of threads is 0, open a thread to execute, not the core thread
                addWorker(null.false);
        }
        else if(! addWorker(command,false))The number of threads is greater than the number of core threads and smaller than the maximum number of threads.
            reject(command);
    }
Copy the code

ForkJoinTask

This is a special thread pool that can split a large task into several smaller tasks to execute. The results of each task will be integrated and returned after completion of execution. Two abstract classes are derived below: RecursiveAction, which returns no value and just partitions the task to execute. RecursiveTask has a return value, which is returned after the task is executed.

package com.montos.lock;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long> {
	private static final long serialVersionUID = 1L;
	private static final int THRESHOLD = 10000;
	private long start;
	private long end;

	public CountTask(long start, long end) {
		super(a);this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute(a) {
		long sum = 0;
		boolean canCompute = (end - start) < THRESHOLD;// Determine the threshold
		if (canCompute) {
			for (longi = start; i <= end; i++) { sum += i; }}else {
			long step = (start + end) / 100;
			ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
			long pos = start;
			for (int i = 0; i < 100; i++) {
				long lastOne = pos + step;
				if (lastOne > end)
					lastOne = end;
				CountTask subTask = new CountTask(pos, lastOne);
				pos += step + 1;
				subTasks.add(subTask);
				subTask.fork();// The child thread performs the solution
			}
			for (CountTask t : subTasks) {
				sum += t.join();// Return the sum of all result sets}}return sum;
	}
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(0.200000l);
		ForkJoinTask<Long> result = forkJoinPool.submit(task);
		try {
			Long res = result.get();
			System.out.println("result is :" + res);
		} catch(InterruptedException | ExecutionException e) { e.printStackTrace(); }}}Copy the code

Result is :20000100000 result is :20000100000

Some friends may encounter two errors (modify the parameter value, and cause the problem, here I encountered!!). :

  • 1.java.util.concurrent.ExecutionException:java.lang.StackOverflowError. The reason is thatForkJoinThere is no stack control, and code with care that method recursion does not exceed the JVM’s memory, and adjust the JVM’s memory if necessary: add it to the JDK configuration in Eclipse-XX:MaxDirectMemorySize=128(The default is 64MB). Change to 128, no stack overflow is reported, but the next error is reported.
  • 2.java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node. The reason for this is that the processing length of subtasks is unbalanced. We need to compute the original length.

Most of the concurrent classes in the JDK so far talk about usage, and I’ll look forward to a later article describing and handling the underlying code.