Multithreaded Fork/Join

Java 7 introduced a new way to handle multithreaded tasks: Fork/Join. A ForkJoinPool splits a task into several "small tasks" that are computed in parallel, then combines the results of those small tasks into the final result. In a word: divide and conquer.

First, the ForkJoinPool

To use Fork/Join, create a ForkJoinPool.

In Java 7 you create the pool with a constructor: ForkJoinPool pool = new ForkJoinPool(6); The argument 6 is the parallelism level, usually set to the number of CPU cores. My machine has 6 cores, so I use 6.
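Rather than hard-coding 6, the parallelism can be read from the JVM at startup. A minimal sketch (the class and method names are hypothetical); note that availableProcessors() reports logical processors, which on a machine with hyper-threading can be twice the physical core count:

```java
import java.util.concurrent.ForkJoinPool;

public class PoolSizing {

    // Builds a pool whose parallelism matches the processors visible to the JVM,
    // instead of a hard-coded value such as 6.
    static ForkJoinPool newPoolSizedToCpus() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ForkJoinPool(cores);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPoolSizedToCpus();
        System.out.println("parallelism = " + pool.getParallelism());
        pool.shutdown(); // a pool you create yourself should be shut down when no longer needed
    }
}
```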


Since Java 8, you can also obtain the shared common pool with ForkJoinPool pool = ForkJoinPool.commonPool();

The common pool is created in the source method java.util.concurrent.ForkJoinPool#makeCommonPool; the key line is:

```java
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                        "ForkJoinPool.commonPool-worker-");
```

I did not configure the common pool's parallelism, so it uses the default (the number of available processors minus one), which gives 11 on my machine.
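To check this on your own machine, the sketch below prints the common pool's parallelism next to the default rule. ForkJoinPool.getCommonPoolParallelism() is a standard method since Java 8, and the default can be overridden with the system property java.util.concurrent.ForkJoinPool.common.parallelism; the helper expectedDefaultParallelism is hypothetical and only encodes the "processors minus one, at least 1" rule:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Hypothetical helper: the value the common pool reports when the
    // java.util.concurrent.ForkJoinPool.common.parallelism property is not set
    // (available processors minus one, but never below 1).
    static int expectedDefaultParallelism() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("expected default:        " + expectedDefaultParallelism());
    }
}
```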

Next, the task

The ForkJoinPool processes tasks. A task is a ForkJoinTask, but we normally use one of its subclasses rather than ForkJoinTask itself. They differ as follows:

  • RecursiveAction: for tasks that do not need to return a result (this class is not generic)
  • RecursiveTask<V>: for tasks that return a result of type V
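A minimal sketch putting the two subclasses side by side: a RecursiveAction that doubles array elements in place (no result) and a RecursiveTask<Long> that sums them (returns a result). The class names and THRESHOLD are hypothetical:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class ActionVsTask {

    private static final int THRESHOLD = 4; // hypothetical split threshold

    // RecursiveAction: compute() returns void; the work is done by side effect.
    static class DoubleInPlace extends RecursiveAction {
        private final int[] data;
        private final int from, to;

        DoubleInPlace(int[] data, int from, int to) {
            this.data = data; this.from = from; this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) data[i] *= 2;
                return;
            }
            int mid = (from + to) >>> 1;
            invokeAll(new DoubleInPlace(data, from, mid), new DoubleInPlace(data, mid, to));
        }
    }

    // RecursiveTask<Long>: compute() returns the sum of the range [from, to).
    static class Sum extends RecursiveTask<Long> {
        private final int[] data;
        private final int from, to;

        Sum(int[] data, int from, int to) {
            this.data = data; this.from = from; this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long s = 0;
                for (int i = from; i < to; i++) s += data[i];
                return s;
            }
            int mid = (from + to) >>> 1;
            Sum left = new Sum(data, from, mid);
            Sum right = new Sum(data, mid, to);
            invokeAll(left, right);
            return left.join() + right.join();
        }
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.invoke(new DoubleInPlace(data, 0, data.length)); // an action yields no result
        long sum = pool.invoke(new Sum(data, 0, data.length));
        System.out.println(Arrays.toString(data)); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
        System.out.println(sum);                   // 110
        pool.shutdown();
    }
}
```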

Example

Goal: append ___changed to the name of each User in the list.

Test entity class

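```java
import lombok.Data;

// Lombok's @Data generates getName/setName (plus equals, hashCode and toString)
@Data
public class User {

    private String name;

    public User(String name) {
        this.name = name;
    }
}
```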
```java
import com.alibaba.fastjson.JSONObject; // JSON serialization via Alibaba fastjson

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinDemo {

    private List<User> addString() {
        List<User> list = new ArrayList<>();
        list.add(new User("xiao"));
        list.add(new User("hua"));
        list.add(new User("zhao"));
        list.add(new User("li"));
        list.add(new User("liu"));
        list.add(new User("sun"));
        list.add(new User("zhou"));
        list.add(new User("wu"));
        list.add(new User("zheng"));
        list.add(new User("wang"));

        // Single-threaded alternative using a stream and a lambda:
        //return list.stream().peek(e -> e.setName(e.getName() + "__changed")).collect(Collectors.toList());

        // ForkJoinPool with a parallelism of 6
        ForkJoinPool pool = new ForkJoinPool(6);
        TaskDemo task = new TaskDemo(list);
        //pool.execute(task);
        return pool.invoke(task); // blocks until the task completes and returns its result
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ForkJoinDemo demo = new ForkJoinDemo();
        List<User> list = demo.addString();
        System.out.println(JSONObject.toJSONString(list));
        long time = System.currentTimeMillis() - start;
        System.out.println("ForkJoinDemo_main_time: " + time + "ms");
    }
}
```

A ForkJoinPool is created and the task is handed to it. Two of the ForkJoinPool methods for doing this are execute and invoke:

  • execute: arranges asynchronous execution and returns nothing (void)
  • invoke: runs the task, waits for it to finish and returns its result
```java
ForkJoinPool pool = new ForkJoinPool(6);
TaskDemo task = new TaskDemo(list);
//pool.execute(task);
return pool.invoke(task);
```
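To compare the submission methods side by side, here is a small sketch using invoke, execute and also submit (all three are standard ForkJoinPool methods); the Square task and the class name are hypothetical:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ExecuteVsInvoke {

    // A trivial task used only to compare the submission methods.
    static class Square extends RecursiveTask<Integer> {
        private final int n;

        Square(int n) { this.n = n; }

        @Override
        protected Integer compute() { return n * n; }
    }

    static int[] runAll() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // invoke: runs the task and blocks until its result is available.
            int a = pool.invoke(new Square(3));

            // execute: returns immediately (void); keep a reference to the task
            // and call join() later if the result is needed.
            Square t = new Square(4);
            pool.execute(t);
            int b = t.join();

            // submit: returns the task as a ForkJoinTask (which is also a Future).
            ForkJoinTask<Integer> f = pool.submit(new Square(5));
            int c = f.join();

            return new int[] {a, b, c};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = runAll();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 9 16 25
    }
}
```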

Here’s the processing logic:

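```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class TaskDemo extends RecursiveTask<List<User>> {

    private List<User> list;
    // Lists with at most this many elements are processed directly
    private static final int MIN_LENGTH = 1;

    TaskDemo(List<User> list) {
        this.list = list;
    }

    @Override
    protected List<User> compute() {
        List<User> listResult = new ArrayList<>();
        if (list.size() <= MIN_LENGTH) {
            // Small enough: do the work in the current thread
            return chengeName(list);
        } else {
            // Split into the first MIN_LENGTH elements and the rest
            TaskDemo task1 = new TaskDemo(list.subList(0, MIN_LENGTH));
            TaskDemo task2 = new TaskDemo(list.subList(MIN_LENGTH, list.size()));
            // Run both subtasks and wait for them to finish
            invokeAll(task1, task2);

            // Combine the partial results in order
            listResult.addAll(task1.join());
            listResult.addAll(task2.join());
        }
        return listResult;
    }

    // Appends "___changed" to each user's name in place and returns the same list
    private List<User> chengeName(List<User> list) {
        for (User user : list) {
            user.setName(user.getName() + "___changed");
            System.out.println(Thread.currentThread().getName() + " " + user.getName());
        }
        return list;
    }
}
```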

The data to be processed is passed in through the constructor and stored in a member field.

The actual renaming is done in chengeName; the core of divide and conquer is the compute method.

Inside compute, the task checks whether the list is already small enough (at most MIN_LENGTH elements) to process directly. If it is not, the list is split into two subtasks: the first MIN_LENGTH elements and the rest.
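Because TaskDemo peels off MIN_LENGTH elements at a time, a list of n users produces a chain of roughly n nested subtasks. A common alternative, sketched here as a variant rather than the original design, splits at the midpoint so the recursion depth grows with log n. It works on plain strings instead of User to stay self-contained, and HalvingRename and THRESHOLD are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class HalvingRename extends RecursiveTask<List<String>> {

    private static final int THRESHOLD = 2; // hypothetical: up to 2 names are handled directly
    private final List<String> names;

    HalvingRename(List<String> names) { this.names = names; }

    @Override
    protected List<String> compute() {
        if (names.size() <= THRESHOLD) {
            List<String> out = new ArrayList<>();
            for (String n : names) out.add(n + "___changed");
            return out;
        }
        // Split at the midpoint so the recursion depth is about log2(n),
        // instead of about n when one element is peeled off per level.
        int mid = names.size() / 2;
        HalvingRename left = new HalvingRename(names.subList(0, mid));
        HalvingRename right = new HalvingRename(names.subList(mid, names.size()));
        invokeAll(left, right);
        List<String> result = new ArrayList<>(left.join());
        result.addAll(right.join()); // left half first, so the input order is kept
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("xiao", "hua", "zhao", "li", "liu");
        System.out.println(ForkJoinPool.commonPool().invoke(new HalvingRename(names)));
        // [xiao___changed, hua___changed, zhao___changed, li___changed, liu___changed]
    }
}
```

Unlike chengeName, this version builds new strings rather than mutating shared objects, so the subtasks never touch the same data.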

In TaskDemo's compute, the split relies on two methods:

  • invokeAll: runs both subtasks and waits for them to finish
  • join: returns the computed result once the task is complete
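The invokeAll call can also be written with the explicit fork/compute/join idiom: fork one half so an idle worker can steal it, compute the other half in the current thread, then join. A minimal sketch summing a range of longs (the class name and THRESHOLD are hypothetical):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkComputeJoin extends RecursiveTask<Long> {

    private static final long THRESHOLD = 1_000; // hypothetical cut-off
    private final long from, to;                  // sums the integers in [from, to)

    ForkComputeJoin(long from, long to) { this.from = from; this.to = to; }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long s = 0;
            for (long i = from; i < to; i++) s += i;
            return s;
        }
        long mid = (from + to) >>> 1;
        ForkComputeJoin left = new ForkComputeJoin(from, mid);
        ForkComputeJoin right = new ForkComputeJoin(mid, to);
        left.fork();                     // queue the left half; an idle worker may steal it
        long rightSum = right.compute(); // do the right half in this thread
        return left.join() + rightSum;   // wait for the left half (or run it here if not stolen)
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new ForkComputeJoin(0, 1_000_000))); // 499999500000
    }
}
```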

Let's look at the invokeAll method as it appears in the JDK 8 source:

```java
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    int s1, s2;
    t2.fork();
    if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
        t1.reportException(s1);
    if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
        t2.reportException(s2);
}
```

It calls three methods: fork, doInvoke and doJoin.

  • fork: if the current thread is a ForkJoinWorkerThread, the task is pushed onto that thread's workQueue in its ForkJoinPool to wait for execution; otherwise it is handed to the common pool via ForkJoinPool.common.externalPush(this).
  • doInvoke: first tries to run the task directly in the current thread. If that does not complete it, a ForkJoinWorkerThread does not try to pop the task off its queue and run it, but waits for it through the pool's awaitJoin method; a thread that is not a ForkJoinWorkerThread calls externalAwaitDone.
  • doJoin: if the task is already done, returns its status. Otherwise, if the current thread is a ForkJoinWorkerThread, it tries to pop the task off the top of that thread's workQueue and execute it, returning the status if that finishes the task; if not, it waits using the ForkJoinPool's awaitJoin method. A thread that is not a ForkJoinWorkerThread calls externalAwaitDone, as above.
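To observe the fork() behaviour described above, the sketch below forks a task from the main thread. Main is not a ForkJoinWorkerThread, so the task should land in the common pool and run on one of its workers. WhereAmI and the latch are hypothetical scaffolding; ForkJoinTask.getPool() and ForkJoinTask.inForkJoinPool() are standard static methods:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkFromMain {

    // Records which pool and thread compute() ran on, then signals the caller.
    static class WhereAmI extends RecursiveTask<String> {
        final CountDownLatch ran = new CountDownLatch(1);
        volatile boolean inCommonPool;

        @Override
        protected String compute() {
            // ForkJoinTask.getPool() returns the pool hosting the current thread
            inCommonPool = ForkJoinTask.getPool() == ForkJoinPool.commonPool();
            String name = Thread.currentThread().getName();
            ran.countDown();
            return name;
        }
    }

    // Forks a task from an ordinary thread (not a ForkJoinWorkerThread).
    static WhereAmI forkFromPlainThread() {
        WhereAmI task = new WhereAmI();
        task.fork(); // not a worker thread, so the task is pushed to the common pool
        try {
            // Wait on the latch instead of joining right away: an outside thread
            // that joins may take the still-queued task back and run it itself.
            task.ran.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task;
    }

    public static void main(String[] args) {
        System.out.println("main inside a ForkJoinPool? " + ForkJoinTask.inForkJoinPool());
        WhereAmI task = forkFromPlainThread();
        System.out.println("compute() ran on " + task.join() + ", common pool: " + task.inCommonPool);
    }
}
```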

Conclusion

Fork/Join is a new way of using multiple threads: divide and conquer, splitting the work into parts. It is based on a work-stealing algorithm, which uses the CPU efficiently with the goal of keeping every core busy, so it is suited to CPU-intensive tasks.

I once had a requirement involving a lot of rule checking and computation. Single-threaded, it took more than 10 seconds; with ForkJoin it took 100 milliseconds, and I then added a cache on top of that.

Take a look at the results:

(Screenshot: output using ForkJoin)

(Screenshot: output using the lambda expression)

The improvement is not obvious here, because renaming users is too simple a task; the more compute-intensive the task, the more noticeable the speed-up becomes.
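A rough way to see the effect of task weight is a deliberately CPU-heavy job such as counting primes by trial division. The sketch below (PrimeCount, CountTask and THRESHOLD are hypothetical) times a sequential loop against a ForkJoin version. It is a single run without JIT warm-up, not a proper benchmark, so the timings are only indicative and will vary by machine:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class PrimeCount {

    // Deliberately CPU-heavy: plain trial division, no caching.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    static long countSequential(int from, int to) {
        long c = 0;
        for (int i = from; i < to; i++) if (isPrime(i)) c++;
        return c;
    }

    // Counts primes in [from, to), splitting the range in half above THRESHOLD.
    static class CountTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10_000; // hypothetical cut-off
        private final int from, to;

        CountTask(int from, int to) { this.from = from; this.to = to; }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) return countSequential(from, to);
            int mid = (from + to) >>> 1;
            CountTask left = new CountTask(from, mid);
            left.fork();                                  // left half may be stolen
            long right = new CountTask(mid, to).compute(); // right half in this thread
            return left.join() + right;
        }
    }

    public static void main(String[] args) {
        int n = 5_000_000;
        long t0 = System.nanoTime();
        long seq = countSequential(0, n);
        long t1 = System.nanoTime();
        long par = ForkJoinPool.commonPool().invoke(new CountTask(0, n));
        long t2 = System.nanoTime();
        System.out.printf("sequential: %d primes in %d ms%n", seq, (t1 - t0) / 1_000_000);
        System.out.printf("fork/join:  %d primes in %d ms%n", par, (t2 - t1) / 1_000_000);
    }
}
```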


Finally, you are welcome to follow my WeChat official account so we can learn and improve together. Keep it up! 🤣

Search for: Nanzhao Blog