I have been very busy over the past two months with work and family matters, with little time to learn anything new, so I have not published anything. Things have eased up recently, and the [advanced path] series will gradually get back on track.
Now to the topic. My first experience of using multiple threads on a single task was a multi-threaded file download using IO, and I had not dealt with that kind of task again until a few days ago, when a colleague asked me why processing one list from multiple threads produces all sorts of bugs, and how to use multiple threads to collect results into the same list.
First, why do problems such as duplicate processing occur?
As we all know, in the Java memory model each thread has its own working memory. A thread performs all operations on shared variables in its working memory and cannot read or write main memory directly.
For thread 2 to see a change made by thread 1, the modified shared variable must first be flushed from thread 1's working memory to main memory, and thread 2 must then reload the updated value from main memory into its own working memory. Without synchronization, neither step is guaranteed to happen at any particular time, so threads can end up working on stale data.
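To make the symptom concrete, here is a minimal sketch. It is not from the original project: the class name `SharedListDemo` and the method `fill` are made up for illustration. Several threads append to one plain `ArrayList`, which is not thread-safe, so elements can be lost or a worker can fail with an exception. The same loop against `Collections.synchronizedList` always ends with the full count.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SharedListDemo {

    // Starts `threads` workers that each append `perThread` numbers to `target`,
    // waits for all of them, and returns the final size of the list.
    static int fill(List<Integer> target, int threads, int perThread) {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) {
                        target.add(i);
                    }
                } catch (RuntimeException e) {
                    // An unsynchronized ArrayList can fail while it is being resized,
                    // for example with ArrayIndexOutOfBoundsException; the worker stops here.
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return target.size();
    }

    public static void main(String[] args) {
        int expected = 4 * 100_000;
        // Not thread-safe: the size is often smaller than expected and varies between runs.
        int unsafe = fill(new ArrayList<>(), 4, 100_000);
        // Every add is synchronized, so no element is lost.
        int safe = fill(Collections.synchronizedList(new ArrayList<>()), 4, 100_000);
        System.out.println("expected=" + expected + ", unsafe=" + unsafe + ", safe=" + safe);
    }
}
```

The synchronized list is correct, but every `add` contends for the same lock, which is the cost the rest of this post tries to avoid.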
At this point we usually reach for one of Java's synchronization mechanisms. Because the list needs to be processed efficiently, I ruled out synchronized methods, which would serialize the work, and decided to run the work as asynchronous tasks with CompletionService instead.
Thread pool extensions: handling asynchronous tasks with CompletionService
First, CompletionService
First, define a custom WeedThreadPool as described in the previous article:
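import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger; // assumed: java.util.logging; the article does not say which Logger is used

public class WeedThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final Logger log = Logger.getLogger("WeedThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    /**
     * Constructor for the thread pool. I picked one of the
     * ThreadPoolExecutor constructors arbitrarily.
     */
    public WeedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
}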
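The `startTime`, `numTasks` and `totalTime` fields are declared above but not used in this excerpt. As a sketch of how such fields are commonly wired up, and assuming (this is not confirmed here) that the previous article overrides the `beforeExecute` and `afterExecute` hooks of `ThreadPoolExecutor`, a timing pool could look like the following. `TimedThreadPool`, `completedTasks`, `averageNanos` and `runDemo` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for WeedThreadPool that shows only the timing hooks.
class TimedThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    TimedThreadPool(int coreSize, int maxSize) {
        super(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // Runs on the worker thread just before the task, so a ThreadLocal is enough.
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long elapsed = System.nanoTime() - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(elapsed);
        } finally {
            super.afterExecute(r, t);
        }
    }

    long completedTasks() {
        return numTasks.get();
    }

    long averageNanos() {
        long n = numTasks.get();
        return n == 0 ? 0 : totalTime.get() / n;
    }

    // Runs `tasks` trivial jobs and returns how many tasks the hooks counted.
    static long runDemo(int tasks) {
        TimedThreadPool pool = new TimedThreadPool(2, 2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }
        return pool.completedTasks();
    }

    public static void main(String[] args) {
        System.out.println("tasks counted: " + runDemo(20));
    }
}
```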
Next, use the thread pool to process the list:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class WeedExecutorServiceDemo {
    static final WeedThreadPool weedThreadPool =
            new WeedThreadPool(3, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Record the task start time
        long start = System.currentTimeMillis();
        CompletionService<List<Integer>> cs = new ExecutorCompletionService<>(weedThreadPool);
        int tb = 1;
        List<List<Integer>> list1 = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // Each task fills its own list, so no list is shared between threads
            List<Integer> list = new ArrayList<>();
            int hb = tb;
            tb = tb * 2;
            int finalTb = tb;
            cs.submit(new Callable<List<Integer>>() {
                @Override
                public List<Integer> call() throws Exception {
                    for (int j = hb; j < finalTb; j++) {
                        list.add(j);
                    }
                    System.out.println(Thread.currentThread().getName() + "[" + list + "]");
                    return list;
                }
            });
        }
        weedThreadPool.shutdown();
        // take() blocks until the next task finishes; results arrive in completion order
        for (int i = 0; i < 10; i++) {
            Future<List<Integer>> future = cs.take();
            if (future != null) {
                list1.add(future.get());
                System.out.println(future.get());
            }
        }
        System.err.println("Execute task: " + (System.currentTimeMillis() - start) + " milliseconds");
        System.out.println("result[" + list1.size() + "]===" + list1);
    }
}
Processing results:
As I mentioned before, a reasonably sized thread pool improves how efficiently tasks are processed. A formula commonly quoted online is:
Optimal number of threads = ((thread wait time + thread CPU time) / thread CPU time) * number of CPUs
which simplifies to
Optimal number of threads = (thread wait time / thread CPU time + 1) * number of CPUs
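As a quick worked example of the formula, here is a small sketch. The class `PoolSizing` and the method `optimalThreads` are names invented for this illustration, and rounding up to a whole thread is my own choice, not part of the formula. With 90 ms of waiting and 10 ms of CPU time per task on 4 CPUs, the result is (90 / 10 + 1) * 4 = 40 threads.

```java
class PoolSizing {

    // Optimal threads = (wait time / CPU time + 1) * number of CPUs,
    // rounded up to a whole thread and never less than 1.
    static int optimalThreads(double waitMillis, double cpuMillis, int cpuCount) {
        if (waitMillis < 0 || cpuMillis <= 0 || cpuCount <= 0) {
            throw new IllegalArgumentException("wait time must be >= 0, CPU time and CPU count must be > 0");
        }
        double threads = (waitMillis / cpuMillis + 1) * cpuCount;
        return Math.max(1, (int) Math.ceil(threads));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // IO-heavy task: 90 ms waiting for every 10 ms of CPU time
        System.out.println("IO-heavy:  " + optimalThreads(90, 10, cpus));
        // CPU-bound task: no waiting, so one thread per CPU
        System.out.println("CPU-bound: " + optimalThreads(0, 10, cpus));
    }
}
```

The wait and CPU times have to be measured for the real workload, so the number is a starting point for tuning rather than a final answer.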
Second, ForkJoinPool
Besides CompletionService, you can also build the processing around ForkJoinPool.
ForkJoinPool and ThreadPoolExecutor both extend AbstractExecutorService, so submitting work to a ForkJoinPool looks much like using a ThreadPoolExecutor. What sets ForkJoinPool apart is its core idea: split a large task into smaller subtasks, then merge the subtask results into a single result.
ForkJoinPool runs tasks expressed as ForkJoinTask, which has the following two commonly used subclasses:
- RecursiveAction: Used for tasks that do not return results.
- RecursiveTask: Used for tasks that return results.
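The difference between the two subclasses is only whether `compute()` returns a value. Here is a minimal sketch of both, using made-up names (`ForkJoinBasics`, `SumTask`, `DoubleAction`, `doubleThenSum`) and an arbitrary threshold of 1,000 elements: the action doubles an array in place, and the task sums it.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

class ForkJoinBasics {
    static final int THRESHOLD = 1_000; // arbitrary cut-off for this illustration

    // RecursiveTask: compute() returns a value, here the sum of data[from, to).
    static class SumTask extends RecursiveTask<Long> {
        private final int[] data;
        private final int from;
        private final int to;

        SumTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                          // run the left half asynchronously
            return right.compute() + left.join(); // do the right half here, then merge
        }
    }

    // RecursiveAction: compute() returns nothing; it doubles data[from, to) in place.
    static class DoubleAction extends RecursiveAction {
        private final int[] data;
        private final int from;
        private final int to;

        DoubleAction(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) {
                    data[i] *= 2;
                }
                return;
            }
            int mid = (from + to) >>> 1;
            invokeAll(new DoubleAction(data, from, mid), new DoubleAction(data, mid, to));
        }
    }

    // Fills an array with 0..n-1, doubles it with the action, then sums it with the task.
    static long doubleThenSum(int n) {
        int[] data = new int[n];
        for (int i = 0; i < n; i++) {
            data[i] = i;
        }
        ForkJoinPool pool = ForkJoinPool.commonPool();
        pool.invoke(new DoubleAction(data, 0, n));
        return pool.invoke(new SumTask(data, 0, n));
    }

    public static void main(String[] args) {
        System.out.println(doubleThenSum(10_000));
    }
}
```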
Our implementation uses RecursiveTask to split the list into chunks:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskDemo {
    private static final int totalRow = 53000;
    private static final int splitRow = 10000;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        List<Integer> list = new ArrayList<>(totalRow);
        for (int i = 0; i < totalRow; i++) {
            list.add(i);
        }
        // One worker thread per chunk
        int loopNum = (int) Math.ceil((double) totalRow / splitRow);
        ForkJoinPool pool = new ForkJoinPool(loopNum);
        ForkJoinTask<List<Integer>> submit = pool.submit(new MyTask(list, 0, list.size()));
        List<List<Integer>> list1 = new ArrayList<>();
        list1.add(submit.get());
        pool.shutdown();
        System.err.println("Execute task: " + (System.currentTimeMillis() - start) + " milliseconds");
        System.out.println("result[" + list1.size() + "]===" + list1);
    }

    static class MyTask extends RecursiveTask<List<Integer>> {
        private final List<Integer> list;
        private final int startRow;
        private final int endRow;

        public MyTask(List<Integer> list, int startRow, int endRow) {
            this.list = list;
            this.startRow = startRow;
            this.endRow = endRow;
        }

        /**
         * Processes the data recursively: a small range is copied directly,
         * a larger range is split into subtasks.
         */
        @Override
        protected List<Integer> compute() {
            if (endRow - startRow <= splitRow) {
                List<Integer> ret = new ArrayList<>();
                for (int i = startRow; i < endRow; i++) {
                    ret.add(list.get(i));
                }
                System.out.println(Thread.currentThread().getName() + "[" + ret + "]");
                return ret;
            }
            // Split this task's own range into chunks of at most splitRow rows
            List<MyTask> myTaskList = new ArrayList<>();
            for (int from = startRow; from < endRow; from += splitRow) {
                int to = Math.min(from + splitRow, endRow);
                System.out.println(String.format("startRow:%s, endRow:%s", from, to));
                myTaskList.add(new MyTask(list, from, to));
            }
            // invokeAll forks the subtasks so they run independently on different threads
            invokeAll(myTaskList);
            // Merge the partial results in chunk order
            List<Integer> ret = new ArrayList<>();
            for (MyTask myTask : myTaskList) {
                ret.addAll(myTask.join());
            }
            return ret;
        }
    }
}
Processing results:
With the approaches shown above, we can speed up task processing without explicit locking, because each task works on its own list and the results are merged afterwards. They suit scenarios such as crawler data processing and data migration, and they worked well in my tests. Judging by the results, though, CompletionService is probably the more efficient of the two.
Hello everyone, I am Nanju, and I have been practicing Java for two and a half years. Here is my WeChat. If you need the map from earlier or want to exchange experience, feel free to get in touch.