Multithreaded Fork/Join
Java 7 introduced a new way to handle multithreaded work: Fork/Join. A ForkJoinPool splits a task into several small subtasks, computes them in parallel, and then combines the subtask results into the final result. In a word: divide and conquer.
First, the ForkJoinPool
To use Fork/Join, create a ForkJoinPool.
In Java 7 the pool is created with its constructor: ForkJoinPool pool = new ForkJoinPool(6); Here 6 is the parallelism level, usually set to the number of CPU cores. My machine has 6 cores, so I use 6.
If set to 2, it looks like this:
Since Java 8 you can also obtain a shared pool with ForkJoinPool pool = ForkJoinPool.commonPool();
In the source code, the common pool is created in java.util.concurrent.ForkJoinPool#makeCommonPool, which essentially does:
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
I did not configure the parallelism, so it is derived from the machine (by default, one less than the number of available processors), and here it is 11.
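The two ways of obtaining a pool described above can be sketched as follows. This is only an illustration: the class name PoolParallelismDemo and the helper newPoolSizedToCpu are hypothetical, and sizing the pool from availableProcessors() is an assumption (the article hard-codes 6).

```java
import java.util.concurrent.ForkJoinPool;

class PoolParallelismDemo {
    // Creates a dedicated pool whose parallelism equals the number of
    // logical processors the JVM reports (assumption for this sketch).
    static ForkJoinPool newPoolSizedToCpu() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ForkJoinPool(cores);
    }

    public static void main(String[] args) {
        ForkJoinPool own = newPoolSizedToCpu();
        System.out.println("own pool parallelism: " + own.getParallelism());

        // The common pool is shared JVM-wide; the caller does not construct it.
        ForkJoinPool common = ForkJoinPool.commonPool();
        System.out.println("common pool parallelism: " + common.getParallelism());

        // A pool you create yourself should be shut down when no longer needed.
        own.shutdown();
    }
}
```

The printed values depend on the machine, so no expected output is given here.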
Next, the Task
The ForkJoinPool can now process tasks. A task is a ForkJoinTask, but we normally do not use that class directly; we extend one of its subclasses instead. They differ as follows:
RecursiveAction: the task does not return a result
RecursiveTask<V>: the task returns a result of type V
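To make the difference concrete, here is a minimal sketch of both subclasses. The class names SumTask and DoubleAction and the cut-off of 2 elements are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// RecursiveTask<V>: compute() returns a value (here, the sum of a range).
class SumTask extends RecursiveTask<Long> {
    private final long[] data;
    private final int from, to; // half-open range [from, to)

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= 2) { // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        invokeAll(left, right);
        return left.join() + right.join();
    }
}

// RecursiveAction: compute() returns nothing and works by side effect
// (here, doubling each element in place).
class DoubleAction extends RecursiveAction {
    private final long[] data;
    private final int from, to; // half-open range [from, to)

    DoubleAction(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= 2) {
            for (int i = from; i < to; i++) data[i] *= 2;
            return;
        }
        int mid = (from + to) >>> 1;
        invokeAll(new DoubleAction(data, from, mid),
                  new DoubleAction(data, mid, to));
    }
}
```

Both can be run with pool.invoke(...); only the SumTask call yields a meaningful return value.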
Example
Goal: append ___changed to the name of each User in the list.
Test entity class
import lombok.Data;

// Lombok's @Data generates getName() and setName(), which the task uses.
@Data
public class User {
    private String name;

    public User(String name) {
        this.name = name;
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

// Assumption: JSONObject is fastjson's com.alibaba.fastjson.JSONObject.
import com.alibaba.fastjson.JSONObject;

public class ForkJoinDemo {

    private List<User> addString() {
        List<User> list = new ArrayList<>();
        list.add(new User("xiao"));
        list.add(new User("hua"));
        list.add(new User("zhao"));
        list.add(new User("li"));
        list.add(new User("liu"));
        list.add(new User("sun"));
        list.add(new User("zhou"));
        list.add(new User("wu"));
        list.add(new User("zheng"));
        list.add(new User("wang"));

        // Stream version, kept for comparison:
        //return list.stream().peek(e -> e.setName(e.getName() + "__changed")).collect(Collectors.toList());

        // ForkJoinPool version
        ForkJoinPool pool = new ForkJoinPool(6);
        TaskDemo task = new TaskDemo(list);
        //pool.execute(task);
        return pool.invoke(task);
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ForkJoinDemo demo = new ForkJoinDemo();
        List<User> list = demo.addString();
        System.out.println(JSONObject.toJSONString(list));
        long time = System.currentTimeMillis() - start;
        System.out.println("ForkJoinDemo_main_time: " + time + " ms");
    }
}
Here a ForkJoinPool is created and the task is handed to it. There are two methods for submitting a task to a ForkJoinPool: execute and invoke.
execute: no return value
invoke: returns the task's result
ForkJoinPool pool = new ForkJoinPool(6);
TaskDemo task = new TaskDemo(list);
//pool.execute(task);
return pool.invoke(task);
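As a rough sketch of how the two submission styles differ in practice: invoke hands back the result directly, while with execute the result has to be collected from the task afterwards. The class SubmitStylesDemo and the trivial LengthTask are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SubmitStylesDemo {
    // A trivial task used only for illustration.
    static class LengthTask extends RecursiveTask<Integer> {
        private final String text;

        LengthTask(String text) {
            this.text = text;
        }

        @Override
        protected Integer compute() {
            return text.length();
        }
    }

    static int viaInvoke(ForkJoinPool pool, String s) {
        // invoke blocks until the task finishes and returns its result.
        return pool.invoke(new LengthTask(s));
    }

    static int viaExecute(ForkJoinPool pool, String s) {
        // execute returns void right away; the result is fetched later
        // from the task itself with join().
        LengthTask task = new LengthTask(s);
        pool.execute(task);
        return task.join();
    }
}
```

In the article's demo, invoke is the natural choice because addString needs the processed list as its return value.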
Here’s the processing logic:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class TaskDemo extends RecursiveTask<List<User>> {
    // Lists of this size or smaller are processed directly instead of being split.
    private static final int MIN_LENGTH = 1;
    private final List<User> list;

    TaskDemo(List<User> list) {
        this.list = list;
    }

    @Override
    protected List<User> compute() {
        List<User> listResult = new ArrayList<>();
        if (list.size() <= MIN_LENGTH) {
            return chengeName(list);
        } else {
            // Split off the first MIN_LENGTH elements and recurse on the rest.
            TaskDemo task1 = new TaskDemo(list.subList(0, MIN_LENGTH));
            TaskDemo task2 = new TaskDemo(list.subList(MIN_LENGTH, list.size()));
            invokeAll(task1, task2);
            listResult.addAll(task1.join());
            listResult.addAll(task2.join());
        }
        return listResult;
    }

    private List<User> chengeName(List<User> list) {
        for (User user : list) {
            user.setName(user.getName() + "___changed");
            System.out.println(Thread.currentThread().getName() + " " + user.getName());
        }
        return list;
    }
}
The task receives the data to process through its constructor and keeps it in a member variable.
The actual modification is written in chengeName; the core that makes divide and conquer possible is the compute method.
Inside compute, the task checks whether the list is small enough (at most MIN_LENGTH elements). If it is, the list is processed directly; otherwise it is split into two subtasks.
Two methods are used to run the subtasks:
invokeAll: starts the given tasks and waits until they have completed
join: returns the computed result once the task is complete
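The task in this article peels one element off the front of the list at each step. A common alternative is to split at the midpoint so that the task tree stays balanced. The following is a sketch of that variant, not the article's code: RenameTask and THRESHOLD are hypothetical names, and plain strings stand in for the User class to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

class RenameTask extends RecursiveTask<List<String>> {
    private static final int THRESHOLD = 2; // hypothetical; tune per workload
    private final List<String> names;

    RenameTask(List<String> names) {
        this.names = names;
    }

    @Override
    protected List<String> compute() {
        if (names.size() <= THRESHOLD) {
            // Small enough: do the work directly.
            List<String> out = new ArrayList<>(names.size());
            for (String n : names) out.add(n + "___changed");
            return out;
        }
        // Split at the midpoint so both halves carry similar amounts of work.
        int mid = names.size() / 2;
        RenameTask left = new RenameTask(names.subList(0, mid));
        RenameTask right = new RenameTask(names.subList(mid, names.size()));
        invokeAll(left, right);

        List<String> result = new ArrayList<>(names.size());
        result.addAll(left.join());  // left first, so input order is kept
        result.addAll(right.join());
        return result;
    }
}
```

With a midpoint split the recursion depth grows logarithmically with the list size, whereas peeling off one element at a time makes it grow linearly.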
Look at the invokeAll method
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    int s1, s2;
    t2.fork();
    if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
        t1.reportException(s1);
    if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
        t2.reportException(s2);
}
It relies on three methods: fork, doInvoke, and doJoin.
fork: if the current thread is a ForkJoinWorkerThread, the task is pushed onto that thread's workQueue in the ForkJoinPool and waits to be executed; otherwise ForkJoinPool.common.externalPush(this) is called.
doInvoke: first tries to execute the task in the current thread. If that does not complete it, the subsequent steps apply: when the current thread is a ForkJoinWorkerThread, it does not try to pop the task off the queue and execute it, it simply waits; when the current thread is not a ForkJoinWorkerThread, the externalAwaitDone method is called.
doJoin: if the task is already completed, its status is returned. If it is not completed and the current thread is a ForkJoinWorkerThread, the thread tries to pop the current task off its workQueue and execute it, returning the status if that completes the task; otherwise it waits using the awaitJoin method of the ForkJoinPool the thread belongs to. If the current thread is not a ForkJoinWorkerThread, the externalAwaitDone method mentioned above is called.
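The same order of operations (fork the second task, run the first in the current thread, then join the second) can be written with the public API only. The sketch below assumes a hypothetical FibTask; it mirrors the sequence inside invokeAll but does not reproduce the JDK's internal status handling.

```java
import java.util.concurrent.RecursiveTask;

class FibTask extends RecursiveTask<Long> {
    private final int n;

    FibTask(int n) {
        this.n = n;
    }

    @Override
    protected Long compute() {
        if (n <= 1) return (long) n;
        FibTask f1 = new FibTask(n - 1);
        FibTask f2 = new FibTask(n - 2);
        f2.fork();              // queue f2 so this or another worker can run it
        long r1 = f1.invoke();  // run f1 in the current thread
        long r2 = f2.join();    // run f2 here if it is still queued, else wait
        return r1 + r2;
    }
}
```

A naive Fibonacci is used only because it produces many small subtasks; it is not an efficient way to compute the sequence.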
Conclusion
ForkJoin applies multithreading in a new way: it divides a large task into parts and conquers them in parallel. It is based on the work-stealing algorithm and uses the CPU efficiently. The ultimate goal is to keep every core busy, so it is well suited to compute-intensive tasks.
I once had a requirement with a lot of rule checking and computation. It took more than 10 seconds on a single thread and 100 milliseconds with ForkJoin, and after that I added a cache.
Take a look at the results:
Using ForkJoin
Using a lambda expression
The improvement here is not obvious. The more computationally intensive the task, the more visible the gain, and renaming users is simply too trivial a task.
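For a workload where the per-element cost is high enough to matter, something like counting primes in a range is a more realistic candidate. The sketch below is an assumption-laden illustration: PrimeCountTask and its THRESHOLD of 1,000 are hypothetical, and no timings are claimed because they depend entirely on the machine.

```java
import java.util.concurrent.RecursiveTask;

class PrimeCountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1_000; // hypothetical cut-off
    private final int from, to; // half-open range [from, to)

    PrimeCountTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    // Trial division: deliberately CPU-bound.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Single-threaded baseline, also used for ranges below the threshold.
    static int countSequential(int from, int to) {
        int count = 0;
        for (int i = from; i < to; i++) {
            if (isPrime(i)) count++;
        }
        return count;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            return countSequential(from, to);
        }
        int mid = from + (to - from) / 2;
        PrimeCountTask left = new PrimeCountTask(from, mid);
        PrimeCountTask right = new PrimeCountTask(mid, to);
        invokeAll(left, right);
        return left.join() + right.join();
    }
}
```

Comparing pool.invoke(new PrimeCountTask(0, n)) against PrimeCountTask.countSequential(0, n) for a large n is one way to see whether the parallel version pays off on a given machine.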
Reference:
Segmentfault.com/a/119000001…
Finally, you are welcome to follow my official account so we can learn and improve together. Keep it up! 🤣
Search for: Nanzhao Blog