Building an easy-to-use concurrency component with ForkJoin
Day-to-day business development requires some knowledge of concurrent programming, yet in practice there are not that many scenarios that call for running tasks asynchronously on a thread pool. When concurrency really is needed, the common approach is to implement the Runnable/Callable interface and hand it to a Thread for execution or, a step up, to define a thread pool and submit the task to it. This post takes a different angle: how to design an easy-to-use concurrency framework with the help of the ForkJoin support provided by the JDK.
I. Background
A typical concurrency case in real projects is rendering a product detail page. Besides the basic product data there are sales volume, address, ratings, recommendations, shop information, decoration information and so on. The following pseudo-code describes how the complete detail data is assembled:
ItemInfo itemInfo = itemService.getInfo(itemId);
int sellCount = sellService.getSellCount(itemId);
RateInfo rateInfo = rateService.getRateInfo(itemId);
ShopInfo shopInfo = shopService.getShopInfo(shopId);
// decoration information
DecorateInfo decorateInfo = decorateService.getDecorateInfo(itemId);
// recommendations
RecommandInfo recommandInfo = recommandService.getRecommand(itemId);
In a normal flow the six calls above execute sequentially. Assuming each service has a response time (RT) of 10ms, these six calls alone take more than 60ms.
From a business perspective, however, the six calls are unrelated: none of them depends on the result of another, so they can run concurrently. In that case the total time is that of the slowest of the six services, perhaps a little over 10ms.
Comparing the two, the advantage of concurrency in this scenario is obvious. The next question is how to make the code above concurrent in the simplest way possible.
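The arithmetic above (roughly the sum of the six RTs when sequential, roughly the slowest RT when concurrent) can be tried out with a small sketch. Everything here is an assumption made for illustration: fakeCall is a hypothetical stand-in for a remote service that simply sleeps, and a plain fixed-size pool is used instead of any framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LatencyDemo {
    // Hypothetical stand-in for a remote service call with a fixed response time.
    static int fakeCall(int id, long rtMillis) {
        try {
            Thread.sleep(rtMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return id;
    }

    // Runs the six calls one after another and returns the sum of their results.
    static int sequential(long rtMillis) {
        int sum = 0;
        for (int i = 1; i <= 6; i++) {
            sum += fakeCall(i, rtMillis);
        }
        return sum;
    }

    // Runs the six calls on six threads and returns the sum of their results.
    static int concurrent(long rtMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(6);
        try {
            List<Callable<Integer>> calls = new ArrayList<>();
            for (int i = 1; i <= 6; i++) {
                final int id = i;
                calls.add(() -> fakeCall(id, rtMillis));
            }
            int sum = 0;
            // invokeAll returns once every call has completed
            for (Future<Integer> f : pool.invokeAll(calls)) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long t0 = System.nanoTime();
        sequential(10);
        long t1 = System.nanoTime();
        concurrent(10);
        long t2 = System.nanoTime();
        System.out.println("sequential ms: " + (t1 - t0) / 1_000_000);
        System.out.println("concurrent ms: " + (t2 - t1) / 1_000_000);
    }
}
```

The concurrent timing also includes creating the pool, so the measured numbers are only indicative.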
II. Design and implementation
For example, how would we implement this with a thread pool?
1. Thread pool approach
The thread pool approach is not the focus here, so I will only show briefly how it can be implemented and what the result looks like
// 1. Create a thread pool
ExecutorService alarmExecutorService = new ThreadPoolExecutor(3, 5, 60,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(10),
        new DefaultThreadFactory("service-pool"),
        new ThreadPoolExecutor.CallerRunsPolicy());

// 2. Encapsulate the service invocation into a threaded task
Future<ItemInfo> itemFuture = alarmExecutorService.submit(new Callable<ItemInfo>() {
    @Override
    public ItemInfo call() throws Exception {
        return itemService.getInfo(itemId);
    }
});
// ... the same goes for the other services

// 3. Obtain the data
ItemInfo itemInfo = itemFuture.get(); // blocks until the result is returned
This is a fairly straightforward implementation, so let's look at how Fork/Join can be used instead and what the benefits are
2. ForkJoin approach
The Fork/Join framework was introduced in Java 7 for executing tasks in parallel. It splits a large task into several smaller tasks and finally combines the results of the small tasks
Put simply, a complex task is divided into many small tasks that are executed concurrently. A task does not necessarily monopolize a thread; instead the pool uses a technique called work stealing. For details, see
ForkJoin study and usage notes
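To make the split-and-combine idea concrete, here is a minimal sketch that sums an array with a RecursiveTask. It is not taken from the linked notes; the class name RangeSum and the THRESHOLD value are assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {
    // Assumed cut-off: ranges at or below this size are summed directly.
    private static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int from;
    private final int to;

    RangeSum(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(data, from, mid);
        RangeSum right = new RangeSum(data, mid, to);
        left.fork();                      // queue the left half; an idle worker may steal it
        long rightSum = right.compute();  // handle the right half on the current thread
        return left.join() + rightSum;    // combine the partial results
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data));
    }
}
```

Forking one half and computing the other in place keeps the current worker busy instead of leaving it waiting on both halves.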
How can ForkJoin support the above scenario? A simple solution is as follows
// 1. Create a pool
ForkJoinPool pool = new ForkJoinPool(10);

// 2. Create a task and submit it
ForkJoinTask<ItemInfo> future = pool.submit(new RecursiveTask<ItemInfo>() {
    @Override
    public ItemInfo compute() {
        return itemService.getItemInfo(itemId);
    }
});

// 3. Get the result
ItemInfo itemInfo = future.join();
So far there is no real difference between the two approaches, and no task splitting is involved
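The similarity is no accident: ForkJoinPool is itself an ExecutorService, so the same Callable can be submitted either way. The sketch below illustrates this with a placeholder Callable; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;

class SubmitDemo {
    // ForkJoinPool.submit(Callable) returns a ForkJoinTask, whose join()
    // waits for the result without declaring checked exceptions.
    static String viaForkJoinTask(ForkJoinPool pool) {
        Callable<String> call = () -> "item";
        ForkJoinTask<String> task = pool.submit(call);
        return task.join();
    }

    // Seen through the ExecutorService interface, the same pool hands back
    // a plain Future, exactly like the thread pool version.
    static String viaFuture(ExecutorService executor) {
        Callable<String> call = () -> "item";
        Future<String> future = executor.submit(call);
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(viaForkJoinTask(pool));
            System.out.println(viaFuture(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```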
3. Advanced
How can ForkJoin take advantage of the idea of task splitting to solve this problem?
Let's change the example above slightly and treat producing the data for the whole detail page as one task. The internal service calls are provided by different applications and become subtasks, so assume a hierarchy in which the detail-page task fans out into the individual service calls
Seen this way, the service calls can be split further. Take the product information: it can be divided into basic information, SKU information and inventory information, and these three can be loaded concurrently. In other words, by using ForkJoin's task splitting we can achieve finer-grained concurrency
So the goal now is to support this kind of task splitting without changing the existing code very much. The key is that the result must be easy to read and to maintain. This is very important: I once worked with a very heavily wrapped framework, and the result was high maintenance costs and much harder troubleshooting
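The two-level hierarchy described above can be sketched with plain RecursiveAction objects before introducing any wrapper classes. All names below (DetailPageDemo, leaf, itemInfo, the map keys) are hypothetical, and each leaf just writes a placeholder string where a real service call would go.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DetailPageDemo {
    // A leaf task: stands in for one service call and stores its result.
    static RecursiveAction leaf(Map<String, String> page, String key) {
        return new RecursiveAction() {
            @Override
            protected void compute() {
                page.put(key, key + "-data");
            }
        };
    }

    // Product information is itself split into three concurrent leaves.
    static RecursiveAction itemInfo(Map<String, String> page) {
        return new RecursiveAction() {
            @Override
            protected void compute() {
                invokeAll(leaf(page, "basic"), leaf(page, "sku"), leaf(page, "inventory"));
            }
        };
    }

    // The detail page is the root task; invokeAll returns once all children are done.
    static Map<String, String> load() {
        Map<String, String> page = new ConcurrentHashMap<>();
        ForkJoinPool.commonPool().invoke(new RecursiveAction() {
            @Override
            protected void compute() {
                invokeAll(itemInfo(page), leaf(page, "sellCount"), leaf(page, "shop"));
            }
        });
        return page;
    }

    public static void main(String[] args) {
        System.out.println(load().keySet());
    }
}
```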
4. Implementation
A. Design ideas
First, define a basic execution unit that encapsulates the concrete business logic. We call it a task (in the end, work is executed one task at a time).
To support task splitting we need a special task that is a collection of multiple tasks; for now call it BigTask.
Then all tasks are wrapped in a BigTask and handed to the ForkJoinPool for execution (synchronously via invoke, or asynchronously via execute).
So the key is how to design BigTask so that during execution it is broken down into finer-grained BigTasks or tasks, and how to combine the results of all tasks and return them at the end
B. Implementation
Basic Task Interface
/**
 * Created by yihui on 2018/4/8.
 */
public interface IDataLoader<T> {
    /**
     * The specific business logic is executed in this method, which writes the returned result into the context.
     *
     * @param context the container that holds the results
     */
    void load(T context);
}
An abstract implementation class that extends ForkJoin's RecursiveAction corresponds to the basic task we defined earlier
import java.util.concurrent.RecursiveAction;

public abstract class AbstractDataLoader<T> extends RecursiveAction implements IDataLoader<T> {
    // Stores the returned result; the business code writes its data into it inside the load() implementation
    protected T context;

    public AbstractDataLoader(T context) {
        this.context = context;
    }

    @Override
    public void compute() {
        load(context);
    }

    /**
     * Obtain the execution result, waiting until the execution is complete.
     *
     * @return the context holding the results
     */
    public T getContext() {
        this.join();
        return context;
    }

    public void setContext(T context) {
        this.context = context;
    }
}
Next is the implementation of BigTask, which is also relatively simple and maintains a List of tasks internally
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;

public class DefaultForkJoinDataLoader<T> extends AbstractDataLoader<T> {
    /**
     * List of tasks to be executed
     */
    private List<AbstractDataLoader<T>> taskList;

    public DefaultForkJoinDataLoader(T context) {
        super(context);
        taskList = new ArrayList<>();
    }

    public DefaultForkJoinDataLoader<T> addTask(IDataLoader<T> dataLoader) {
        // Wrap the loader in a task that shares this loader's context
        taskList.add(new AbstractDataLoader<T>(this.context) {
            @Override
            public void load(T context) {
                dataLoader.load(context);
            }
        });
        return this;
    }

    // Note here that the task is split up with the help of fork
    @Override
    public void load(T context) {
        this.taskList.forEach(ForkJoinTask::fork);
    }

    /**
     * Get the execution result, waiting for every task in the list.
     *
     * @return the context holding the results
     */
    @Override
    public T getContext() {
        this.taskList.forEach(ForkJoinTask::join);
        return this.context;
    }
}
Next comes a simple pool design. ForkJoinPool needs to be extended because we want to support obtaining results both synchronously and asynchronously
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ExtendForkJoinPool extends ForkJoinPool {
    public ExtendForkJoinPool() {
    }

    public ExtendForkJoinPool(int parallelism) {
        super(parallelism);
    }

    public ExtendForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory,
            Thread.UncaughtExceptionHandler handler, boolean asyncMode) {
        super(parallelism, factory, handler, asyncMode);
    }

    // To block calls synchronously, join each task to ensure completion
    @Override
    @SuppressWarnings("unchecked")
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task instanceof AbstractDataLoader) {
            super.invoke(task);
            return (T) ((AbstractDataLoader) task).getContext();
        } else {
            return super.invoke(task);
        }
    }
}
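For reference, the two submission styles that the extended pool builds on already exist on the stock ForkJoinPool: invoke blocks until the task completes, while execute returns immediately and the caller joins later. A minimal sketch, with a hypothetical slowTask that sleeps for an assumed 50ms:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class InvokeVsExecute {
    // Hypothetical task that takes a while and then returns the given value.
    static RecursiveTask<Integer> slowTask(int value) {
        return new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return value;
            }
        };
    }

    // Synchronous: invoke() blocks the caller until the task is done.
    static int blocking(ForkJoinPool pool) {
        return pool.invoke(slowTask(1));
    }

    // Asynchronous: execute() only schedules the task; join() waits when the result is needed.
    static int async(ForkJoinPool pool) {
        RecursiveTask<Integer> task = slowTask(2);
        pool.execute(task);
        // ... the caller is free to do other work here ...
        return task.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(blocking(pool));
            System.out.println(async(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```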
Then create the pool factory class, nothing special
public class ForkJoinPoolFactory {
    private int parallelism;
    private ExtendForkJoinPool forkJoinPool;

    public ForkJoinPoolFactory() {
        this(Runtime.getRuntime().availableProcessors() * 16);
    }

    public ForkJoinPoolFactory(int parallelism) {
        this.parallelism = parallelism;
        forkJoinPool = new ExtendForkJoinPool(parallelism);
    }

    public ExtendForkJoinPool getObject() {
        return this.forkJoinPool;
    }

    public int getParallelism() {
        return parallelism;
    }

    public void setParallelism(int parallelism) {
        this.parallelism = parallelism;
    }

    public void destroy() throws Exception {
        this.forkJoinPool.shutdown();
    }
}
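The destroy() method matters because a pool created with new ForkJoinPool(...) is not the shared common pool and should be shut down by its owner. The sketch below shows that lifecycle on a stock ForkJoinPool; the class name, the no-op task and the 5-second wait are assumptions made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

class PoolLifecycle {
    // Creates a pool, runs one trivial task, shuts the pool down and
    // reports whether it terminated within the assumed timeout.
    static boolean runAndClose(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new RecursiveAction() {
                @Override
                protected void compute() {
                    // no-op placeholder for real work
                }
            });
        } finally {
            pool.shutdown(); // stop accepting new tasks; already submitted ones still run
        }
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + runAndClose(4));
    }
}
```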
At this point the whole thing is basically done. Each class is very simple, just a few pieces in total. Next, let's see how to use it
III. Test validation
So let’s do a simple case to show you how it works
@Data
static class Context {
    public int addAns;
    public int mulAns;
    public String concatAns;
    public Map<String, Object> ans = new ConcurrentHashMap<>();
}

@Test
public void testForkJoinFramework() {
    ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();
    Context context = new Context();
    DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.addAns = 100;
            System.out.println("add thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.mulAns = 50;
            System.out.println("mul thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.concatAns = "hell world";
            System.out.println("concat thread: " + Thread.currentThread());
        }
    });

    DefaultForkJoinDataLoader<Context> subTask = new DefaultForkJoinDataLoader<>(context);
    subTask.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            System.out.println("sub thread1: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());
        }
    });
    subTask.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            System.out.println("sub thread2: " + Thread.currentThread() + " | now: " + System.currentTimeMillis());
            context.ans.put(Thread.currentThread().getName(), System.currentTimeMillis());
        }
    });
    loader.addTask(subTask);

    long start = System.currentTimeMillis();
    System.out.println("------- start: " + start);
    // Submit the task; invoke blocks synchronously
    forkJoinPool.invoke(loader);
    System.out.println("------- end: " + (System.currentTimeMillis() - start));
    // The result is printed after 3s, with all the results set
    System.out.println("the ans: " + context);
}
Usage is relatively simple, just four steps:
- Create the pool
- Specify the container class (ContextHolder) that holds the results
- Create the tasks
  - Create a root task: new DefaultForkJoinDataLoader<>(context);
  - Add subtasks
- Submit
With the implementation above it is very easy to split a task further. Here is the output of the test above
------- start: 1523200221827
add thread: Thread[ForkJoinPool-1-worker-50,5,main]
concat thread: Thread[ForkJoinPool-1-worker-36,5,main]
sub thread2: Thread[ForkJoinPool-1-worker-29,5,main] | now: 1523200222000
sub thread1: Thread[ForkJoinPool-1-worker-36,5,main] | now: 1523200222000
mul thread: Thread[ForkJoinPool-1-worker-43,5,main]
------- end: 3176
the ans: ForJoinTest.Context(addAns=100, mulAns=50, concatAns=hell world, ans={ForkJoinPool-1-worker-36=1523200222204, ForkJoinPool-1-worker-29=1523200222000})
- First comes the thread output of each subtask; you can see that different threads execute the tasks (concurrently)
- The result is printed after 3s, that is, invoke blocks until all tasks are completed
- subTask splits its work further: the two subtasks start at the same time, and the sleep in one does not affect the other (subtasks are also executed in parallel)
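One caveat about the nested case, as far as I can tell from the listings: a DefaultForkJoinDataLoader added through addTask has its load() called, which only forks its children, and the outer getContext() joins the wrapper rather than those children. In the run above the 3s sleep of the mul task masks this. The sketch below shows one possible variant, not the original implementation, in which load() uses ForkJoinTask.invokeAll so that a nested loader finishes only when its children have finished. The names JoiningLoaderDemo, Loader and JoiningLoader are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class JoiningLoaderDemo {
    // Hypothetical counterpart of IDataLoader, kept local so the sketch is self-contained.
    interface Loader<T> {
        void load(T context);
    }

    // Hypothetical variant of the BigTask: load() forks AND joins its children.
    static class JoiningLoader<T> extends RecursiveAction implements Loader<T> {
        private final T context;
        private final List<RecursiveAction> tasks = new ArrayList<>();

        JoiningLoader(T context) {
            this.context = context;
        }

        JoiningLoader<T> addTask(Loader<T> loader) {
            tasks.add(new RecursiveAction() {
                @Override
                protected void compute() {
                    loader.load(context);
                }
            });
            return this;
        }

        @Override
        public void load(T ignored) {
            // invokeAll forks the children and returns only after all of them are done
            ForkJoinTask.invokeAll(tasks);
        }

        @Override
        protected void compute() {
            load(context);
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs a root loader with one nested loader and returns the shared context.
    static Map<String, Integer> run() {
        Map<String, Integer> ctx = new ConcurrentHashMap<>();
        JoiningLoader<Map<String, Integer>> sub = new JoiningLoader<>(ctx);
        sub.addTask(c -> { sleep(100); c.put("slowSub", 1); });
        sub.addTask(c -> c.put("fastSub", 2));

        JoiningLoader<Map<String, Integer>> root = new JoiningLoader<>(ctx);
        root.addTask(c -> c.put("top", 3));
        root.addTask(sub); // nested loader: its children are joined inside its load()

        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            pool.invoke(root);
        } finally {
            pool.shutdown();
        }
        return ctx;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the loader interface has a single method, the tasks here are written as lambdas; the anonymous classes in the test above would work equally well.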
Asynchronous execution is also simple: change the submission slightly so that the task is submitted with execute, and when the data is needed, obtain the result through the loader
@Test
public void testForkJoinFramework2() {
    ForkJoinPool forkJoinPool = new ForkJoinPoolFactory().getObject();
    Context context = new Context();
    DefaultForkJoinDataLoader<Context> loader = new DefaultForkJoinDataLoader<>(context);
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            context.addAns = 100;
            System.out.println("add thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.mulAns = 50;
            System.out.println("mul thread: " + Thread.currentThread());
        }
    });
    loader.addTask(new IDataLoader<Context>() {
        @Override
        public void load(Context context) {
            context.concatAns = "hell world";
            System.out.println("concat thread: " + Thread.currentThread());
        }
    });

    long start = System.currentTimeMillis();
    System.out.println("------- start: " + start);
    // If you do not need the result right away, execute the task asynchronously
    forkJoinPool.execute(loader);
    // ... other work can be done here; this point is not blocked, and addAns is not set yet
    System.out.println("context is: " + context);
    System.out.println("------- then: " + (System.currentTimeMillis() - start));
    loader.getContext(); // Call this explicitly to wait for all tasks to complete before continuing
    System.out.println("context is: " + context);
    System.out.println("------- end: " + (System.currentTimeMillis() - start));
}
IV. Other
Source code
The source code is available on Git, mainly in the Quick-Alarm project
- QuickAlarm
- Concurrent code
Personal blog: A gray Blog
My personal blog records everything from my study and work. You are welcome to drop by
Statement
Believing everything in books is worse than having no books at all. The content above is purely my own opinion. My ability and knowledge are limited, so if you find a bug or have a better suggestion, criticism and corrections are welcome at any time
- Micro Blog address: Small Gray Blog
- QQ: a gray /3302797840