04 - One line of code for multithreading, no Runnable required

Follow the Bilibili account / WeChat official account [Hexagon Warrior Xia Ning], a man who wants to max out every stat. This article is available on GitHub.

If this helps you, please like and bookmark it; it really matters to me. Don't say "next time for sure": if you don't follow, there is no next time to come back to.

  • Complete code that runs directly
  • Video on

1. Background

Java 8 provides CompletableFuture and lambda expressions, which let us hand a task to another thread in a single line of code.

2. Create related classes

2.1. ThreadEntity

An entity class for multithreaded testing

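import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang3.RandomUtils;

// Assumptions: RandomUtils comes from Apache commons-lang3, and Lombok's @Data and @Builder
// are present, because later sections call ThreadEntity.builder() and print the entity
@Data
@Builder
public class ThreadEntity {
    private int num;
    private int price;

    public int countPrice() {
        price = RandomUtils.nextInt();
        try {
            System.out.println("start" + num);
            // Sleep for a random 1 to 9 seconds (the upper bound of nextInt is exclusive)
            Thread.sleep(RandomUtils.nextInt(1, 10) * 1000L);
            System.out.println("end" + num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return price;
    }
}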

2.2. ThreadPoolManager

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/**
 * tasks: number of tasks per second, default 200; estimate it from traffic and from where the thread pool is used
 * taskCost: time spent on each task, default 0.1 s
 * responseTime: maximum acceptable response time, default 1 s; users generally tolerate at most 3 seconds
 *
 * @author seal email:[email protected]
 * @date 2020/5/30 10:08 AM
 */
@Data
@Slf4j
@Configuration
public class ThreadPoolManager {
    /** Average response time, default 2 seconds */
    private static final float ALL_COST_AVG = 2F;
    /** Average I/O time, default 1.5 seconds */
    private static final float IO_COST_AVG = 1.5F;
    /** Number of server cores */
    private static final int SIZE_PROCESSOR = Runtime.getRuntime().availableProcessors();
    /**
     * https://www.cnblogs.com/dennyzhangdd/p/6909771.html?utm_source=itdadao&utm_medium=referral
     * Blocking coefficient = blocking time / (blocking time + computing time)
     * Number of threads = number of cores / (1 - blocking coefficient)
     * which equals number of CPU cores * CPU utilization * (1 + ratio of waiting time to computing time)
     * N + 1 is usually the most efficient choice
     * <p>
     * https://blog.51cto.com/13527416/2056080
     */
    private static final int SIZE_CORE_POOL = SIZE_PROCESSOR + 1;

    /**
     * Maximum number of threads maintained by the pool. A conservative choice is 2 * number of CPUs,
     * or use the simple formula
     * pool size = ((thread I/O time + thread CPU time) / thread CPU time) * number of CPUs
     * <p>
     * That is, request time / (request time - DB time) * number of CPUs. The key factor is CPU wait time,
     * which is usually database time. With a typical 2-second page and 1.5 seconds of database work
     * this gives (2 / 0.5) * N; in effect it optimizes the waiting time.
     * <p>
     * Default 4N, that is, 32 threads on 8 cores
     */
    private static final int SIZE_MAX_POOL = (int) (SIZE_PROCESSOR * (ALL_COST_AVG / (ALL_COST_AVG - IO_COST_AVG)));

    /**
     * Thread pool queue length. The default is Integer.MAX_VALUE and Dubbo uses 1000. With an unbounded
     * queue a huge number of users end up waiting, so pick a suitable bound and discard the excess.
     * SIZE_MAX_POOL / IO_COST_AVG = number of tasks that can be processed per second
     * Queue length = X * number of tasks that can be processed per second (X = 6 here)
     */
    private static final int SIZE_QUEUE = (int) (6 * (SIZE_MAX_POOL / IO_COST_AVG));

    private static final ThreadPoolManager threadPoolManager = new ThreadPoolManager();

    /**
     * The concrete thread pool.
     * LinkedBlockingDeque is commonly used for fixed-size pools; SynchronousQueue is used for cached thread pools.
     * Executors.newCachedThreadPool() is often used for short-lived tasks.
     * <p>
     * Thread factory options (the differences are small):
     * Spring CustomizableThreadFactory: new CustomizableThreadFactory("springThread-pool-")
     * Guava ThreadFactoryBuilder: new ThreadFactoryBuilder().setNameFormat("retryClient-pool-").build()
     * Apache commons-lang BasicThreadFactory: new BasicThreadFactory.Builder().namingPattern("BasicThreadFactory-").build()
     * <p>
     * The default policy for a full queue is AbortPolicy
     */
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            SIZE_CORE_POOL,
            SIZE_MAX_POOL,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(SIZE_QUEUE),
            new CustomizableThreadFactory("springThread-pool-"),
            new ThreadPoolExecutor.AbortPolicy()
    );

    private void prepare() {
        // Prestarting core threads only makes sense while the pool is still running
        if (!pool.isShutdown()) {
            int coreSize = pool.prestartAllCoreThreads();
            System.out.println("Current thread pool: prestarted " + coreSize + " core threads");
        }
    }

    public static ThreadPoolManager getInstance() {
        return threadPoolManager;
    }
}
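
To make the sizing comments above concrete, here is a minimal sketch that repeats the same arithmetic for an arbitrary core count. The class and method names (PoolSizing, corePoolSize, maxPoolSize, queueSize) are hypothetical and exist only for illustration; the two constants are the ones from ThreadPoolManager.

```java
/** Hypothetical helper that repeats the sizing arithmetic of ThreadPoolManager. */
class PoolSizing {
    // Same defaults as ThreadPoolManager: 2 s per request, 1.5 s of it spent waiting on I/O
    static final float ALL_COST_AVG = 2F;
    static final float IO_COST_AVG = 1.5F;

    /** Core size: N + 1. */
    static int corePoolSize(int processors) {
        return processors + 1;
    }

    /** Max size: N * total time / (total time - I/O time), which is 4N with the defaults. */
    static int maxPoolSize(int processors) {
        return (int) (processors * (ALL_COST_AVG / (ALL_COST_AVG - IO_COST_AVG)));
    }

    /** Queue length: 6 * (max size / I/O time). */
    static int queueSize(int processors) {
        return (int) (6 * (maxPoolSize(processors) / IO_COST_AVG));
    }

    public static void main(String[] args) {
        int n = Runtime.getRuntime().availableProcessors();
        System.out.printf("cores=%d core=%d max=%d queue=%d%n",
                n, corePoolSize(n), maxPoolSize(n), queueSize(n));
    }
}
```

With 16 cores this gives a maximum of 64 threads and a queue of 256, which matches the "pool size = 64" and "queued tasks = 256" values in the rejection log of section 3.2.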

3. Core code

3.1. Parallel streams

parallel() is the core of parallel streams: the pipeline runs on multiple threads internally, yet collect still returns the elements in their original order, so there is no need to worry about ordering. It is fine for small projects. For large projects, use your own thread pool instead, because the JDK's built-in fork/join pool is not a good fit for business workloads.

System.out.println(Stream.of(1, 2, 3, 4, 5, 6).parallel().map(l -> {
    System.out.println(l);
    return l;
}).collect(Collectors.toList()));

The output is as follows. The individual elements are printed in random order because of multithreading, but the final result keeps its order because collect is used.

1, 2, 3, 4, 5, 6
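
A small sketch of the ordering claim above: the side effects inside map run in whatever order the worker threads reach them, while collect(Collectors.toList()) still returns the elements in encounter order. The class name ParallelOrderDemo and the ConcurrentLinkedQueue used to record the processing order are illustrative assumptions, not part of the original project.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelOrderDemo {
    /** Records the order in which elements were actually processed. */
    static final Queue<Integer> visited = new ConcurrentLinkedQueue<>();

    static List<Integer> run() {
        return Stream.of(1, 2, 3, 4, 5, 6)
                .parallel()
                .map(l -> {
                    visited.add(l); // may happen in any order across threads
                    return l;
                })
                .collect(Collectors.toList()); // encounter order is preserved here
    }

    public static void main(String[] args) {
        List<Integer> result = run();
        System.out.println("processing order: " + visited);
        System.out.println("collected result: " + result);
    }
}
```

The processing order typically differs from run to run, whereas the collected list does not.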

3.2. Synchronous code

This works without implementing the Runnable interface, but you still have to consider what happens to tasks that are discarded when the queue is full.

List<ThreadEntity> listEntity = IntStream.range(0, 10).mapToObj(x -> ThreadEntity.builder().num(x).build()).collect(Collectors.toList());
List<CompletableFuture<Integer>> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        // ThreadPoolManager.getInstance().getPool() is passed as the executor here; omit it to use the default commonPool.
        // The try/catch is normally not needed unless you have special requirements.
        return CompletableFuture.supplyAsync(() -> x.countPrice(),
                ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject" + x);
        log.error("", e);
        return null;
    }
}).collect(Collectors.toList());
// Rejected tasks were mapped to null above, so skip them before joining
List<Integer> result = listCompletableFuture.stream().filter(Objects::nonNull).map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(result);
System.out.println(listEntity);

The output below shows that the tasks run on multiple threads, while the results stay in the original order.

start6
start9
start0
start3
start2
start1
start8
start5
start4
start7
end3
end8
end5
end7
end9
end1
end2
end6
end0
end4
[131523688, 1491605535, 222657954, 132274662, 1134597171, 2057763841, 1168687436, 1842194861, 1264173480, 56446450]
[ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7d6f201, num=0, price=131523688), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@58e825f3, num=1, price=1491605535), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@d458bb1, num=2, price=222657954), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7e26830, num=3, price=132274662), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@43a0a2b8, num=4, price=1134597171), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7aa70ac1, num=5, price=2057763841), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@45a8d047, num=6, price=1168687436), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@6dcdb8e3, num=7, price=1842194861), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@4b59d119, num=8, price=1264173480), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@35d5d9e, num=9, price=56446450)]

If IntStream.range(0, 10) is changed to IntStream.range(0, 1000), the queue fills up and tasks are rejected:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@5af97850 rejected from java.util.concurrent.ThreadPoolExecutor@491666ad[Running, pool size = 64, active threads = 64, queued tasks = 256, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
    at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1843)
    at com.example.demo.lesson.grace.thread.TestMain.lambda$threadEx1$2(TestMain.java:34)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at com.example.demo.lesson.grace.thread.TestMain.threadEx1(TestMain.java:41)
    at com.example.demo.lesson.grace.thread.TestMain.main(TestMain.java:26)
rejectThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@1a9, num=366)
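
The rejection above can be reproduced deterministically with a much smaller pool. The sketch below assumes a pool with one thread and a queue with one slot; these sizes, the class name RejectionDemo and the latch are illustrative and not part of ThreadPoolManager. The first task occupies the thread, the second waits in the queue, and every further submission is rejected by AbortPolicy and counted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {
    /** Submits the given number of blocking tasks and returns how many were rejected. */
    static int countRejected(int tasks) {
        // Assumed sizes: 1 core thread, 1 max thread, queue capacity 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 30L, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<CompletableFuture<Integer>> accepted = new ArrayList<>();
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            final int num = i;
            try {
                accepted.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        release.await(); // keep the worker busy until all tasks are submitted
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return num;
                }, pool));
            } catch (RejectedExecutionException e) {
                rejected++; // thread busy and queue full: AbortPolicy throws
            }
        }
        release.countDown();
        accepted.forEach(CompletableFuture::join);
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejected(5));
    }
}
```

If dropping work is not acceptable, ThreadPoolExecutor.CallerRunsPolicy is a built-in alternative that runs a rejected task on the submitting thread instead of throwing.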

3.3. Asynchronous code

The code below can be shortened to a single line, CompletableFuture.runAsync(() -> fun()), which is extremely convenient for asynchronous tasks.

List<ThreadEntity> listEntity = IntStream.range(0, 500).mapToObj(x -> ThreadEntity.builder().num(x).build()).collect(Collectors.toList());
List<CompletableFuture<Void>> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        // ThreadPoolManager.getInstance().getPool() is passed as the executor here; omit it to use the default commonPool.
        // The try/catch is normally not needed unless you have special requirements.
        return CompletableFuture.runAsync(() -> x.countPrice(), ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject" + x);
        return null;
    }
}).collect(Collectors.toList());
// This stream has no terminal operation, so join is never called and the main thread does not wait
listCompletableFuture.stream().map(CompletableFuture::join);
System.out.println("1234");
// One line of multithreading, executed asynchronously
CompletableFuture.runAsync(() -> System.out.println(1));

The output is as follows. The main thread finishes before the worker threads print anything, so it does not wait for them at all.

1234
1
start7
start0
start6
start5
start4
start2
start8
start1
start9
start3
end8
end4
end9
end6
end2
end0
end1
end3
end5
end7
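
If the main thread does need such asynchronous tasks to finish before it moves on, one option is CompletableFuture.allOf, which completes once every future passed to it has completed. A minimal sketch, assuming the default common pool and a hypothetical class name WaitAllDemo:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class WaitAllDemo {
    /** Starts n asynchronous tasks, waits for all of them, and returns how many ran. */
    static int runAndWait(int n) {
        AtomicInteger finished = new AtomicInteger();
        List<CompletableFuture<Void>> futures = IntStream.range(0, n)
                .mapToObj(i -> CompletableFuture.runAsync(() -> finished.incrementAndGet()))
                .collect(Collectors.toList());
        // allOf returns a future that completes when all of the given futures have completed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished tasks: " + runAndWait(10));
    }
}
```

Without the join on allOf, the method could return before any task has run, which is the fire-and-forget behavior shown in the output above.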