Building an easy-to-use concurrency component with ForkJoin

In day-to-day business development, some knowledge of concurrent programming is required. In practice, however, scenarios that execute asynchronous tasks through a thread pool are not that common. When concurrency really is needed, the more common approach is to implement the Runnable/Callable interface directly and hand it to a Thread, or, slightly more advanced, to define a thread pool and submit tasks to it. This post takes another angle: how to design an easy-to-use concurrency framework with the help of the ForkJoin framework provided by the JDK.

I. Background

A typical concurrency case in real projects is the product detail page. Besides the basic product data, it shows sales volume, address, reviews, recommendations, store information, decoration information and so on. The following pseudo-code describes how the whole detail data is assembled:

ItemInfo itemInfo = itemService.getInfo(itemId);
int sellCount = sellService.getSellCount(itemId);
RateInfo rateInfo = rateService.getRateInfo(itemId);
ShopInfo shopInfo = shopService.getShopInfo(shopId);
// decoration information
DecorateInfo decorateInfo = decorateService.getDecorateInfo(itemId);
// recommendations
RecommandInfo recommandInfo = recommandService.getRecommand(itemId);

In a normal, sequential flow, the six calls above run one after another. Assuming each service has an RT (response time) of 10 ms, these six calls alone take more than 60 ms.

From a business perspective, however, the six service invocations are independent of each other: no call depends on the result of another, so they can run concurrently. Executed that way, the total time is roughly that of the slowest of the six services, perhaps a little over 10 ms.
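To make the sequential-versus-concurrent arithmetic concrete, here is a rough timing sketch in plain Java. It is illustrative only: the class DetailPageTiming, the fakeServiceCall() stand-in and the 50 ms RT (chosen instead of 10 ms so the gap is easy to see) are all hypothetical, and Thread.sleep simulates each remote call. With six calls, the sequential version should take at least 6 × 50 = 300 ms, while the concurrent version should take roughly one RT plus scheduling overhead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for six independent service calls, each taking about RT_MS.
class DetailPageTiming {
    static final long RT_MS = 50;
    static final int SERVICES = 6;

    // Simulates one remote call (getInfo, getSellCount, ...) by sleeping for RT_MS.
    static String fakeServiceCall(int i) throws InterruptedException {
        Thread.sleep(RT_MS);
        return "service-" + i;
    }

    // Runs the calls one after another: total time is roughly the sum of the RTs.
    static long sequentialMs() throws InterruptedException {
        long start = System.nanoTime();
        for (int i = 0; i < SERVICES; i++) {
            fakeServiceCall(i);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Runs the calls on a pool with one thread per call: total time is roughly the slowest RT.
    static long concurrentMs() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(SERVICES);
        try {
            List<Callable<String>> calls = new ArrayList<>();
            for (int i = 0; i < SERVICES; i++) {
                final int id = i;
                calls.add(() -> fakeServiceCall(id));
            }
            long start = System.nanoTime();
            for (Future<String> f : pool.invokeAll(calls)) {
                f.get(); // invokeAll has already waited; get() surfaces any exception
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sequential: " + sequentialMs() + " ms");
        System.out.println("concurrent: " + concurrentMs() + " ms");
    }
}
```

Exact timings vary by machine; only the relative gap matters.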

Comparing the two, the benefit of concurrency in this scenario is obvious. The next question is how to make the code above concurrent in the simplest possible way.

II. Design and implementation

For example, how would we implement this with a thread pool?

1. Thread pool approach

Since the thread pool approach is not the focus here, I will just briefly show how it can be implemented and what it looks like:

// 1. Create a thread pool (core size 3, max size 5, 60 s keep-alive, queue capacity 10)
ExecutorService alarmExecutorService = new ThreadPoolExecutor(3, 5, 60,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                new DefaultThreadFactory("service-pool"), // a named thread factory (e.g. Netty's); Executors.defaultThreadFactory() also works
                new ThreadPoolExecutor.CallerRunsPolicy()); // when the queue is full, run the task in the caller's thread


// 2. Encapsulate the service invocation into a thread task
Future<ItemInfo> itemFuture = alarmExecutorService.submit(new Callable<ItemInfo>() {
    @Override
    public ItemInfo call() throws Exception {
        return itemService.getInfo(itemId);
    }
});
// ... the same goes for the other services


// 3. Obtain the data
ItemInfo itemInfo = itemFuture.get(); // blocks until the result is returned

This is a fairly straightforward implementation. Now let's see how to do the same with Fork/Join, and what the benefits are.

2. ForkJoin approach

The Fork/Join framework, introduced in Java 7, is a framework for executing tasks in parallel: it splits a large task into several smaller tasks and finally combines the results of the small tasks.

Simply put, a complex task is divided into many small tasks that are executed concurrently. Tasks do not each monopolize a thread; instead, the framework uses a mechanism called work stealing, in which idle worker threads take queued tasks from busy ones. For details, see my earlier post "ForkJoin learning and usage notes".
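The split/compute/combine idea can be seen in a minimal RecursiveTask. This sketch uses only the standard java.util.concurrent API; RangeSum and its THRESHOLD are illustrative names, not part of the framework built later in this post. Each call splits its range in half, forks one half so that an idle worker can steal it, computes the other half itself, and then joins.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Classic divide-and-conquer example: sum the numbers in [from, to).
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this size, compute directly
    private final long from;
    private final long to;

    RangeSum(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (long i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                     // queue the left half; an idle worker may steal it
        long rightSum = right.compute(); // work on the right half in the current thread
        return left.join() + rightSum;   // wait for the left half and combine the results
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new RangeSum(0, 1_000_000))); // prints 499999500000
        pool.shutdown();
    }
}
```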

How can ForkJoin support the scenario above? A simple approach is as follows:

// 1. Create a pool
ForkJoinPool pool = new ForkJoinPool(10);


// 2. Create a task and submit it
ForkJoinTask<ItemInfo> future = pool.submit(new RecursiveTask<ItemInfo>() {
    @Override
    protected ItemInfo compute() {
        return itemService.getInfo(itemId);
    }
});

// 3. Get the result (blocks until the task completes)
ItemInfo itemInfo = future.join();

So far there is no real difference between the two approaches, and no task splitting is involved.

3. Advanced

How can ForkJoin's idea of task splitting be applied to this problem?

Let's change the example a little and treat returning the data of the whole detail page as one task. The internal service calls are provided by different applications and can themselves be divided into tasks; assume the following hierarchy.

As can be seen from the above, the service calls can be divided further. For example, the common item information can be split into basic information, SKU information and inventory information, and these three can run concurrently. In other words, by using ForkJoin to split tasks, we can achieve finer-grained concurrency.
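The hierarchy described above can be expressed directly with plain RecursiveAction subclasses. The sketch below is one possible shape under stated assumptions: ServiceCall, TaskGroup and the task keys (basic, sku, stock, ...) are hypothetical, each leaf just writes a placeholder string into a shared ConcurrentHashMap instead of calling a real service, and invokeAll is used so that a group completes only after all of its children, including nested groups.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical leaf task: one service call that writes its result into a shared map.
class ServiceCall extends RecursiveAction {
    private final String key;
    private final Map<String, String> result;

    ServiceCall(String key, Map<String, String> result) {
        this.key = key;
        this.result = result;
    }

    @Override
    protected void compute() {
        // Placeholder for a real remote call; records which worker ran it.
        result.put(key, key + "-data@" + Thread.currentThread().getName());
    }
}

// Hypothetical composite task: runs its children in parallel and waits for all of them.
class TaskGroup extends RecursiveAction {
    private final RecursiveAction[] children;

    TaskGroup(RecursiveAction... children) {
        this.children = children;
    }

    @Override
    protected void compute() {
        invokeAll(children); // forks the children and returns only when every one has completed
    }
}

class DetailPageTree {
    // Builds the tree: the page task contains an item-info group, which is split again.
    static Map<String, String> load(ForkJoinPool pool) {
        Map<String, String> result = new ConcurrentHashMap<>();
        TaskGroup itemInfo = new TaskGroup(
                new ServiceCall("basic", result),
                new ServiceCall("sku", result),
                new ServiceCall("stock", result));
        TaskGroup page = new TaskGroup(
                itemInfo,
                new ServiceCall("sellCount", result),
                new ServiceCall("rate", result),
                new ServiceCall("shop", result),
                new ServiceCall("decorate", result),
                new ServiceCall("recommend", result));
        pool.invoke(page); // blocks until the whole tree has finished
        return result;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(load(pool));
        pool.shutdown();
    }
}
```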

So the goal now is to implement task splitting as described above while changing the existing code as little as possible. The key is that the resulting code should be easy to read and maintain. (This matters a lot: I have come across wrappers that were so heavily encapsulated, with so many layers of indirection, that maintenance costs and the difficulty of troubleshooting soared.)

4. Implementation

A. Design ideas

First, define a basic execution unit that encapsulates the specific business logic, i.e. a Task (the end result is that work is executed one Task at a time).

To support splitting, we need a special task that is a collection of multiple tasks; let's call it BigTask for now.

Then all tasks are wrapped in a BigTask, which is submitted to a ForkJoinPool for execution (synchronously via invoke, or asynchronously via execute).

So the key is how to design BigTask so that, during execution, it is broken down into finer-grained BigTasks or Tasks, and how to finally combine and return the results of all the tasks.

B. Implementation

Basic task interface

/**
 * Created by yihui on 2018/4/8.
 */
public interface IDataLoader<T> {

    /**
     * Executes the specific business logic and writes the result into the context.
     *
     * @param context the shared result holder
     */
    void load(T context);

}

Next is an abstract implementation class that extends ForkJoin's RecursiveAction; it corresponds to the basic Task defined earlier:

import java.util.concurrent.RecursiveAction;

public abstract class AbstractDataLoader<T> extends RecursiveAction implements IDataLoader<T> {

    // Holds the returned result; the business logic writes its data into it in load()
    protected T context;

    public AbstractDataLoader(T context) {
        this.context = context;
    }

    @Override
    public void compute() {
        load(context);
    }


    /**
     * Obtains the execution result, waiting until execution is complete.
     *
     * @return the context holding the results
     */
    public T getContext() {
        this.join();
        return context;
    }

    public void setContext(T context) {
        this.context = context;
    }
}

Then comes the implementation of BigTask, which is also fairly simple: it maintains a list of subtasks internally.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;

public class DefaultForkJoinDataLoader<T> extends AbstractDataLoader<T> {
    /**
     * List of subtasks to be executed
     */
    private final List<AbstractDataLoader<T>> taskList;


    public DefaultForkJoinDataLoader(T context) {
        super(context);
        taskList = new ArrayList<>();
    }


    // Wraps any IDataLoader (including another DefaultForkJoinDataLoader) as a subtask
    public DefaultForkJoinDataLoader<T> addTask(IDataLoader<T> dataLoader) {
        taskList.add(new AbstractDataLoader<T>(this.context) {
            @Override
            public void load(T context) {
                dataLoader.load(context);
            }
        });
        return this;
    }


    // The task is split here: invokeAll forks every subtask and waits for all of them,
    // so a nested loader only completes after its own subtasks have completed
    @Override
    public void load(T context) {
        invokeAll(this.taskList);
    }


    /**
     * Gets the execution result, waiting for all subtasks to finish.
     *
     * @return the context holding the results
     */
    @Override
    public T getContext() {
        this.taskList.forEach(ForkJoinTask::join);
        return this.context;
    }
}

Next comes a simple pool. ForkJoinPool needs to be extended because we want to support retrieving results both synchronously and asynchronously:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ExtendForkJoinPool extends ForkJoinPool {

    public ExtendForkJoinPool() {
    }

    public ExtendForkJoinPool(int parallelism) {
        super(parallelism);
    }

    public ExtendForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) {
        super(parallelism, factory, handler, asyncMode);
    }


    // Synchronous blocking call: for our loaders, join every subtask to ensure all of them have completed
    @Override
    @SuppressWarnings("unchecked")
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task instanceof AbstractDataLoader) {
            super.invoke(task);
            return (T) ((AbstractDataLoader<?>) task).getContext();
        } else {
            return super.invoke(task);
        }
    }
}

Then create a factory class for the pool; nothing special here:

public class ForkJoinPoolFactory {

    private int parallelism;

    private ExtendForkJoinPool forkJoinPool;

    // Default parallelism: 16 threads per available processor
    public ForkJoinPoolFactory() {
        this(Runtime.getRuntime().availableProcessors() * 16);
    }

    public ForkJoinPoolFactory(int parallelism) {
        this.parallelism = parallelism;
        forkJoinPool = new ExtendForkJoinPool(parallelism);
    }

    public ExtendForkJoinPool getObject() {
        return this.forkJoinPool;
    }

    public int getParallelism() {
        return parallelism;
    }

    // Only updates the stored value; the pool created in the constructor keeps its original parallelism
    public void setParallelism(int parallelism) {
        this.parallelism = parallelism;
    }


    public void destroy() throws Exception {
        this.forkJoinPool.shutdown();
    }
}

At this point the whole thing is basically done. Each class is very simple, only a few lines long. Next, let's see how to use it.

III. Test validation

Let's use a simple case to show how it works:

// Members of the JUnit test class (ForJoinTest in the output below)

// Result holder; Lombok's @Data generates getters, setters and toString()
@Data
static class Context {
    public int addAns;

    public int mulAns;

    public String concatAns;

    public Map<String, Object> ans = new ConcurrentHashMap<>();
}


@Test
public void testForkJoinFramework() {
    ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();

    Context context = new Context();
    DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.addAns = 100;
            System.out.println("add thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.mulAns = 50;
            System.out.println("mul thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.concatAns = "hell world";
            System.out.println("concat thread: " + Thread.currentThread());
        }
    });

    // A nested BigTask whose two subtasks also run in parallel
    DefaultForkJoinDataLoader<Context> subTask = new DefaultForkJoinDataLoader<>(context);
    subTask.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            System.out.println("sub thread1: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());
        }
    });
    subTask.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            System.out.println("sub thread2: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
            context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());
        }
    });
    loader.addTask(subTask);

    long start = System.currentTimeMillis();
    System.out.println("------- start: " + start);

    // Submit the task and block synchronously until it completes
    forkJoinPool.invoke(loader);


    System.out.println("------- end: " + (System.currentTimeMillis() - start));

    // The result is printed after about 3 s, with all fields set
    System.out.println("the ans: " + context);
}

Usage is fairly simple, in four steps:

  • Create the pool
  • Create the container object (Context) that holds the results
  • Create the tasks
    • Create a root task: new DefaultForkJoinDataLoader<>(context);
    • Add subtasks
  • Submit

With this implementation, a task can easily be split again. The output is as follows:

------- start: 1523200221827
add thread: Thread[ForkJoinPool-1-worker-50,5,main]
concat thread: Thread[ForkJoinPool-1-worker-36,5,main]
sub thread2: Thread[ForkJoinPool-1-worker-29,5,main] | now: 1523200222000
sub thread1: Thread[ForkJoinPool-1-worker-36,5,main] | now: 1523200222000
mul thread: Thread[ForkJoinPool-1-worker-43,5,main]
------- end: 3176
the ans: ForJoinTest.Context(addAns=100, mulAns=50, concatAns=hell world, ans={ForkJoinPool-1-worker-36=1523200222204, ForkJoinPool-1-worker-29=1523200222000})
  • The first lines are printed by the individual subtasks; they run on different worker threads, i.e. concurrently
  • The final result is printed after about 3 s, which shows that invoke() blocks until all tasks have completed
  • subTask splits its work further: its two subtasks start at the same timestamp, and the sleep in one does not delay the other (subtasks also run in parallel)

Asynchronous execution is just as simple: submit the task with execute() instead, and when the data is needed, obtain the result through the loader:

@Test
public void testForkJoinFramework2() {
    ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();

    Context context = new Context();
    DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.addAns = 100;
            System.out.println("add thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.mulAns = 50;
            System.out.println("mul thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.concatAns = "hell world";
            System.out.println("concat thread: " + Thread.currentThread());
        }
    });

    long start = System.currentTimeMillis();
    System.out.println("------- start: " + start);

    // Execute asynchronously: execute() returns immediately without waiting for the result
    forkJoinPool.execute(loader);

    // ... other work can be done here; nothing blocks at this point, so addAns is not yet set
    System.out.println("context is: " + context);
    System.out.println("------- then: " + (System.currentTimeMillis() - start));


    loader.getContext(); // explicitly wait for all tasks to complete before continuing
    System.out.println("context is: " + context);
    System.out.println("------- end: " + (System.currentTimeMillis() - start));
}

IV. Other

Source code

The source code is on Git, mainly in the Quick-Alarm project:

  • QuickAlarm
  • Concurrent code

Personal blog: A gray Blog

My personal blog records everything I study and work on; you are welcome to visit.

Statement

Believing everything in books is worse than having no books at all. The content above is purely my own opinion; my ability is ordinary and my knowledge limited, so if you find a bug or have a better suggestion, criticism is welcome at any time.

  • Weibo: Small Gray Blog
  • QQ: a gray /3302797840
