preface

Today, we will talk about the forkJion concurrency framework. Learning the implementation of this framework will be very helpful for programming thinking

ForkJion’s programming philosophy is easy to understand if you’ve studied big data frameworks like Storm and Spark

In general, a large task is divided into many small tasks for execution, and the results of each small task are combined to finally complete this large task

ForkJoinPool

ForkJoinPool is an implementation of the ExecutoeService interface that is designed for work that can be recursively broken down into small pieces

The fork/ Join framework distributes tasks to worker threads in the thread pool, taking full advantage of multiple processors and improving program performance

The first step in using the fork/ Join framework is to write code that does some of the work, as follows

// If the current working part reaches the size that needs to be split
	// Split the current work into two parts
	// Call the two parts of the task and wait for the result
Otherwise, do the work directly
Copy the code

The size of the split (that is, the number of small tasks to be broken down), and the way the small tasks are broken down recursively

Wrap this code in a ForkJionTask subclass, usually a RecursiveTask (which can return a result) or a RecursiveAction (which does not return a result)

use

RecursiveTask

/ * * *@authorThousand hands shura *@dateIn the 2021-01-01 s when *@descriptionForkJoInPool demo * /
public class ForkJoInPoolDemo {

    /** * Create a ForkJoinPool * Parallelism: The number of parallel threads is 3 * Factory: the default thread pool factory * Handler: The thread executes the exception handler * asyncMode: True (fifO) false LIFO */
    private static ForkJoinPool forkJoinPool = new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null.true);


    private static List<String> list = new ArrayList<>();

    static {
        for (int i = 1; i <= 100; i++) {
            list.add(String.format("Task %s", i)); }}/ * * *@authorThousand hands shura *@dateIn the 2021-01-01 s when *@descriptionForkJoInPool demo * /
    static class Task extends RecursiveTask<String> {

        /** * The task to be processed */
        private List list;

        /** * The start subscript of this task processing */
        private int beginIndex;

        /** * The end subscript of this task processing */
        private int endIndex;

        /** * Divide a small task by size */
        private int split;

        public Task(List list, int split) {
            this.list = list;
            this.endIndex = 0;
            this.endIndex = list.size();
            this.split = split;
        }

        private Task(List list, int beginIndex, int endIndex, int split) {
            this.list = list;
            this.beginIndex = beginIndex;
            this.endIndex = endIndex;
            this.split = split;
        }

        @Override
        public String compute(a) {
            int count = endIndex - beginIndex;
            if (count > split) {
                int x = (beginIndex + endIndex) / 2;

                Task beginJob = new Task(list, beginIndex, x, split);
                beginJob.fork();

                Task endJob = new Task(list, x, endIndex, split);
                endJob.fork();
                return beginJob.join() + "-" + endJob.join();
            } else {
                StringJoiner stringJoiner = new StringJoiner("-");
                for (int i = beginIndex; i < endIndex; i++) {
                    stringJoiner.add(list.get(i).toString());
                }
                returnstringJoiner.toString(); }}}public static void main(String[] args) throws ExecutionException, InterruptedException {
        LocalDateTime now = LocalDateTime.now();
        StringJoiner stringJoiner = new StringJoiner("-");
        list.forEach(stringJoiner::add);
        System.out.println(stringJoiner.toString());
        System.out.println("ForEach execution time :" + Duration.between(now, LocalDateTime.now()).toMillis() + "毫秒");


        LocalDateTime now1 = LocalDateTime.now();
        ForkJoinTask<String> submit = forkJoinPool.submit(new Task(list, 10));
        String rs = submit.get();
        System.out.println(rs);

        System.out.println("ForkJoinTask execution time :" + Duration.between(now1, LocalDateTime.now()).toMillis() + "毫秒"); }}Copy the code

A list of size 100 took 18 milliseconds to iterate and 4 milliseconds to ForkJoinPool

That’s for sure! Multiple threads are definitely more efficient than single threads

RecursiveAction

/ * * *@authorThousand hands shura *@dateIn the 2021-01-01 s when *@descriptionForkJoInPool demo * /
public class ForkJoInPoolDemo {

    /** * Create a ForkJoinPool * Parallelism: The number of parallel threads is 3 * Factory: the default thread pool factory * Handler: The thread executes the exception handler * asyncMode: True (fifO) false LIFO */
    private static ForkJoinPool forkJoinPool = new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null.true);


    private static List<String> list = new ArrayList<>();

    static {
        for (int i = 1; i <= 100; i++) {
            list.add(String.format("Task %s", i)); }}@FunctionalInterface
    interface TaskCallBack<T> {

        /** * Task callback **@param t
         */
        void callback(T t);
    }

    / * * *@authorThousand hands shura *@dateIn the 2021-01-01 s when *@descriptionForkJoInPool demo * /
    static class Task<T> extends RecursiveAction {

        /** * The task to be processed */
        private List<T> list;

        /** * The start subscript of this task processing */
        private int beginIndex;

        /** * The end subscript of this task processing */
        private int endIndex;

        /** * Divide a small task by size */
        private int split;

        /** * subscript task callback */
        private TaskCallBack<T> taskCallBack;


        public Task(List<T> list, int split, TaskCallBack<T> taskCallBack) {
            this.list = list;
            this.endIndex = 0;
            this.endIndex = list.size();
            this.split = split;
            this.taskCallBack = taskCallBack;
        }

        private Task(List<T> list, int beginIndex, int endIndex, int split, TaskCallBack<T> taskCallBack) {
            this.list = list;
            this.beginIndex = beginIndex;
            this.endIndex = endIndex;
            this.split = split;
            this.taskCallBack = taskCallBack;
        }

        @Override
        public void compute(a) {
            int count = endIndex - beginIndex;
            if (count > split) {
                int x = (beginIndex + endIndex) / 2;

                Task beginJob = new Task(list, beginIndex, x, split, taskCallBack);
                beginJob.fork();

                Task endJob = new Task(list, x, endIndex, split, taskCallBack);
                endJob.fork();

                beginJob.join();
                endJob.join();
            } else {
                for (inti = beginIndex; i < endIndex; i++) { taskCallBack.callback(list.get(i)); }}}}public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        StringJoiner stringJoiner = new StringJoiner("-");
        list.forEach(stringJoiner::add);
        System.out.println(stringJoiner.toString());
        System.out.println("ForEach execution time :" + Duration.between(now, LocalDateTime.now()).toMillis() + "毫秒");


        LocalDateTime now1 = LocalDateTime.now();
        StringJoiner stringJoiner1 = new StringJoiner("-");
        forkJoinPool.invoke(new Task(list, 10, (s) -> stringJoiner1.add((String) s)));

        System.out.println(stringJoiner1.toString());
        System.out.println("ForkJoinTask execution time :" + Duration.between(now1, LocalDateTime.now()).toMillis() + "毫秒"); }}Copy the code

Although there is no return value for RecursiveAction, we can extend a method to deliver each element to the caller as he wishes

conclusion

The principle of forkJion is to encapsulate recursion on a thread pool basis

The compute method lets us decide for ourselves when we need to disassemble the task and when we need to execute the task, okay

  • Calling fork basically means calling compute again

  • Calling the join method is basically waiting for the task to complete

There are a number of scenarios for this framework: file traversal, looping lists to assemble data, and counting certain values…..

Any application scenario that can be implemented recursively can use the fork/ Join framework to improve performance