Click “like” to see, form a habit, the public account search [dime technology] pay attention to more original technical articles. This article has been included in GitHub org_Hejianhui /JavaStudy.

preface

We described two specific implementations of the Thread pool framework earlier:

  • ThreadPoolExecutor Default thread pool
  • ScheduledThreadPoolExecutor thread pool regularly

Thread pools provide a solution to the overhead and under-resourcing problems of the thread life cycle. By reusing threads for multiple tasks, the overhead of thread creation is spread over multiple tasks. Java7 also provides a framework for parallel execution of tasks Fork/Join, which is a framework for dividing large tasks into several smaller tasks, and finally summarizing the results of each small task to obtain the results of large tasks. Before introducing the Fork/Join framework, let’s start with a few concepts: CPU intensive, IO intensive, and then dive deeper into the Fork/Join framework.

Task nature type

CPU bound

CPU intensive, also called computational intensive, means that the performance of the hard disk and memory of the system is much better than that of the CPU. In this case, the system is in a state where the CPU is Loading 100%, and the I/O can be completed in a very short time by the CPU, while the CPU has a lot of operations to process. CPU Loading is high.

On a multiprogramming system, a program that spends most of its time doing CPU actions, such as calculations and logical decisions, is called CPU bound. For example, a program that calculates PI to less than one thousand decimal places and spends most of its time using trigonometry and square roots is CPU Bound’s program.

Cpu-bound programs tend to use a lot of CPU. This may be because the task itself does not require much access to the I/O device, or it may be because the program is multithreaded and thus shielded from waiting for I/O.

The number of threads is set to: Number of threads = number of CPU cores + 1 (Modern cpus support hyperthreading)

I/O bound

I/O intensive means that the CPU performance of the system is much better than that of hard disks and memory. In this case, the CPU is waiting for I/ OS (hard disks or memory) to read or write, and the CPU Loading is not high.

I/O Bound’s programs tend to hit their performance limits and still have low CPU usage. This is probably because the task itself requires a lot of I/O, and pipeline didn’t do a very good job of utilizing processor power.

Number of threads: Number of threads = ((Thread waiting time + thread CPU time)/Thread CPU time) x Number of cpus

CPU intensive VS I/O intensive

We can classify tasks as computationally intensive and I/O intensive.

Computation-intensive tasks, such as calculating PI or decoding video in high definition, require a lot of computation and consume CPU resources. This type of computationally intensive task can also be accomplished with multi-task, but the more tasks, the more time spent switching between tasks, and the less efficient the CPU performs the task. Therefore, for the most efficient utilization of the CPU, the number of computationally intensive tasks concurrently should be equal to the number of CPU cores.

Since computationally intensive tasks consume CPU resources, code execution efficiency is critical. Scripting languages such as Python are inefficient and completely unsuitable for computationally intensive tasks. For computationally intensive tasks, it is best to write in C.

The second type of task is I/O intensive. Tasks involving network and disk I/O are I/O intensive tasks. These tasks consume little CPU and spend most of their time waiting for I/O operations to complete (because THE I/O speed is much lower than the CPU and memory speed). For I/O intensive tasks, the more tasks, the more EFFICIENT the CPU, but there is a limit. Most common tasks are I/O intensive tasks, such as Web applications.

I/ O-intensive tasks spend 99% of their time on I/O, with very little CPU time, so replacing a very slow scripting language like Python with a very fast C language is not going to improve performance at all. For I/O intensive tasks, the most appropriate language is the one that is most efficient (with the least amount of code), scripting is preferred, and C is the worst.

What is the Fork/Join framework?

Fork/Join framework is a framework provided by Java7 for parallel execution of tasks. It is a framework that divides large tasks into several small tasks and finally summarizes the results of each small task to obtain the results of large tasks.

ForkIt is to divide a large task into several sub-tasks for parallel execution. Join is to merge the execution results of these sub-tasks and finally obtain the result of the large task. For example, 1+2+…… +10000, can be divided into 10 subtasks, each subtask to sum 1000 numbers, and finally summarize the results of the 10 subtasks. As shown below: The characteristics of Fork/Join:

  1. ForkJoinPool is not intended as a replacement for ExecutorService, but rather as a complement to it, providing better performance than ExecutorService in certain application scenarios. (See Java Tip: When to Use ForkJoinPool vs ExecutorService)
  2. ForkJoinPool is mainly used to implement divide-and-conquer algorithms, especially divide-and-conquer recursively called functions such as Quick Sort.
  3. ForkJoinPool is best suited for computations-intensive tasks, such as I/O, interthread synchronization, sleep(), and MangedBlocker.

For divide-and-conquer algorithms, see The Implementation and features of Divide-and-conquer backtracking

Job stealing algorithm

A work-stealing algorithm is a thread stealing tasks from other queues to execute.

We need to do A big task, we can put this task division for A number of mutually dependent child tasks, in order to reduce the competition between threads, then put these subtasks are different in the queue, and create A separate thread for each queue to perform the tasks in the queue, thread and queue one-to-one correspondence, such as A thread to handle A task in the queue.

However, some threads finish tasks in their queue first, while others have tasks in their queue. Instead of waiting, a finished thread can help another thread, so it steals a task from another thread’s queue to execute. In this case, they access the same queue, so it is usually used to reduce contention between the stolen and stolen task threadsdeque.The task thread is stolenAlways from a double-endian queueThe headTo carry out the mission, andSteal the thread of the taskAlways from a double-endian queueThe tailTake the task to execute.

The advantage of the job-stealing algorithm is thatIt makes full use of threads for parallel computation and reduces the contention between threads, the disadvantage is that there are still some cases of contention, such as when there is only one task in a two-end queue. It also consumes more system resources, such as creating multiple threads and multiple double-ended queues.

  1. Every worker thread in a ForkJoinPool maintains a WorkQueue. This is a two-ended Deque that stores forkJoinTasks.
  2. Each worker thread puts a new task at the end of the work queue when it runs (usually because of a call to fork()), and the worker thread processes its own work queue using LIFO, which means it pulls a task from the end of the queue each time it executes it.
  3. Each worker thread in dealing with their own work queue at the same time, will try to steal a task (or from just submit to the task of the pool, or work from other threads queue), the task of stealing the work queue in other threads team first, this means the worker thread in stealing other worker threads, FIFO is used.
  4. When join() is encountered, if the task that requires the join has not yet completed, the other tasks are processed first and wait for them to complete.
  5. Go to sleep when you have neither your own mission nor one to steal.

The use of Fork/Join

Application Scenario Example

Define a fork/join task, as shown in the following example, that randomly generates 2000W pieces of data into an array and then sums _

package com.niuh.forkjoin.recursivetask;

import java.util.concurrent.RecursiveTask;

ForkJoin can use recursiveTasks such as Fibonacci sequences. However, the defects of the RecursiveTask are as follows: * A single thread is used for recursiveTasks, * the stack overflows when recursing too many times. ForkJoin addresses both of these problems by using multiple threads for concurrent processing to maximize computational resources for efficiency while avoiding stack overflows. Of course, small problems like Fibonacci sequences are easier to solve with linear algorithms, and ForkJoin is not necessary in practice. ForkJoin is a nuclear bomb for large things, such as sorting large arrays. * Best application scenarios: multi-core, multi-memory, computation-intensive tasks that can be split and then merged */
class LongSum extends RecursiveTask<Long> {
    // The minimum threshold for task splitting
    static final int SEQUENTIAL_THRESHOLD = 1000;
    static final long NPS = (1000L * 1000 * 1000);
    static final boolean extraWork = true; // change to add more than just a sum


    int low;
    int high;
    int[] array;

    LongSum(int[] arr, int lo, int hi) {
        array = arr;
        low = lo;
        high = hi;
    }

    /** * fork() : queue tasks and schedule asynchronous execution. A task should only call fork() once, unless it has been executed and reinitialized. * tryUnfork() method: Attempts to take the task out of the queue for separate processing, but may not succeed. * Join () method: waits for the calculation to complete and returns the result. * isCompletedAbnormally() method: used to determine if a task calculation has been abnormally recorded. * /
    protected Long compute(a) {

        if (high - low <= SEQUENTIAL_THRESHOLD) {
            long sum = 0;
            for (int i = low; i < high; ++i) {
                sum += array[i];
            }
            return sum;

        } else {
            int mid = low + (high - low) / 2;
            LongSum left = new LongSum(array, low, mid);
            LongSum right = new LongSum(array, mid, high);
            left.fork();
            right.fork();
            long rightAns = right.join();
            long leftAns = left.join();
            returnleftAns + rightAns; }}}Copy the code

Perform the fork/join task

package com.niuh.forkjoin.recursivetask;

import com.niuh.forkjoin.utils.Utils;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class LongSumMain {
    // Get the number of logical processors
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** * for time conversion */
    static final long NPS = (1000L * 1000 * 1000);

    static long calcSum;

    static final boolean reportSteals = true;

    public static void main(String[] args) throws Exception {
        int[] array = Utils.buildRandomIntArray(2000000);
        System.out.println("cpu-num:" + NCPU);
        // Calculate the sum of array data in single thread
        long start = System.currentTimeMillis();
        calcSum = seqSum(array);
        System.out.println("seq sum=" + calcSum);
        System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));

        start = System.currentTimeMillis();
        // Use fork/join to split the array summation task and merge the result
        LongSum ls = new LongSum(array, 0, array.length);
        ForkJoinPool fjp = new ForkJoinPool(NCPU); // The number of threads used
        ForkJoinTask<Long> task = fjp.submit(ls);

        System.out.println("forkjoin sum=" + task.get());
        System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
        if (task.isCompletedAbnormally()) {
            System.out.println(task.getException());
        }

        fjp.shutdown();

    }


    static long seqSum(int[] array) {
        long sum = 0;
        for (int i = 0; i < array.length; ++i) {
            sum += array[i];
        }
        returnsum; }}Copy the code

Fork/Join framework principles

Fork/JoinForkJoinPool as a thread pool, ForkJoinTask(which generally implements three abstract subclasses) as a task, and ForkJoinWorkerThread as a concrete thread entity that executes the task.

ForkJoinWorkerThread

ForkJoinWorkerThread Thread inherits directly, but only to add some additional functionality, without making any changes to the Thread’s scheduled execution. ForkJoinWorkerThreadForkJoinPool worker threads are created by ForkJoinPool and are set to becomeDaemon threadForkJoinTasks are executed by it. The main purpose of this class is to maintain the work queue created by ForkJoinPool when thread instances are created. Unlike the other two thread pools, where there is only one work queue, all worker threads managed by ForkJoinPool have their own work queue, which is designed as a double-ended queue for task stealing. The first task of a ForkJoinWorkerThread is to execute its own double-ended queue, and the second task is to steal the work queue of another thread. The following is a snippet:

public class ForkJoinWorkerThread extends Thread {
	// This thread works in a ForkJoinPool
    final ForkJoinPool pool;    
    // This thread owns the work queue of the work stealing mechanism
    final ForkJoinPool.WorkQueue workQueue; 

    // Create a ForkJoinWorkerThread in the given ForkJoinPool.
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        ForkJoinPool registers the current worker thread with the ForkJoinPool execution pool and allocates a work queue to it
        this.workQueue = pool.registerWorker(this); 
    }

    // The execution of this worker thread is to execute the tasks in the work queue
    public void run(a) {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                pool.runWorker(workQueue); // Execute tasks in the work queue
            } catch (Throwable ex) {
                exception = ex; // Record an exception
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception); // Undo work
                }
            }
        }
    }

    .....
}
Copy the code

ForkJoinTask

ForkJoinTaskLike FutureTask, ForkJoinTask is a subclass of Future, but it is an abstract class. ForkJoinTaskTo use the ForkJoin framework, we must first create a ForkJoin task. It provides execution in tasksfork()join()ForkJoinTask does not inherit directly from ForkJoinTask, but from subclasses. The Fork/Join framework provides the following subclasses:

  • RecursiveAction: Used for tasks that do not return results. (Such as writing data to disk and then exiting. A RecursiveAvtion can break up direct work into smaller pieces so that they can be executed by separate threads or cpus. We can implement a RecusiveAction by inheritance.
  • RescursiveTask: For tasks that return results. You can split your work into smaller tasks and combine the execution of these subtasks into a collective result. There can be several levels of segmentation and merging)
  • CountedCompleter: Triggers the execution of a custom hook function after the task completes execution.

Constant introduction

ForkJoinTask has a status field of type INT:

  • Its high 16 bits store task execution states such as NORMAL, CANCELLED, or EXCEPTIONAL
  • The lower 16 bits are reserved for user-defined tags.

‘NORMAL’ > ‘NORMAL’ > ‘EXCEPTIONAL’ after ‘status’ >’ NORMAL ‘>’ EXCEPTIONAL ‘after’ NORMAL ‘>’ EXCEPTIONAL ‘.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

    /** Execution status of the task */
    volatile int status; // accessed directly by pool and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags

    // Exception hash table

    // An array of exceptions thrown by the task for reporting to the caller. Because exceptions are rare, we don't store them directly in task objects, but instead use weak reference arrays. Note that the cancellation exception does not appear in the array, but is recorded in the statue field
    // Note that these are static class attributes shared by all ForkJoinTasks.
    private static final ExceptionNode[] exceptionTable;        // Exception hash list array
    private static final ReentrantLock exceptionTableLock;
    private static final ReferenceQueue<Object> exceptionTableRefQueue; // The reference queue for the abnormal node object after the ForkJoinTask is collected by the GC

    /** * Fixed capacity exceptionTable. */
    private static final int EXCEPTION_MAP_CAPACITY = 32;


    // Exception array key-value pair node.
    // The hash linked list array is compared using thread ids. The array has a fixed capacity because it only maintains task exceptions long enough for participants to access them, so it should not become very large for the duration. However, since we don't know when the last Joiner will be finished, we must use weak references and remove them. We do this for each operation (and therefore lock it completely). In addition, any ForkJoinPool in the pool will be called when some threads in the pool into isQuiescent helpExpungeStaleExceptions
    static final class ExceptionNode extends WeakReference<ForkJoinTask<? >>{
        final Throwable ex;
        ExceptionNode next;
        final long thrower;  // The id of the thread that threw the exception
        final int hashCode;  // Store hashCode until weak references disappearExceptionNode(ForkJoinTask<? > task, Throwable ex, ExceptionNode next) {super(task, exceptionTableRefQueue); ExceptionTableRefQueue After a ForkJoinTask is collected by the GC, the node is added to the exceptionTableRefQueue
            this.ex = ex;
            this.next = next;
            this.thrower = Thread.currentThread().getId();
            this.hashCode = System.identityHashCode(task); }}... }Copy the code

In addition to status, which records the execution status of the task, other fields are mainly used to handle the exceptions of the task execution. ForkJoinTask uses a hash array + linked list data structure (the previous implementation of HashMap in JDK8) to store all exceptions to the execution of ForkJoinTask tasks (because these fields are static).

Fork method (schedule tasks to be executed asynchronously)

The only thing fork() does is push the task into the work queue of the current worker thread (scheduling the task to execute asynchronously). See the source code below:

public final ForkJoinTask<V> fork(a) {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
Copy the code

This method is a non-blocking immediate return method that pushes a task to the work queue or submit queue of the current worker thread (submitted by an external non-ForkJoinWorkerThread thread via submit or execute) and waits to be executed by the thread pool.

It is important to note that the ForkJoinPool thread pool maps all worker thread owned queues and externally submitted tasks to different slots in the hash array using a hash array + a double-ended queue.

Join method (waiting for execution results)

Join () works much more complicated, which is why join() keeps threads from blocking — unlike thread.join ().

  1. Check whether the thread calling JOIN () is a ForkJoinThread. If it is not (for example, the main thread), it blocks the current thread and waits for the task to complete. If so, it is not blocked.
  2. View the task completion status. If the task has been completed, the system returns the result.
  3. If the task is not complete but is in its own work queue, complete it.
  4. If the task has been stolen by another worker thread, steal the task from the thief’s work queue (withFIFOTo help it complete the task it wants to join sooner.
  5. If the thief who stole the task has finished all his tasks and is waiting for the task that needs to join, he will find the thief who stole the task and help it to complete its task.
  6. Perform Step 5 recursively.

A sequence diagram of the above process would look like this:

Source code is as follows:

// Return the result when the calculation is complete. This method differs from GET () in that exception completion causes a RuntimeException or Error instead of an ExecutionException, and the calling thread is interrupted without causing the method to suddenly return by throwing InterruptedException.
public final V join(a) {
    int s;
    if((s = doJoin() & DONE_MASK) ! = NORMAL) reportException(s);// Abnormal end, throw the relevant exception stack information
    return getRawResult(); // The result is returned
}

// Wait for the task to complete and return its status status. This method implements JOIN, GET, quietlyJoin. Cases of completed external waits and unfork+exec are handled directly, and other cases are forwarded to ForkJoinPool.awaitJoin
// If status < 0, return s;
If the ForkJoinWorkerThread is not ForkJoinWorkerThread, wait for externalAwaitDone() to return
// If (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryunpush (this) && (s = doExec()) < 0 return s;
// Otherwise, return wt.pool.awaitJoin(w, this, 0L)
private int doJoin(a) {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :  // A negative value of status indicates that the task is completed.
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :        // Call the execution logic of the pool and wait for the execution result status to return
        wt.pool.awaitJoin(w, this.0L) :        // Call the pool wait mechanism
        externalAwaitDone();        // Instead of ForkJoinWorkerThread,
}

// Throws an exception (if any) associated with the given state, which is a CancellationException.
private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}

public abstract V getRawResult(a);

// Returns an execution exception (if any) for a given task. To provide accurate exception stack information, if the exception was not thrown by the current thread, an attempt will be made to create a new exception of the same type as the one thrown because of the logged exception.
// If there is no such constructor, try using the no-argument constructor and achieve the same effect by setting the initCause method, even though it may contain misleading stack trace information.
private Throwable getThrowableException(a) {
    if((status & DONE_MASK) ! = EXCEPTIONAL)return null;

    //1. Use the hash value of the current task object to find the corresponding abnormal node in the hash list array
    int h = System.identityHashCode(this); // Hash value of the current task
    ExceptionNode e;
    final ReentrantLock lock = exceptionTableLock;
    lock.lock(); / / lock
    try {
        expungeStaleExceptions(); // Clean up the exception node of the task collected by GC
        ExceptionNode[] t = exceptionTable;
        e = t[h & (t.length - 1)]; // Get the node in the slot of the hash array by modulating the desired index
        while(e ! =null&& e.get() ! =this)
            e = e.next;        // Traverse to find the abnormal node corresponding to the current task
    } finally {
        lock.unlock();
    }
    Throwable ex;
    if (e == null || (ex = e.ex) == null) // Indicates that no exception occurs
        return null;
    if(e.thrower ! = Thread.currentThread().getId()) {// There is an exception but it is not thrown by the current thread
        Class<? extends Throwable> ec = ex.getClass();
        try{ Constructor<? > noArgCtor =null; Constructor<? >[] cs = ec.getConstructors();// public ctors only
            // Find the constructor by reflection and construct a new exception
            for (int i = 0; i < cs.length; ++i) { Constructor<? > c = cs[i]; Class<? >[] ps = c.getParameterTypes();if (ps.length == 0)
                    noArgCtor = c; // Record the no-argument constructor in case the desired constructor is not found
                else if (ps.length == 1 && ps[0] == Throwable.class) {
                    Throwable wx = (Throwable)c.newInstance(ex); // Found the expected constructor for Throwable type arguments
                    return (wx == null)? ex : wx; }}if(noArgCtor ! =null) { // The expected constructor was not found and a new exception was created using the no-argument constructor
                Throwable wx = (Throwable)(noArgCtor.newInstance());
                if(wx ! =null) {
                    wx.initCause(ex); // Set the original exception
                    returnwx; }}}catch (Exception ignore) {
        }
    }
    return ex;
}



// Clear the exception node in the hash list array for tasks that have been reclaimed by the GC. Get the exception node from the exceptionTableRefQueue node reference queue and remove the corresponding node from the hash linked list array
private static void expungeStaleExceptions(a) {
    for(Object x; (x = exceptionTableRefQueue.poll()) ! =null;) {
        if (x instanceof ExceptionNode) {
            int hashCode = ((ExceptionNode)x).hashCode; / / the node hash
            ExceptionNode[] t = exceptionTable;
            int i = hashCode & (t.length - 1); // get the index of the hash table
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            while(e ! =null) {
                ExceptionNode next = e.next;
                if (e == x) { // The target node is found
                    if (pred == null)
                        t[i] = next;
                    else
                        pred.next = next;
                    break;
                }
                pred = e; // Iterate through the liste = next; }}}}// Steal the main execution method of the task, call exec() and record the state when it is done unless it has already done so.
final int doExec(a) {
    int s; boolean completed;
    if ((s = status) >= 0) { // The task is not completed
        try{ completed = exec(); Call exec() and record the state when it completes. }catch (Throwable rex) {
            return setExceptionalCompletion(rex); // Log the exception and return the status, and wake up the threads waiting for this task via join.
        }
        if (completed)
            s = setCompletion(NORMAL); // Update the status to normal end and wake up threads waiting for this task via JOIN.
    }
    return s;
}

// Perform the basic operations of this task immediately. Return true to indicate that the task completed normally, otherwise false to indicate that the task may or may not complete.
// This method may also throw (uncaught) exceptions to indicate that the exception exits. This method is intended to support extension and should generally not be called otherwise.
protected abstract boolean exec(a);

// Wait for an unfinished task submitted by a non-ForkJoinWorkerThread to complete and return to task status
private int externalAwaitDone(a) {

    / / if CountedCompleter task, mon. Wait for ForkJoinPool.com externalHelpComplete ((CountedCompleter 
      ) this, 0) returns
    / / otherwise, if ForkJoinPool.com mon. TryExternalUnpush (this), return doExec () results;
    // Otherwise, 0 is returned
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter<?>)this.0) :                             // Assist with the externally submitted CountedCompleter task
             ForkJoinPool.common.tryExternalUnpush(this)? doExec() :0);    // Assist with externally submitted non-countedCompleter tasks
    if (s >= 0 && (s = status) >= 0) { // Indicates that the task is not finished and needs to be blocked.
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { // flag that a thread needs to be woken up
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            wait(0L); // Task is not finished, block indefinitely until awakened
                        } catch (InterruptedException ie) {
                            interrupted = true; }}else
                        notifyAll(); Wake up all blocked threads}}}while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt(); // Restore the interrupt flag
    }
    return s;
}


// Log exceptions, update status, and wake up all waiting threads
private int setExceptionalCompletion(Throwable ex) {
    int s = recordExceptionalCompletion(ex);
    if ((s & DONE_MASK) == EXCEPTIONAL)
        internalPropagateException(ex); // Call the hook function to propagate the exception
    return s;
}

/** * Supported hook function */ for exception propagation of task exception end
void internalPropagateException(Throwable ex) {}// Record the exception and set the status status
final int recordExceptionalCompletion(Throwable ex) {
    int s;
    if ((s = status) >= 0) {
        int h = System.identityHashCode(this); / / hash value
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();    / / lock
        try {
            expungeStaleExceptions();
            ExceptionNode[] t = exceptionTable;
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                if (e == null) { // There is no exception node in the hash list array for this task
                    t[i] = new ExceptionNode(this, ex, t[i]); // Create an exception node to insert the hash list array using the header method
                    break;
                }
                if (e.get() == this) // The hash list array already has corresponding abnormal node, exit
                    break; }}finally {
            lock.unlock();
        }
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}

// Marks the completion of the task and wakes up the thread waiting for the task via join.
private int setCompletion(int completion) {
    for (int s;;) {
        if ((s = status) < 0)
            return s;
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { // Update the status
            if ((s >>> 16) != 0)
                synchronized (this) { notifyAll(); } // Wake up all waiting threads
            returncompletion; }}}Copy the code

Get method (get asynchronous task results)

Since ForkJoinTask is a subclass of Future, the most important get method for the Future to get the results of asynchronous tasks must also be implemented:

// If necessary, wait for the calculation to complete and then retrieve its results.
public final V get(a) throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : // If ForkJoinWorkerThread is used, execute doJoin
            externalInterruptibleAwaitDone();    / / externalInterruptibleAwaitDone execution
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();    // A CancellationException is thrown
    if(s == EXCEPTIONAL && (ex = getThrowableException()) ! =null)
        throw new ExecutionException(ex);    // Throw an exception if an exception occurs during execution
    return getRawResult();                    // Returns a normal result
}

// Blocks a non-forkJoinWorkerThread until it finishes or is interrupted.
private int externalInterruptibleAwaitDone(a) throws InterruptedException {
    int s;
    if (Thread.interrupted())
        throw new InterruptedException();
    if ((s = status) >= 0 &&
        (s = ((this instanceof CountedCompleter) ?
              ForkJoinPool.common.externalHelpComplete(
                  (CountedCompleter<?>)this.0) :
              ForkJoinPool.common.tryExternalUnpush(this)? doExec() :0> =))0) { // Return the status of executing or waiting to be executed according to the task type
        while ((s = status) >= 0) { // Need to block wait
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0)
                        wait(0L);     // block wait
                    else
                        notifyAll(); // Wake up all waiting threads}}}}return s;
}
Copy the code

The GET method is implemented using the doJoin method that implements the JOIN method. However, the GET method throws InterruptedException if the thread calling the GET method is interrupted, while the JOIN method does not. In addition, the get method will wrap the exceptions as ExecutionExceptions, while the JOIN method will throw the exceptions as is and will not wrap the exceptions as ExecutionExceptions. The wait/notifyAll thread communication mechanism is used to block and wake up the GET method. Similarly, the timeout version of GET supports interruptible and/or timed wait completion.

Invoke method (executes the task immediately and waits for the result to return)

// Start the task and, if you need to wait for it to complete and return its results, throw a corresponding (uncaught)RuntimeException or Error if there is an exception in the underlying execution of the task.
public final V invoke(a) {
    int s;
    if((s = doInvoke() & DONE_MASK) ! = NORMAL) reportException(s);return getRawResult();
}

// Invoke, implementation of quietlyInvoke
private int doInvoke(a) {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :      // Perform this task and return its status when complete
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? // Execute different wait logic according to different task types if the task is not completed or needs to wait
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this.0L) :
        externalAwaitDone();
}
Copy the code

The Invoke implementation uses the invoke thread to execute the exec method immediately. If the invoke thread is fork/join, the implementation uses the recursive execution strategy of the ForkJoinPool thread pool to wait for the subtasks to complete, merging them step by step into the final result of the task. And return. It is important to note that this method does not return immediately if the thread is interrupted and must not compensate for the interrupted state until there is a result of the task execution.

The invokeAll method (batch tasks and wait for them to finish)

// Perform two tasks
public static void invokeAll(ForkJoinTask
        t1, ForkJoinTask
        t2) {
    int s1, s2;
    t2.fork(); // The t2 task is assigned to the thread pool for execution
    if((s1 = t1.doInvoke() & DONE_MASK) ! = NORMAL)// The t1 task is executed immediately by the current thread
        t1.reportException(s1);         // If t1 ends with an exception, exceptions are thrown, including CancellationException
    if((s2 = t2.doJoin() & DONE_MASK) ! = NORMAL)// Wait for t2 to finish
        t2.reportException(s2);            // If t2 ends, an exception is thrown, including CancellationException
}

// Execute the task array
public static void invokeAll(ForkJoinTask
       ... tasks) {
    Throwable ex = null;
    int last = tasks.length - 1;
    for (int i = last; i >= 0; --i) { ForkJoinTask<? > t = tasks[i];if (t == null) {
            if (ex == null) // Both cannot be null
                ex = new NullPointerException();
        }
        else if(i ! =0)
            t.fork(); // All tasks are assigned to the thread pool except the first one
        else if (t.doInvoke() < NORMAL && ex == null) // The current thread executes the first task
            ex = t.getException();  // Record the exception of the first task
    }
    for (int i = 1; i <= last; ++i) { ForkJoinTask<? > t = tasks[i];if(t ! =null) {
            if(ex ! =null) // The first task ends abnormally. Cancel all other tasks
                t.cancel(false);
            else if (t.doJoin() < NORMAL) // A task ended unexpectedlyex = t.getException(); }}if(ex ! =null)
        rethrow(ex);  // If a task ends abnormally, throw an exception for the task that ends abnormally at the top of the array
}

// To execute tasks in batches, return a ForkJoinTask instance for each task.
public static<T extends ForkJoinTask<? >>Collection<T> invokeAll(Collection<T> tasks) {
    if(! (tasksinstanceofRandomAccess) || ! (tasksinstanceofList<? >)) { invokeAll(tasks.toArray(newForkJoinTask<? >[tasks.size()]));// Encapsulate the task into a ForkJoinTask and call the above method
        return tasks;
    }
    // The logic below is the same as invokeAll above.
    @SuppressWarnings("unchecked")List<? extends ForkJoinTask<? >> ts = (List<? extends ForkJoinTask<? >>) tasks; Throwable ex =null;
    int last = ts.size() - 1;
    for (int i = last; i >= 0; --i) { ForkJoinTask<? > t = ts.get(i);if (t == null) {
            if (ex == null)
                ex = new NullPointerException();
        }
        else if(i ! =0)
            t.fork();
        else if (t.doInvoke() < NORMAL && ex == null)
            ex = t.getException();
    }
    for (int i = 1; i <= last; ++i) { ForkJoinTask<? > t = ts.get(i);if(t ! =null) {
            if(ex ! =null)
                t.cancel(false);
            else if(t.doJoin() < NORMAL) ex = t.getException(); }}if(ex ! =null)
        rethrow(ex);
    return tasks;
}
Copy the code

Batch task executing its implementation is at the top task (only two parameters, the first parameter is the row in front of the task, is an array or a queue, the smaller the index is ranked ryoma below) performed by the current thread, the back of the task to the thread pool scheduler, if there are multiple tasks are abnormal, will only throw exceptions in the front of the mission.

QuietlyInvoke, quietlyJoin methods (invoke and JOIN without execution result)

public final void quietlyJoin(a) {
	doJoin();
}

public final void quietlyInvoke(a) {
	doInvoke();
}
Copy the code

QuietlyInvoke () and quietlyJoin() simply call doInvoke and doJoin, and then nothing else. They simply do not care about executing the resulting version of invoke and Join. This can be useful when performing a set of tasks and you need to defer processing of results or exceptions until all tasks are complete.

Cancel method (attempts to cancel the execution of the task)

public boolean cancel(boolean mayInterruptIfRunning) {
    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
Copy the code

It mainly uses setCompletion to mark pending tasks as CANCELLED and wake up threads waiting for the task via join. A completed task cannot be cancelled. If true is returned, the task is cancelled successfully. Note that the mayInterruptIfRunning passed in by this method is not used, so ForkJoinTask does not support interrupting an already started task when canceling the task. However, subclasses of ForkJoinTask can override the implementation.

TryUnfork method (unfork, i.e. remove the task from the task queue)

// Cancel the task execution plan. If the task has only recently been forked by the current thread and has not yet been started in another thread, this method will usually succeed, but it is not 100% guaranteed.
public boolean tryUnfork(a) {
    Thread t;
    return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :    // Cancel logic for ForkJoinWorkerThread
            ForkJoinPool.common.tryExternalUnpush(this));            // Cancel logic for external submitted tasks
}
Copy the code

TryUnfork attempts to eject the task from the task queue, after which the thread pool will no longer schedule the task. The implementation of this method will only succeed if the task is just pushed into the task queue and is still at the top of the task queue stack, otherwise it will fail 100%.

Reinitialize method (reinitialize the task)

public void reinitialize(a) {
    if ((status & DONE_MASK) == EXCEPTIONAL) / / there are exceptions
        clearExceptionalCompletion(); // Remove the exception node of the current task from the hash list array and reset status to 0
    else
        status = 0;
}
Copy the code

If a task ends abnormally, the exception record for the task is cleared from the exception hash table. This method simply resets the status of the task to 0 so that the task can be re-executed.

Methods isDone, isCompletedNormally, isCancelled, isCompletedAbnormally (task completion state query)

The execution status of a task can be queried at multiple levels:

  1. IsDone is true if the task is completed in any way (including if the task is canceled without execution).
  2. IsCompletedNormally is true if the task is completed without cancellation or without encountering an exception.
  3. If the task isCancelled (in which case the getException method returns a CancellationException), isCancelled is true.
  4. If the task was cancelled or exception encountered isCompletedAbnormally anomaly is true, in this case, the getException returns with abnormal or Java. Util. Concurrent. CancellationException.

ForkJoinTask may occasionally raise exceptions when they are being executed, but there is no way to catch exceptions when they are being recorded directly on the main thread, so ForkJoinTask provides an isCompletedAbnormally() method to check if a task has abnormally encountered or been canceled. An exception can be obtained through the getException method of ForkJoinTask. The following is an example:

if(task.isCompletedAbnormally()){
   System.out.println(task.getException());
}
Copy the code

The getException method returns the Throwable object or CancellationException if the task was canceled. Returns NULL if the task did not complete or no exception was thrown.

Adapt methods for Runnable and Callable

The ADAPT approach is designed to be compatible with traditional Runnable and Callable tasks by encapsulating them into ForkJoinTask tasks, which can be used when executing forkJoinTasks in combination with other types of tasks.

Some other methods

GetPool returns the thread pool instance of the executing thread. InForkJonPool determines whether a task has been submitted by a ForkJoinWorkerThread thread, which generally means that the task is a subtask of an internal split.

The getQueuedTaskCount method returns the number of tasks that have been forked for the current worker thread, but have not yet been executed, which is a transient value. Because the task forked by the worker thread is still in the worker thread’s queue, this value is known from the task.

Some other methods:

// The task may be executed while the execution pool hosting the current task is silent (idle). This method may be used in designs where many tasks are scheduled to be executed via fork, but no join calls are displayed until they are all executed.
HelpQuiesce is used when you have a batch of tasks scheduled for execution and don't know when they will end. If you want to schedule another task after all the tasks have finished, you can use helpQuiesce.
public static void helpQuiesce(a) {
    Thread t;
    // Call different silent execution logic depending on the type of execution thread
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
        wt.pool.helpQuiescePool(wt.workQueue);
    }
    else
        ForkJoinPool.quiesceCommonPool();
}

// Returns an estimate of how many more tasks a is held by the current worker thread than b is held by other worker threads that might steal its work, the difference between a and b. If the current worker thread is not in ForkJoinPool, 0 is returned
// Usually this value is kept constant at a very small value of 3. If this threshold is exceeded, it is processed locally.
public static int getSurplusQueuedTaskCount(a) {
    return ForkJoinPool.getSurplusQueuedTaskCount();
}

// Gets but does not remove (that is, does not cancel the execution schedule) the next task assigned to the current thread that may be about to be executed. There is no guarantee that the task will actually be executed immediately afterwards. This method may return NULL even if the task exists but is inaccessible because of a race
// This method is primarily intended to support extensions that might not otherwise be used.
protected staticForkJoinTask<? > peekNextLocalTask() { Thread t; ForkJoinPool.WorkQueue q;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        q = ((ForkJoinWorkerThread)t).workQueue;
    else
        q = ForkJoinPool.commonSubmitterQueue();
    return (q == null)?null : q.peek();
}

// Gets and removes (that is, cancels) the next task scheduled for the current thread that is likely to be executed.
// This method is primarily intended to support extensions that might not otherwise be used.
protected staticForkJoinTask<? > pollNextLocalTask() { Thread t;return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
        null;
}

If the current thread is being run by the ForkJoinPool, the ForkJoinPool retrieves and removes (cancels) the next task that the current thread may execute. The task may have been stolen from another thread.
// Returning a NULll does not necessarily mean that the ForkJoinPool being operated by this task is at rest. This method is primarily intended to support extensions that might not otherwise be used.
protected staticForkJoinTask<? > pollTask() { Thread t; ForkJoinWorkerThread wt;return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
        null;
}
Copy the code

summary

ForkJoinTask is usually used only for non-cyclic dependent calculations of pure functions or operations on isolated objects; otherwise, execution may encounter some form of deadlock as tasks wait for each other in a loop. However, the framework supports other methods and techniques (such as using Phaser, helpQuiesce, and Complete) that can be used to construct a ForkJoinTask subclass that resolves this dependent task. To support these usages, Can use setForkJoinTaskTag or compareAndSetForkJoinTaskTag atomically marking a short type of value, and use getForkJoinTaskTag for inspection. The ForkJoinTask implementation does not use these protected methods or tags for any purpose, but they can be used to construct specialized subclasses from which provided methods can be used to avoid revisiting already processed nodes/tasks.

ForkJoinTask should perform relatively few calculations and should avoid uncertain loops. Large tasks should be broken down into smaller subtasks, usually by recursive decomposition. If the task is too large, parallelism will not improve throughput. If it is too small, the memory and internal task maintenance overhead may exceed the processing overhead.

Forkjointasks are serializable, which enables them to be used in extensions such as the Remote Execution Framework. It is wise to serialize tasks only before or after execution, not during execution.

ForkJoinPool

ForkJoinPoolA ForkJoinTask is executed through a ForkJoinPool. Subtasks are added to the two-end queue maintained by the current worker thread and are placed at the head of the queue. When a worker thread has no work in its queue temporarily, it randomly fetches a task from the tail of another worker thread’s queue.

Constant introduction

ForkJoinPool shares some constants with the internal WorkQueue class

// Constants shared across ForkJoinPool and WorkQueue

// specify parameters
static final int SMASK = 0xffff;        // The low-order mask is also the maximum index bit
static final int MAX_CAP = 0x7fff;      // The maximum capacity of the worker thread
static final int EVENMASK = 0xfffe;     // Even low level mask
static final int SQMASK = 0x007e;       // workQueues array with up to 64 slots

// the mask and flag bits of CTL subfields and workqueue.scanstate
static final int SCANNING = 1;          // Flags whether a task is running
static final int INACTIVE = 1 << 31;    // The inactive state is negative
static final int SS_SEQ = 1 << 16;      // Version stamp to prevent ABA problems

// ForkJoinPool.config and workqueue. config configuration information flags
static final int MODE_MASK = 0xffff << 16;  // Mode mask
static final int LIFO_QUEUE = 0; 			/ / LIFO queue
static final int FIFO_QUEUE = 1 << 16;		/ / FIFO queue
static final int SHARED_QUEUE = 1 << 31;    // Shared mode queue, negative ForkJoinPool constants and instance fields:
Copy the code

Related constants and instance fields in ForkJoinPool

// Low and high level masks
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;

// Number of active threads
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; // Increase the number of active threads
private static final long AC_MASK = 0xffffL << AC_SHIFT; // Active thread count mask

// Number of worker threads
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; // Increase the number of worker threads
private static final long TC_MASK = 0xffffL << TC_SHIFT; / / mask
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // Create a worker thread flag

/ / pool
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;

// Instance field
volatile long ctl;                   // Master control parameters
volatile int runState;               // Run status lock
final int config;                    / / | parallel degree model
int indexSeed;                       // Used to generate the worker thread index
volatile WorkQueue[] workQueues;     // Primary object registration information, workQueue
final ForkJoinWorkerThreadFactory factory;// Thread factory
final UncaughtExceptionHandler ueh;  // Exception information for each worker thread
final String workerNamePrefix;       // The name used to create the worker thread
volatile AtomicLong stealCounter;    // Steal the total number of tasks, also used as a synchronization monitor

/** Static initialization field */
// Thread factory
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
// The permission to start or kill the thread's method caller
private static final RuntimePermission modifyThreadPermission;
// Public static pool
static final ForkJoinPool common;
// Degree of parallelism, corresponding to the internal common pool
static final int commonParallelism;
// The number of spare threads to use in tryCompensate
private static int commonMaxSpares;
// The serial number when the workerNamePrefix(worker thread name prefix) is created
private static int poolNumberSequence;
// The timeout value (in nanoseconds) that the thread blocks to wait for a new task. The default is 2 seconds
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
// Indicates the idle timeout period to prevent a timer from missing
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms
// Default number of standby threads
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
The number of spins before blocking is used in awaitRunStateLock and awaitWork
private static final int SPINS  = 0;
/ / indexSeed increment
private static final int SEED_INCREMENT = 0x9e3779b9;
Copy the code

ForkJoinPool internal state is stored by a 64-bit long variable CTL, which consists of four 16-bit subfields:

  • AC: Number of running worker threads minus target parallelism, high 16 bits
  • TC: total number of worker threads minus target parallelism, medium and high 16 bits
  • SS: Version count and state of the waiting thread at the top of the stack, low 16 bits
  • ID: poolIndex of the WorkQueue at the top of the stack, 16 bits lower

ForkJoinPool.WorkQueue Related attributes:

// Initial queue capacity, power of 2
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
// Maximum queue capacity
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// Instance field
volatile int scanState;    // Woker state, <0: inactive; odd:scanning
int stackPred;             // Record the CTL at the top of the previous stack
int nsteals;               // Steal the task number
int hint;                  // Record the stealer index, initially a random index
int config;                // Pool index and schema
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         // Index of the next poll operation (bottom of stack/head of queue)
int top;                   // Index of a push operation (top of stack/bottom of queue)ForkJoinTask<? >[] array;// Task array
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // The current worker thread of the work queue, null in shared mode
volatile Thread parker;    // Owner during call park blocking, null otherwise
volatileForkJoinTask<? > currentJoin;// Record the tasks that are joined
volatileForkJoinTask<? > currentSteal;// Records tasks stolen from other work queues
Copy the code

Internal data structure

ForkJoinPool uses a hash array and a double-ended queue to store tasks, but there are two types of tasks:

  • One is the external task submitted through execute and submit
  • ForkJoinWorkerThread Is a type of work that a worker thread splits into fork/join tasks

ForkJoinPool does not mix the two tasks together in a single task Queue. For external tasks, the random probe values inside the Thread map to a Submission Queue in the even slots of the hash array. This Submission Queue is a two-ended array implementation called a Submission Queue. A dedicated repository for externally submitted tasks.

For ForkJoinWorkerThread workers, each thread is assigned a Work Queue, a double-ended Queue called the Work Queue, which is mapped to the odd slot of the hash array. Each worker thread fork/join decomposed task is added to its own work queue.

The attributes WorkQueue[] workQueues in ForkJoinPool are what we call hash arrays, whose elements are the array-based, two-ended queues implemented by the internal WorkQueue class. The hash array is a power of two and supports expansion. Here is the schematic structure of the hash array:

As shown, the commit queue is in the odd index slot of the hash array workQueue, and the worker thread’s workQueue is in the even slot.

  • By default, asyncMode is false:
    • Therefore, the worker thread uses the work queue as a stack (last in, first out), pushes the decomposed sub-tasks to the top end of the work queue, and takes the tasks from the top end (usually there are two Pointers to both ends of the queue, which are base and TOP in the figure).
    • When a worker thread is empty, it steals a task from another queue (not only the workQueue, but also the commit queue). For example, a worker thread with workQueue2 steals a task from workQueue1. Stealing tasks using the first-in-first-out FIFO strategy (that is, stealing tasks from the base end), so that not only can avoid the task fetching time and have its queue worker thread conflict, thereby reducing the competition, but also can assist it to complete relatively large tasks.
  • When asyncMode is true, the worker thread that owns the work queue will fetch tasks from the base on a first-in, first-out policy. This is usually only used for tasks that do not need to return results or for event messaging frameworks.

ForkJoinPool constructor

Its complete construction method is as follows

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
Copy the code

Description of Important Parameters

  1. Parallelism: The parallelism level, which by default is the same as the number of cpus on our machine, uses Runtime.getruntime ().availableProcessors() to get the number of cpus available when our machine is running.
  2. Factory: The factory for creating new threads. By default use ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory.
  3. Processor under the circumstance of exception handler: Thread (Thread. UncaughtExceptionHandler handler), the processor in a Thread mission due to some unexpected to the error of some processing task Thread is interrupted, the default is null.
  4. AsyncMode: Note that in ForkJoinPool each worker thread has a separate queue of tasks

AsyncMode indicates the scheduling mode of the task queue in the worker thread. It can be FIFO or LIFO. If true, the worker threads in the thread pool schedule tasks using first-in, first-out (false by default).

ForkJoinPool. Submit method

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    // Commit to the work queue
    externalPush(task);
    return task;
}
Copy the code

The ForkJoinPool has its own work queues. These work queues are used to receive tasks submitted by external threads (forkJoinThreads). These work queues are called coby queues. Submit () and fork() are not substantially different, except that the submit object is just a codonic queue (along with some synchronizing, initialization operations). Coby’s Queue, like all work queues, is “stolen” by a worker thread, so when a task is successfully stolen by a worker thread, it means that the submitted task is actually being executed.

Related articles

  • Executor Thread Pools for Concurrent Programming
  • Concurrent Programming with Timed Tasks: An Analysis of timed Thread Pools
  • Future&FutureTask for Concurrent Programming
  • ThreadLocal in Depth for Concurrent Programming

PS: The above code is submitted to Github: github.com/Niuh-Study/…

GitHub Org_Hejianhui /JavaStudy GitHub Hejianhui /JavaStudy