1. Executors Tools
In the previous article, we summarized the basic usage of the ThreadPoolExecutor class. The JDK also provides the Executors class, which offers factory methods for creating common thread pools:
- ExecutorService newFixedThreadPool(int nThreads): Creates a thread pool with a fixed size of nThreads; tasks beyond that are queued until a thread is free.
- ExecutorService newSingleThreadExecutor(): Creates a thread pool with a single thread.
- ExecutorService newCachedThreadPool(): Creates a cacheable thread pool. Idle threads beyond what the workload needs are reclaimed, and a new thread is created when no idle thread is available.
- ScheduledExecutorService newScheduledThreadPool(int corePoolSize): Creates a thread pool with a fixed core size that supports delayed and periodic task execution.
1.1 newFixedThreadPool method
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
As you can see, the source code uses the ThreadPoolExecutor class to create a thread pool whose core pool size equals its maximum pool size. newFixedThreadPool therefore reuses a fixed number of threads to execute tasks and queues new tasks when all threads are busy. The default rejection policy is AbortPolicy. The default capacity of LinkedBlockingQueue is Integer.MAX_VALUE, which is about 2.1 billion, so in practice the queue never fills up.
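The queueing behaviour described above can be observed with a small experiment. This is only a sketch: the class name FixedPoolDemo, the pool size of 2 and the 6 tasks are arbitrary choices for illustration, not part of the JDK source.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class FixedPoolDemo {
    // Runs `tasks` short tasks on a fixed pool of `nThreads` threads and
    // returns the names of the threads that actually executed them.
    static Set<String> runTasks(int nThreads, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(50); // simulate work so later tasks wait in the queue
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();                            // stop accepting new tasks
        pool.awaitTermination(5, TimeUnit.SECONDS); // wait for queued tasks to finish
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = runTasks(2, 6);
        System.out.println(names.size() + " threads used: " + names);
    }
}
```

Although six tasks are submitted, only two distinct worker threads ever run them; the remaining tasks wait in the LinkedBlockingQueue.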
1.2 newSingleThreadExecutor method
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
The core of the newSingleThreadExecutor method is a ThreadPoolExecutor with a fixed size of 1: both the core pool size and the maximum pool size are 1, so at most one task is active at any time. If the current thread dies because of an exception, the thread pool creates a new thread to replace it. Only one thread exists at a time.
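The thread replacement mentioned above can be made visible with a sketch like the following. The class name SingleThreadDemo and the simulated failure are assumptions for illustration; note that the uncaught exception of the first task is printed to stderr by the default handler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class SingleThreadDemo {
    // Returns the names of the threads that ran the first (failing) task
    // and the second task, in that order.
    static List<String> threadNamesAroundFailure() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> names = new CopyOnWriteArrayList<>();
        executor.execute(() -> {
            names.add(Thread.currentThread().getName());
            // With execute(), the exception escapes the task and ends this worker thread.
            throw new IllegalStateException("simulated failure");
        });
        executor.execute(() -> names.add(Thread.currentThread().getName()));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(threadNamesAroundFailure());
    }
}
```

The two recorded names differ, which shows that the second task was picked up by a replacement thread rather than by the one that failed.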
1.3 newCachedThreadPool method
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
Because a SynchronousQueue, which has no capacity, is used as the work queue, a submitted task is handed directly to an idle thread if one is waiting; otherwise a new thread is created. Threads that stay idle for 60 seconds are reclaimed.
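A rough way to see the "create a new thread when none is idle" behaviour is to keep every task blocked and then look at the pool size. This is a sketch: CachedPoolDemo is a made-up name, and the cast relies on newCachedThreadPool returning a plain ThreadPoolExecutor, as in the source shown above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class CachedPoolDemo {
    // Submits `tasks` tasks that all block, and returns the pool size at that moment.
    static int poolSizeUnderLoad(int tasks) throws InterruptedException {
        // Assumption: the factory returns a ThreadPoolExecutor (true for the source above).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep this thread busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize(); // no thread was idle, so each task got its own thread
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return size;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool size with 3 blocked tasks: " + poolSizeUnderLoad(3));
    }
}
```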
1.4 newScheduledThreadPool method
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
The returned ScheduledExecutorService is used for delayed and periodic tasks.
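As an illustration of periodic execution, the sketch below schedules a task at a fixed rate and cancels it after a few runs. The class name ScheduledDemo, the 20 ms period and the run count are arbitrary assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class ScheduledDemo {
    // Runs a periodic task until it has executed at least `times` times,
    // then cancels it and returns the number of runs observed.
    static int runPeriodically(int times) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            done.countDown();
        }, 0, 20, TimeUnit.MILLISECONDS); // initial delay 0, then every 20 ms
        done.await(5, TimeUnit.SECONDS);  // wait until enough runs have happened
        handle.cancel(false);             // stop further periodic runs
        scheduler.shutdown();
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("runs: " + runPeriodically(3));
    }
}
```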
Conclusion
In essence, the Executors factory methods wrap ThreadPoolExecutor (or its subclass ScheduledThreadPoolExecutor) to create commonly used thread pool configurations.
2. Future, FutureTask and Callable
The previous ThreadPoolExecutor article mentioned the execute method, which runs Runnable tasks, as well as the submit method.
public <T> Future<T> submit(Callable<T> task)
So what are Future, FutureTask, and Callable for?
2.1 Future
Future is used to cancel a specific Runnable or Callable task, query whether it has completed, and retrieve its result. When needed, the result can be obtained through the get method, which blocks until the task finishes.
public interface Future<V> {
    /**
     * Attempts to cancel the task.
     * Returns false if the task has already completed, has already been
     * cancelled, or cannot be cancelled. If the task has not started when
     * cancel is called, it will never run. If the task has already started,
     * the mayInterruptIfRunning parameter determines whether the thread
     * running it is interrupted.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /** Returns true if the task was cancelled before it completed normally. */
    boolean isCancelled();

    /** Returns true if the task has completed. */
    boolean isDone();

    /**
     * Waits for the execution to complete and then retrieves the result.
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits at most the given timeout for the result. If the task has not
     * completed within that time, a TimeoutException is thrown.
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
According to the source code, Future provides the following capabilities:
- Check whether the task has completed;
- Cancel the task;
- Obtain the result of the task.
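The three capabilities listed above can be exercised together in a short sketch. The class name FutureOpsDemo, the 10-second sleep and the 100 ms timeout are assumptions chosen only to force a timeout.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the JDK.
class FutureOpsDemo {
    static String probe() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // A deliberately slow task so that the timed get() below times out.
            Future<String> slow = executor.submit(() -> {
                Thread.sleep(10_000);
                return "finished";
            });
            boolean timedOut = false;
            try {
                slow.get(100, TimeUnit.MILLISECONDS); // timed wait for the result
            } catch (TimeoutException e) {
                timedOut = true; // the timed get() throws instead of returning null
            }
            boolean cancelled = slow.cancel(true); // interrupt the running task
            return "timedOut=" + timedOut + ", cancelled=" + cancelled
                    + ", isCancelled=" + slow.isCancelled() + ", isDone=" + slow.isDone();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(probe());
        // prints: timedOut=true, cancelled=true, isCancelled=true, isDone=true
    }
}
```

Note that isDone() also returns true for a cancelled task: "done" means the task will not make further progress, not that it produced a result.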
2.2 FutureTask
Look directly at the source definition:
public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    // other members omitted
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
As you can see, FutureTask implements the RunnableFuture interface, which extends both Runnable and Future, so FutureTask has the features of both. Its constructors show that a FutureTask can wrap a Callable object (or a Runnable plus a result value) and then be submitted as a task itself.
The key point here: the type of the argument passed to submit affects what the returned Future yields.
FutureTask is the Future implementation that the thread pool classes discussed in this article create for submitted tasks; the JDK also contains other Future implementations, such as CompletableFuture.
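Because FutureTask is both a Runnable and a Future, it does not even need a thread pool. The following minimal sketch (the class name FutureTaskDemo is made up) runs one on a plain Thread and reads the result back through the Future side.

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class FutureTaskDemo {
    static int sumOnPlainThread() throws Exception {
        // Wrap a Callable; the FutureTask is then usable as a Runnable.
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2 + 3);
        new Thread(task).start(); // Runnable side: the thread calls task.run()
        return task.get();        // Future side: block until the result is set
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOnPlainThread()); // prints 6
    }
}
```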
2.3 Callable
The Callable interface is located in the java.util.concurrent package and declares a single method, call(). Its definition is similar to Runnable:
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
As you can see, Callable is a generic interface, and the return type of call() is the type parameter V. It is typically used together with FutureTask and ExecutorService to submit tasks and retrieve their results:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
In general, the first and third submit methods are used most, and the second is rarely used. Since Runnable has no return value, how can a result be obtained from it? This is where FutureTask comes in: it implements the Runnable interface, so it can be submitted as a Runnable while still holding a result.
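To make the difference between the three overloads concrete, here is a small sketch that calls each of them once. The class name SubmitOverloadsDemo and the values 42 and "done" are arbitrary assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
class SubmitOverloadsDemo {
    static String results() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> callable = () -> 42;
            Runnable runnable = () -> { };

            Future<Integer> a = executor.submit(callable);        // result comes from call()
            Future<String> b = executor.submit(runnable, "done"); // result is the given value
            Future<?> c = executor.submit(runnable);              // result is always null

            return a.get() + ", " + b.get() + ", " + c.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(results()); // prints: 42, done, null
    }
}
```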
2.4 Submit process analysis
Let’s start with a UML diagram of the class:
1. submit method
The submit method is defined in the ExecutorService interface and implemented by the AbstractExecutorService class.
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
As the submit methods show, the task is always converted into a RunnableFuture object, which is in fact a FutureTask, and is then run through the execute method.
We mentioned earlier that the type of task passed to submit affects the returned value. Why? If a Callable is passed in, newTaskFor creates the FutureTask directly from it:
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
In the subsequent execution, the call method of the Callable object is invoked and its result is stored in the FutureTask that is returned, so the result can be retrieved normally.
If the type passed in is Runnable, the newTaskFor method is also called to generate the FutureTask object.
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
As you can see, after a series of conversions the Runnable ends up wrapped in a RunnableAdapter. Its call method runs the Runnable and then returns the result value that was passed in; for submit(Runnable task) that value is null.
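The adapter can also be tried in isolation through the public Executors.callable methods. In this sketch the class name AdapterDemo, the log buffer and the value "fixed" are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the JDK.
class AdapterDemo {
    static String adapt() throws Exception {
        StringBuilder log = new StringBuilder();
        Runnable task = () -> log.append("ran;");

        // Adapter with an explicit result: call() runs the task, then returns "fixed".
        Callable<String> withResult = Executors.callable(task, "fixed");
        // Adapter without a result: call() runs the task, then returns null.
        Callable<Object> withoutResult = Executors.callable(task);

        String first = withResult.call();
        Object second = withoutResult.call();
        return first + ", " + second + ", " + log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(adapt()); // prints: fixed, null, ran;ran;
    }
}
```

The returned value never depends on what the Runnable computed, which is why a Runnable submitted without a result always yields null.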
To summarize the flow when a Runnable is passed to submit:
- A Runnable is passed to submit. Typically, in order to get a result, a Callable object is first wrapped in a FutureTask, and that FutureTask is passed in (denoted here as FutureA);
- submit calls newTaskFor to create a new FutureTask object (denoted as FutureB), and this object is the Future that submit returns;
- The FutureTask constructor calls Executors.callable(runnable, result) to build a Callable object, which is stored in the callable field of FutureTask (i.e. FutureB). Here result defaults to null. Because the argument is a Runnable, it is wrapped in a RunnableAdapter, an implementation of Callable;
- When the task starts running, the call method of callable is invoked. Since the current Callable object is a RunnableAdapter, this ends up calling the run method of the Runnable that was passed in (FutureA, a FutureTask), and the return value is result;
- After all these twists and turns, execution finally reaches the call method of the Callable that the original FutureTask (FutureA) was built from. The computed result is stored in the FutureTask passed in as the argument (FutureA), while the Future returned by submit (FutureB) holds result, i.e. null.
Therefore, when FutureTask + Callable is used together, calling get on the Future returned by submit yields null; the computed result must be read from the FutureTask that was passed in.
The introduction to ThreadPoolExecutor covered the general flow of execute but not how a task is actually run, so the rest of this section outlines the execution path behind the submit method.
2. addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
    /*
     * Omitted: n lines of code that check whether the thread pool is running
     * and whether the number of threads exceeds the limit. If the pool is
     * running and the limit has not been reached, proceed to create the thread.
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Wrap firstTask in a Worker object
        w = new Worker(firstTask);
        // Get the thread created for this Worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck the state of the thread pool while holding the lock
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // The pool is running, or it is in SHUTDOWN and this worker
                    // carries no new task: add the worker to the set
                    workers.add(w);
                    // Update largestPoolSize if the set has grown past it
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the worker thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
The logic of addWorker can be roughly divided into the following steps:
- Check whether the thread pool is running and whether the number of threads exceeds the limit (the omitted code). If the pool is running and the limit has not been reached, proceed to create the thread.
- Create the Worker object.
- Add the Worker object to the workers collection.
- Get the Worker's thread and start it.
To see how execution finally reaches the task we defined, we need to look inside the Worker class.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
    // omit n lines of code
}
First, note that Worker is an inner class of ThreadPoolExecutor. It extends AbstractQueuedSynchronizer (which should be familiar from the lock articles earlier in this concurrent programming series) and implements the Runnable interface. It holds a Runnable and a Thread object; the thread is created by:
this.thread = getThreadFactory().newThread(this);
The Worker passes itself as the argument. getThreadFactory() is a ThreadPoolExecutor method that returns the configured ThreadFactory; the default implementation is DefaultThreadFactory, an inner class of Executors.
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        // Wrap the given Runnable (here, the Worker) in an ordinary Thread
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        // If the new thread inherited daemon status, make it a non-daemon thread
        if (t.isDaemon())
            t.setDaemon(false);
        // Reset the priority to NORM_PRIORITY if it differs
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
In this way, the Worker creates a thread with itself as the Runnable, so when the thread starts, the Worker's run method is executed, which in turn calls runWorker(this).
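The "worker hands itself to the thread factory" idea can be reduced to a few lines. The class MiniWorker below is a simplified stand-in written for this article, not the JDK's Worker: it has no locking and no task loop, and only shows how starting the thread leads back into the worker's own run method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical, heavily simplified stand-in for ThreadPoolExecutor.Worker.
class MiniWorker implements Runnable {
    final Thread thread;
    private final Runnable firstTask;

    MiniWorker(ThreadFactory factory, Runnable firstTask) {
        this.firstTask = firstTask;
        // The worker passes itself to the factory, so thread.start() runs this.run().
        this.thread = factory.newThread(this);
    }

    @Override
    public void run() {
        firstTask.run(); // stand-in for runWorker(this)
    }

    static String demo() throws InterruptedException {
        StringBuffer out = new StringBuffer();
        MiniWorker worker = new MiniWorker(Executors.defaultThreadFactory(),
                () -> out.append("task ran"));
        worker.thread.start(); // same call that addWorker makes via t.start()
        worker.thread.join();  // wait until the worker thread has finished
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints: task ran
    }
}
```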
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // Get the task wrapped by the Worker
    Runnable task = w.firstTask;
    // Clear the Worker's reference to the task and release the lock
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            // Acquire the lock before running the task so that the pool does
            // not interrupt this worker as idle while the task is executing
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook invoked before the task runs; if it throws, the task
                // is not executed. The default implementation is empty.
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
The key call here is task.run(). Since task is a FutureTask, execution continues in the run method of FutureTask.
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // Invoke call() on the wrapped Callable to compute the result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                // Store the computed result as the outcome
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
In the end, both running the task and storing its result are handled by FutureTask.
This completes the analysis of the whole flow.
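One consequence of the run method shown above is that an exception thrown by call() does not escape from run(); it is recorded through setException and surfaces later from get() as an ExecutionException. A minimal sketch, with a made-up class name and message:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class, not part of the JDK.
class ExceptionCaptureDemo {
    static String causeMessage() throws InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            throw new IllegalStateException("boom");
        });
        task.run(); // runs on the calling thread; the exception is captured, not thrown
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(causeMessage()); // prints: boom
    }
}
```

This is also why a failing task submitted through submit does not kill the worker thread, whereas a failing Runnable passed to execute does.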
3. Simple applications
Here are two simple examples that show the typical usage:
- Future + Callable
- FutureTask + Callable
3.1 Future + Callable
import java.util.concurrent.*;

public class HelloWorld {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callables callable = new Callables();
        // Submit the Callable directly; the returned Future holds its result
        Future<Integer> future = executorService.submit(callable);
        executorService.shutdown();
        try {
            System.out.println("Main thread fetch result:" + future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    static class Callables implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            int sum = 0;
            for (int i = 0; i < 100; i++) {
                sum += i;
            }
            System.out.println("Child thread calculation result:" + sum);
            return sum;
        }
    }
}
Execution Result:
Child thread calculation result:4950
Main thread fetch result:4950
3.2 FutureTask + Callable
import java.util.concurrent.*;

public class HelloWorld {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callables callable = new Callables();
        // Wrap the Callable in a FutureTask and submit it as a Runnable
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        Future<?> future = executorService.submit(futureTask);
        executorService.shutdown();
        try {
            System.out.println("Main thread fetch result:" + future.get() + "==" + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    static class Callables implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            int sum = 0;
            for (int i = 0; i < 100; i++) {
                sum += i;
            }
            System.out.println("Child thread calculation result:" + sum);
            return sum;
        }
    }
}
Execution Result:
Child thread calculation result:4950
Main thread fetch result:null==4950
As you can see, get() on the Future returned by submit yields null, while the FutureTask that was passed in holds the computed result 4950.
This article concludes the concurrent programming series for now. There is of course more to cover, and I will continue when time permits.