In concurrency, we rely not only on the various lock and queue operations described earlier (the multithreaded foundation of Java concurrency, part 5), but we also need to consider resource consumption. This is where thread pools come in.
We regularly use thread pools through the familiar Executors class. However, it is not recommended to create a thread pool directly from this class. Instead, we should create it through ThreadPoolExecutor, which lets us know the state of the threads in the pool at any time and guard against exceptions inside it.
Executors
- newFixedThreadPool(int nThreads): creates a thread pool of fixed size.
- newSingleThreadExecutor(): creates a thread pool with a single thread.
- newCachedThreadPool(): creates a cached thread pool; its threads persist for a period after finishing a task and are released if no new task arrives.
- newScheduledThreadPool(int corePoolSize): creates a thread pool for scheduled tasks.
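To see that these factory methods are just presets, here is how newFixedThreadPool is implemented in OpenJDK 8 (quoted as a sketch; minor details may differ between JDK versions):
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}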
The first three implementations all build on ThreadPoolExecutor, just passing different parameters. Let's look at what each parameter of ThreadPoolExecutor means.
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
- corePoolSize: the core thread pool size.
- maximumPoolSize: the maximum capacity of the thread pool.
- keepAliveTime: how long an idle thread is kept alive.
- unit: the unit of the keep-alive time.
- workQueue: the blocking queue that holds the tasks waiting to be executed.
Two parameters that are filled in for us by default:
- threadFactory: the thread factory used to create the threads that are added to the pool.
- handler: the rejection policy handler, used to reject tasks that the pool can no longer accept. The default is the AbortPolicy rejection policy. (A construction sketch using all seven parameters follows this list.)
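Putting the seven parameters together, here is a minimal sketch of building the pool directly rather than through Executors; all sizes below are arbitrary example values, not recommendations:
import java.util.concurrent.*;

public class PoolSetup {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                                      // corePoolSize: threads kept even when idle
                4,                                      // maximumPoolSize: upper bound on threads
                60L, TimeUnit.SECONDS,                  // keepAliveTime + unit for idle non-core threads
                new LinkedBlockingQueue<Runnable>(100), // workQueue holding pending tasks
                Executors.defaultThreadFactory(),       // threadFactory (a custom one can be used instead)
                new ThreadPoolExecutor.AbortPolicy());  // handler: the default rejection policy
        executor.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        executor.shutdown();
    }
}
Because every parameter is explicit, the queue capacity and thread counts are visible at the call site, which is exactly why creating the pool this way is recommended over the Executors shortcuts.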
The thread factory generates the threads that are added to the thread pool; this is the newThread method of the default factory:
public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
            namePrefix + threadNumber.getAndIncrement(),
            0);
    if (t.isDaemon())
        t.setDaemon(false);                  // make sure the thread is not a daemon
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY); // reset to the default priority
    return t;
}
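For comparison, here is a sketch of a custom factory (the class name and prefix are mine, not from the original) that gives threads a readable name while mirroring the default behaviour above:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public NamedThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        if (t.isDaemon())
            t.setDaemon(false);                  // keep worker threads non-daemon
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY); // keep the default priority
        return t;
    }
}
Passing such a factory as the threadFactory argument makes pool threads easy to identify in thread dumps.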
The execute(Runnable command) method of the thread pool is what actually runs the tasks we submit:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get(); // ctl is an atomic field that packs the worker count and the pool state
    if (workerCountOf(c) < corePoolSize) { // fewer workers than the core pool size
        if (addWorker(command, true)) // start a new core thread for this task
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { // pool is running: add the task to the queue
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command)) // pool stopped in the meantime: remove and reject
            reject(command);
        else if (workerCountOf(recheck) == 0) // no worker left: start a non-core thread
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // queue full: try a non-core thread up to maximumPoolSize
        reject(command); // that also failed: apply the rejection policy
}
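To see these branches in action, here is a small demo (pool sizes and task counts are arbitrary choices of mine) that saturates a tiny pool until the default AbortPolicy rejects a task:
import java.util.concurrent.*;

public class ExecuteDemo {
    public static void main(String[] args) {
        // 1 core thread, at most 2 threads, queue capacity 1 -> the 4th task must be rejected
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        for (int i = 1; i <= 4; i++) {
            final int id = i;
            try {
                pool.execute(() -> {
                    try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
                    System.out.println("task " + id + " ran on " + Thread.currentThread().getName());
                });
            } catch (RejectedExecutionException e) {
                System.out.println("task " + id + " was rejected"); // AbortPolicy throws this exception
            }
        }
        pool.shutdown();
    }
}
Task 1 takes the core thread, task 2 waits in the queue, task 3 forces a second (non-core) thread, and task 4 finds both the queue and the pool full, so execute falls through to reject.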
ForkJoinTask
This relies on a special thread pool (ForkJoinPool) that can split a large task into several smaller tasks, execute them, and merge their results once they all complete. Two abstract subclasses of ForkJoinTask are used: RecursiveAction, which returns no value and simply splits and executes the work, and RecursiveTask, which returns a value after the task has been executed.
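The full example below uses RecursiveTask; for contrast, here is a minimal RecursiveAction sketch (class name, threshold and range are illustrative, not from the original) that only splits work and returns nothing:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 10;
    private final int start;
    private final int end;

    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start < THRESHOLD) {
            for (int i = start; i < end; i++)
                System.out.println(Thread.currentThread().getName() + " prints " + i);
        } else {
            int mid = (start + end) / 2;                 // split the range in half
            invokeAll(new PrintTask(start, mid), new PrintTask(mid, end));
        }
    }

    public static void main(String[] args) {
        new ForkJoinPool().invoke(new PrintTask(0, 50)); // blocks until all subtasks finish
    }
}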
package com.montos.lock;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        super();
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD; // below the threshold: compute directly
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            long step = (start + end) / 100;
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if (lastOne > end)
                    lastOne = end;
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                subTasks.add(subTask);
                subTask.fork(); // submit the subtask for asynchronous execution
            }
            for (CountTask t : subTasks) {
                sum += t.join(); // wait for each subtask and add its result
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            Long res = result.get();
            System.out.println("result is :" + res);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
result is :20000100000
Some readers may run into two errors here (I hit them myself after changing the parameter values):
- 1. java.util.concurrent.ExecutionException: java.lang.StackOverflowError
The reason is that ForkJoin does not limit the recursion depth, so take care that the recursive calls do not exceed the JVM's memory, and adjust the JVM settings if necessary. I added -XX:MaxDirectMemorySize=128 to the JDK configuration in Eclipse (the default is 64MB). After changing it to 128 the stack overflow no longer appeared, but the next error did.
- 2. java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node
The reason is that the subtasks are split into very unbalanced lengths; the split size needs to be computed from the original length of the range (see the sketch below).
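One way to address this (my adjustment, not from the original post) is to derive the step from the length of the range rather than from start + end, so every subtask covers a slice of comparable size. A sketch of a helper that could be added to CountTask:
// Sketch: build balanced subtasks from the actual range length.
private ArrayList<CountTask> splitBalanced() {
    ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
    long length = end - start + 1;
    long step = Math.max(1, length / 100);        // each slice covers roughly 1/100 of the range
    long pos = start;
    while (pos <= end) {
        long lastOne = Math.min(pos + step - 1, end);
        subTasks.add(new CountTask(pos, lastOne));
        pos = lastOne + 1;
    }
    return subTasks;
}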
So far we have mostly covered how to use the JDK's concurrency classes; a later article will describe and work through the underlying code.