Introduction to ForkJoin framework
ForkJoin is a multithreaded concurrency framework available since JDK 1.7. It splits a large task into smaller subtasks and then aggregates the results of those subtasks, an idea similar to MapReduce in Hadoop.
Fork divides a large task into several subtasks that run in parallel. Join merges the results of those subtasks to obtain the result of the large task. For example, computing 1+2+…+10000 can be divided into 10 subtasks, each summing 1000 numbers, and the results of the 10 subtasks are then added together. The running flow chart of Fork/Join is as follows:
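The summation example can be sketched with a RecursiveTask. This is a minimal illustration, not the article's own code: the class name RangeSumTask, the helper sum, and the cutoff of 1000 numbers are assumptions made for the example. It splits each range in half until a piece is small enough, so it produces more than 10 leaf subtasks, but the fork/join pattern is the same.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums the integers in [from, to] by splitting the range.
class RangeSumTask extends RecursiveTask<Long> {
    // Assumed cutoff: ranges of at most 1000 numbers are summed directly.
    private static final int THRESHOLD = 1000;
    private final int from;
    private final int to;

    RangeSumTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from + 1 <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSumTask left = new RangeSumTask(from, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, to);
        left.fork();                      // fork: schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in the current thread
        return left.join() + rightSum;    // join: wait for the left half and merge
    }

    // Convenience helper (an assumption of this sketch) that runs the task on the common pool.
    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 10_000)); // prints 50005000
    }
}
```

Computing one half directly and forking only the other keeps the current thread busy instead of leaving it waiting on two forked subtasks.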
Work-stealing algorithm
In a work-stealing algorithm, a thread steals tasks from other threads’ queues and executes them. The Fork/Join framework uses a special thread pool, ForkJoinPool, to handle dependencies between tasks; it implements the work-stealing algorithm and executes ForkJoinTasks. ForkJoinPool maintains multiple threads, and by default their number equals the number of CPU cores. Instead of all threads sharing one common queue, each thread has its own deque (double-ended queue) that holds all the tasks for that thread.
The flow chart of work stealing is as follows:
So why use a work-stealing algorithm? Suppose we need to do a big task. We can divide it into a number of mutually independent subtasks. To reduce contention between threads, we put these subtasks into different queues and create a separate thread for each queue to execute the tasks in it, so threads and queues correspond one to one. For example, thread A handles the tasks in queue A. However, some threads finish the tasks in their own queue first, while other threads still have tasks waiting. Instead of waiting, a finished thread can help another thread, so it steals a task from that thread’s queue and executes it. At that point both threads access the same queue, so to reduce contention between the stealing thread and the thread being stolen from, a deque is usually used: the thread being stolen from always takes tasks from the head of the deque (LIFO, last in, first out), and the stealing thread always takes tasks from the tail of the deque (FIFO, first in, first out).
The advantage of the work-stealing algorithm is that it makes full use of threads for parallel computation and reduces contention between threads. The disadvantage is that contention still occurs in some cases, such as when a deque contains only one task. It also consumes more system resources, because it creates multiple threads and multiple deques.
If that is still unclear, consider another example.
As shown below:
In the figure above, each thread has its own queue of tasks that it can execute, which lets the thread temporarily set a blocked task aside. That is, if the current task cannot continue (for example because it depends on subtasks), it is placed in the queue and suspended until all its dependencies are ready.
New tasks are added to a thread’s queue through the fork operation, and each thread always processes the task most recently added to its queue first (LIFO, last in, first out).
As shown in the figure above, in the task queue of thread 1, “Task 1” enters the queue before “Task 2”, so thread 1 executes Task 2 first and then Task 1. Whenever possible, an idle thread can steal tasks from another thread’s queue. A thread always steals the oldest task (FIFO, first in, first out) from the other thread.
As shown in the figure above, thread 2 steals the oldest task, Task 1, from thread 1’s queue. Threads always try to steal from neighboring threads to reduce the contention that can occur when stealing tasks.
The order of task execution and stealing matters. Ideally, stealing does not happen often, because it is expensive: when a task moves from one thread to another, the context associated with the task has to move from one thread’s stack to the other’s. If the two threads are not on the same CPU, the move also crosses CPUs, which costs even more. The Fork/Join framework therefore tries to keep stealing to a minimum.
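The ordering described above (the owner takes the newest task, a thief takes the oldest) can be modeled with an ordinary deque. The sketch below is a single-threaded toy model under stated assumptions: the class DequeOrderDemo and its methods fork, takeOwn, and steal are hypothetical names, and it does not reproduce ForkJoinPool's real WorkQueue or its synchronization.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of one worker's deque; not ForkJoinPool's real WorkQueue.
class DequeOrderDemo {
    // The owner adds forked tasks at the head of its own deque.
    static void fork(Deque<String> queue, String task) {
        queue.addFirst(task);
    }

    // The owner takes the most recently added task from the head (LIFO).
    static String takeOwn(Deque<String> queue) {
        return queue.pollFirst();
    }

    // A thief takes the oldest task from the tail (FIFO).
    static String steal(Deque<String> victim) {
        return victim.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> thread1Queue = new ArrayDeque<>();
        fork(thread1Queue, "Task 1");
        fork(thread1Queue, "Task 2");
        System.out.println("Thread 2 steals: " + steal(thread1Queue));   // Task 1
        System.out.println("Thread 1 runs:   " + takeOwn(thread1Queue)); // Task 2
    }
}
```

Because the owner and the thief work at opposite ends, they only touch the same element when the deque holds a single task, which is the contention case mentioned earlier.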
Introduction to ForkJoin framework
ForkJoin is based on two steps: splitting a large task (fork) and merging the results of the small tasks (join).
1. Split the task (fork). You need a class that splits the large task into subtasks, and keeps splitting until the subtasks are small enough.
2. Execute the tasks and merge the results. The subtasks are placed in deques, and threads are started to take tasks from the deques and execute them. The results of the subtasks are placed in a queue, and a thread is started to take the data from that queue and merge it.
ForkJoin uses two classes to do these two things:
- ForkJoinPool: ForkJoinPool is the core of ForkJoin. It extends AbstractExecutorService and manages ForkJoinWorkerThread instances. A ForkJoinPool does not assign a thread to each ForkJoinTask; instead it assigns a deque to each worker thread. A ForkJoinTask must be executed through a ForkJoinPool. The subtasks split off from a task are added at the head of the deque maintained by the current worker thread. When a worker thread temporarily has no work in its own queue, it takes a task from the tail of another, randomly chosen worker thread’s queue.
- ForkJoinTask: To use the ForkJoin framework, we must first create a ForkJoin task. ForkJoinTask provides the mechanism for performing fork() and join() operations on a task. Usually we do not extend ForkJoinTask directly but one of its subclasses. The Fork/Join framework provides the following two subclasses:
  - RecursiveAction: Used for tasks that do not return results.
  - RecursiveTask: Used for tasks that return results.
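As a rough illustration of the result-less variant, the sketch below uses a RecursiveAction to double every element of an array in place. The class name DoubleArrayAction, the helper doubleAll, and the threshold of 1000 elements are assumptions made for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example class: doubles every element of data in the range [lo, hi).
class DoubleArrayAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // assumed cutoff for direct processing
    private final long[] data;
    private final int lo;
    private final int hi;

    DoubleArrayAction(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (lo + hi) >>> 1;
        // invokeAll forks both halves and waits until both have completed.
        invokeAll(new DoubleArrayAction(data, lo, mid),
                  new DoubleArrayAction(data, mid, hi));
    }

    // Convenience helper (an assumption of this sketch) that runs the action on the common pool.
    static void doubleAll(long[] data) {
        ForkJoinPool.commonPool().invoke(new DoubleArrayAction(data, 0, data.length));
    }
}
```

The two halves write to disjoint index ranges, so no extra synchronization is needed beyond the completion of invokeAll.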
ForkJoinPool constructor
The no-argument constructor:
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
ForkJoinPool() uses the return value of Runtime.getRuntime().availableProcessors() as the parallelism level of the new pool.
public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
ForkJoinPool(int parallelism) creates a ForkJoinPool with the given parallelism level.
The public constructor with the most arguments:
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
- parallelism: The parallelism level, which the Fork/Join framework uses to determine how many threads run in parallel. Each task running in parallel has its own thread, but do not read this value as the maximum number of threads that can exist in the Fork/Join framework, and do not compare it to the corePoolSize and maximumPoolSize attributes of ThreadPoolExecutor, because ForkJoinPool is organized and works in a completely different way. As later discussion shows, the number of threads that can exist in the framework is influenced by this parameter but not entirely determined by it.
- factory: When the Fork/Join framework creates a new thread, it also uses a thread factory. This factory does not implement the ThreadFactory interface, however, but the ForkJoinWorkerThreadFactory interface. The latter is a functional interface with a single method, newThread. The Fork/Join framework provides a default implementation of ForkJoinWorkerThreadFactory: DefaultForkJoinWorkerThreadFactory.
- handler: The exception handler for worker threads. It is invoked when a worker thread terminates because of an unrecoverable error encountered while executing tasks. An exception thrown by a task’s own computation is recorded in the task rather than passed to this handler.
- asyncMode: This parameter is also important. Literally it means asynchronous mode, but it does not determine whether the Fork/Join framework works synchronously or asynchronously. In the Fork/Join framework, each worker thread has its own queue of tasks waiting to be executed. This queue is an array-backed deque, so the tasks in it can be taken in first in, first out (FIFO) order or in last in, first out (LIFO) order. As the constructor above shows, asyncMode set to true selects FIFO_QUEUE and false selects LIFO_QUEUE.
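A pool built with the four-argument constructor might look like the sketch below. The class name PoolConfigDemo, the parallelism of 4, and the handler that only logs to standard error are illustrative assumptions, not recommended settings.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical example class showing the four public constructor parameters.
class PoolConfigDemo {
    static ForkJoinPool newPool() {
        return new ForkJoinPool(
            4,                                               // parallelism (assumed value)
            ForkJoinPool.defaultForkJoinWorkerThreadFactory, // factory: the built-in default
            (thread, error) ->                               // handler: just logs the failure
                System.err.println(thread.getName() + " failed: " + error),
            true);                                           // asyncMode: true selects FIFO order
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPool();
        System.out.println("parallelism = " + pool.getParallelism());
        System.out.println("asyncMode   = " + pool.getAsyncMode());
        pool.shutdown();
    }
}
```

FIFO mode is generally suited to pools whose worker threads only process event-style tasks that are never joined; for divide-and-conquer tasks the default LIFO mode is the usual choice.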
Private constructor
/**
* Creates a {@code ForkJoinPool} with the given parameters, without
* any security checks or parameter validation. Invoked directly by
* makeCommonPool.
*/
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
As you can see, every public constructor ultimately delegates to this private constructor.
There is also a private static factory method, makeCommonPool():
/**
 * Creates and returns the common pool, respecting user settings
 * specified via system properties.
 */
private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        String fp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.threadFactory");
        String hp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                       getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler)ClassLoader.
                       getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}
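According to its documentation, makeCommonPool() creates the common ForkJoinPool (in LIFO_QUEUE mode), respecting the settings the user specifies through system properties.
The user can configure the following system properties:
java.util.concurrent.ForkJoinPool.common.parallelism
java.util.concurrent.ForkJoinPool.common.threadFactory
java.util.concurrent.ForkJoinPool.common.exceptionHandler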
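One way to check which parallelism the common pool ended up with is sketched below. The class name CommonPoolDemo and the value 2 in the command line are assumptions for illustration. The property is read when the common pool is created, so passing it with -D on the command line is the most reliable way to set it.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical example class that reports the common pool's parallelism.
class CommonPoolDemo {
    // Returns the targeted parallelism level of the common pool.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Run with, for example:
        //   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 CommonPoolDemo
        System.out.println("common pool parallelism = " + commonParallelism());
    }
}
```

Without the property, the printed value follows the default shown in makeCommonPool(): one less than the number of available processors, with a minimum of 1.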
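Starting a ForkJoinTask
A task is started differently depending on whether the caller is outside the pool or already inside a fork/join computation:
- Asynchronous execution: ForkJoinPool.execute(ForkJoinTask) from outside the pool, ForkJoinTask.fork() from within a fork/join computation.
- Wait for the result: ForkJoinPool.invoke(ForkJoinTask) from outside the pool, ForkJoinTask.invoke() from within a fork/join computation.
- Execute and obtain a Future: ForkJoinPool.submit(ForkJoinTask) from outside the pool, ForkJoinTask.fork() from within a fork/join computation (ForkJoinTasks are Futures).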
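The three ways of starting a task from outside the pool (execute, invoke, and submit) can be compared side by side. This is a minimal sketch: the class SubmitModesDemo, the trivial Square task, and the helper runAll are hypothetical names introduced for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class comparing execute, invoke, and submit.
class SubmitModesDemo {
    // Trivial task used only for illustration.
    static class Square extends RecursiveTask<Integer> {
        private final int n;
        Square(int n) { this.n = n; }
        @Override
        protected Integer compute() { return n * n; }
    }

    static int[] runAll(ForkJoinPool pool) {
        // execute: arranges asynchronous execution and returns nothing.
        Square first = new Square(2);
        pool.execute(first);
        int viaExecute = first.join(); // the task itself can be joined later

        // invoke: runs the task and blocks until its result is available.
        int viaInvoke = pool.invoke(new Square(3));

        // submit: arranges execution and returns the task as a Future.
        ForkJoinTask<Integer> future = pool.submit(new Square(4));
        int viaSubmit;
        try {
            viaSubmit = future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {viaExecute, viaInvoke, viaSubmit};
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        int[] results = runAll(pool);
        System.out.println(results[0] + " " + results[1] + " " + results[2]); // 4 9 16
        pool.shutdown();
    }
}
```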
ForkJoinTask exception handling
A ForkJoinTask may throw an exception while it executes, but the main thread cannot catch that exception directly, so ForkJoinTask provides the isCompletedAbnormally() method to check whether a task threw an exception or was cancelled. The exception can be obtained through ForkJoinTask’s getException() method.
/**
* Returns {@code true} if this task threw an exception or was cancelled.
*
* @return {@code true} if this task threw an exception or was cancelled
*/
public final boolean isCompletedAbnormally() {
    return status < NORMAL;
}
Returns true if the task threw an exception or was cancelled.
/**
* Returns {@code true} if this task completed without throwing an
* exception and was not cancelled.
*
* @return {@code true} if this task completed without throwing an
* exception and was not cancelled
*/
public final boolean isCompletedNormally() {
    return (status & DONE_MASK) == NORMAL;
}
Returns true if the task completed without throwing an exception and was not cancelled.
/**
* Returns the exception thrown by the base computation, or a
* {@code CancellationException} if cancelled, or {@code null} if
* none or if the method has not yet completed.
*
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
    int s = status & DONE_MASK;
    return ((s >= NORMAL)    ? null :
            (s == CANCELLED) ? new CancellationException() :
            getThrowableException());
}
getException() returns:
- the exception thrown by the task;
- a CancellationException if the task was cancelled;
- null if the task has not completed or threw no exception.
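These methods can be tried out on a task that always fails. The sketch below makes several assumptions for illustration: the class FailingTaskDemo, the nested Failing task, and the helper runAndInspect are hypothetical names, and quietlyJoin() is used to wait for completion without rethrowing the exception.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class that inspects a failed task.
class FailingTaskDemo {
    // Task that always throws, used only for illustration.
    static class Failing extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // Runs the failing task and returns its recorded exception, or null if there is none.
    static Throwable runAndInspect() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Failing task = new Failing();
            pool.execute(task);
            task.quietlyJoin(); // waits for completion without throwing
            return task.isCompletedAbnormally() ? task.getException() : null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("recorded exception: " + runAndInspect());
    }
}
```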
ForkJoin implementation principle
ForkJoinPool consists of a ForkJoinTask array and a ForkJoinWorkerThread array. The ForkJoinTask array stores the tasks that the program submits to the ForkJoinPool, and the ForkJoinWorkerThread array executes these tasks.
fork()
ForkJoinTask implements the fork method. When we call fork() on a ForkJoinTask, the program pushes the current task onto the workQueue of the current ForkJoinWorkerThread (or, if the caller is not a worker thread, onto the common pool) and immediately returns the task itself.
/**
* Arranges to asynchronously execute this task in the pool the
* current task is running in, if applicable, or using the {@link
* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
* it is not necessarily enforced, it is a usage error to fork a
* task more than once unless it has completed and been
* reinitialized. Subsequent modifications to the state of this
* task or any data it operates on are not necessarily
* consistently observable by any thread other than the one
* executing it unless preceded by a call to {@link #join} or
* related methods, or a call to {@link #isDone} returning {@code
* true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
The push method of the worker thread’s WorkQueue stores the task in the queue’s ForkJoinTask array and then calls ForkJoinPool’s signalWork(WorkQueue[] ws, WorkQueue q) method to wake up or create a worker thread to execute the task.
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}
join()
ForkJoinTask implements the join method. join() blocks the current thread and waits for the result.
/**
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
*
* @return the computed result
*/
public final V join() {
    int s;
    // If the status returned by doJoin() is not NORMAL, the task was cancelled
    // or threw an exception, so report it.
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    // If the status is NORMAL, return the raw result.
    return getRawResult();
}
As you can see, the doJoin() method is called first. doJoin() returns the status of the current task, which determines what result is returned. There are four task states:
completed (NORMAL), cancelled (CANCELLED), signal (SIGNAL), and exceptional (EXCEPTIONAL).
/**
 * Throws exception, if any, associated with the given status.
 */
private void reportException(int s) {
    // If the task was cancelled, throw a CancellationException
    if (s == CANCELLED)
        throw new CancellationException();
    // If the task completed exceptionally, rethrow its exception
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}
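The difference that the join() documentation describes, where abnormal completion surfaces as a RuntimeException or Error rather than an ExecutionException, can be observed from the calling side. The sketch below is an illustration under stated assumptions: the class JoinVersusGetDemo, the nested Failing task, and the helpers joinOutcome and getOutcome are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class contrasting join() and get() on a failed task.
class JoinVersusGetDemo {
    // Task that always throws, used only for illustration.
    static class Failing extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new ArithmeticException("boom");
        }
    }

    // join() rethrows the task's unchecked exception type directly.
    static String joinOutcome(ForkJoinPool pool) {
        ForkJoinTask<Integer> task = pool.submit(new Failing());
        try {
            task.join();
            return "no exception";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // get() wraps the task's exception in an ExecutionException.
    static String getOutcome(ForkJoinPool pool) {
        ForkJoinTask<Integer> task = pool.submit(new Failing());
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        System.out.println("join(): " + joinOutcome(pool));
        System.out.println("get():  " + getOutcome(pool));
        pool.shutdown();
    }
}
```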
doJoin()
doJoin() first checks the status of the task to see whether it has already completed. If it has, the task status is returned directly. If not, the task is taken from the task array and executed.
If the task executes successfully, its status is set to NORMAL. If an exception occurs, the exception is recorded and the status is set to EXCEPTIONAL.
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
* unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
*
* @return status upon completion
*/
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // If the task has already completed (status < 0), return its status.
    return (s = status) < 0 ? s :
        // Otherwise, if the current thread is a ForkJoinWorkerThread, try to pop this
        // task from its work queue and execute it directly. If that completes the task,
        // return the status; otherwise wait via ForkJoinPool.awaitJoin.
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        // If the current thread is not a ForkJoinWorkerThread, call externalAwaitDone().
        externalAwaitDone();
}
/**
* Primary execution method for stolen tasks. Unless done, calls
* exec and records status if completed, but doesn't wait for
* completion otherwise.
*
* @return status on exit from this method
*/
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}
invoke()
invoke() calls doInvoke() and obtains the resulting status. If the status is not NORMAL, an exception is reported; if it is NORMAL, the result is returned.
/**
* Commences performing this task, awaits its completion if
* necessary, and returns its result, or throws an (unchecked)
* {@code RuntimeException} or {@code Error} if the underlying
* computation did so.
*
* @return the computed result
*/
public final V invoke() {
    int s;
    // Keep only the completion bits of the status returned by doInvoke().
    // If they are not NORMAL, report the exception.
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    // Completed normally, return the raw result
    return getRawResult();
}
/**
* Implementation for invoke, quietlyInvoke.
*
* @return status upon completion
*/
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        // ForkJoinPool::awaitJoin waits in a loop (internalWait), which supports
        // waiting up to a deadline and also guards against spurious wakeups.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}