1. Friendship links
The directory contains the complete, directly runnable code, with both a video walkthrough and a text version.
If this is helpful, please like and bookmark it; it really matters to me.
2. Create related classes
2.1 ThreadEntity
An entity class for multithreaded testing
import org.apache.commons.lang3.RandomUtils;

public class ThreadEntity {

    private int num;

    // constructor used by the examples below (new ThreadEntity(x))
    public ThreadEntity(int num) {
        this.num = num;
    }

    public int countPrice() {
        int a = RandomUtils.nextInt();
        try {
            // simulate a slow task: sleep between 1 and 10 seconds
            Thread.sleep(RandomUtils.nextInt(1, 10) * 1000L);
            System.out.println(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return a;
    }
}
2.2 ThreadPoolManager
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * tasks: tasks per second, default 200, estimated from the traffic at the point where the pool is used
 * taskCost: time spent per task, default 0.1 s
 * responseTime: maximum response time, default 1 s (users tolerate at most about 3 seconds)
 *
 * @author seal email:[email protected]
 * @date 2020/5/30 10:08 AM
 */
@Data
@Slf4j
@Configuration
public class ThreadPoolManager {
/** * Average response time defaults to 2 seconds */
private static final float ALL_COST_AVG = 2F;
/** * Average I/O time defaults to 1.5 seconds */
private static final float IO_COST_AVG = 1.5F;
/** * Number of server cores */
private static final int SIZE_PROCESSOR = Runtime.getRuntime().availableProcessors();
/**
 * Blocking coefficient = blocking time / (blocking time + computation time)
 * Thread count = number of cores / (1 - blocking coefficient)
 *              = number of cores * CPU utilization * (1 + wait time / compute time) for optimal efficiency
 * N + 1 is the usual choice.
 * https://www.cnblogs.com/dennyzhangdd/p/6909771.html?utm_source=itdadao&utm_medium=referral
 * https://blog.51cto.com/13527416/2056080
 */
private static final int SIZE_CORE_POOL = SIZE_PROCESSOR + 1;
/**
 * Maximum number of threads kept in the pool. A conservative choice is 2 * CPU cores,
 * or use the simple formula:
 *   pool size = ((thread I/O time + thread CPU time) / thread CPU time) * number of CPUs
 * i.e. request time / (request time - DB time) * number of CPUs; the wait time to focus on is
 * usually database time. With a typical 2-second page and 1.5 seconds of database work this is
 * (2 / 0.5) * N = 4N, i.e. 32 threads on 8 cores; what really helps is optimizing the wait time.
 */
private static final int SIZE_MAX_POOL = (int) (SIZE_PROCESSOR * (ALL_COST_AVG / (ALL_COST_AVG - IO_COST_AVG)));
/**
 * Queue length. The JDK default is Integer.MAX_VALUE and Dubbo uses 1000; with an effectively
 * unbounded queue users just pile up waiting, so an appropriate rejection policy should be chosen instead.
 * SIZE_MAX_POOL / IO_COST_AVG = number of tasks the pool can process per second;
 * the queue is sized as a multiple (here 6x) of that per-second throughput.
 */
private static final int SIZE_QUEUE = (int) (6 * (SIZE_MAX_POOL / IO_COST_AVG));
/**
 * The thread pool itself.
 * LinkedBlockingDeque is commonly used for fixed-size pools; SynchronousQueue is used by cached
 * pools (Executors.newCachedThreadPool() is often used for short-lived tasks).
 * <p>
 * The choice of thread factory makes little difference:
 * Spring CustomizableThreadFactory: new CustomizableThreadFactory("springThread-pool-")
 * Guava ThreadFactoryBuilder: new ThreadFactoryBuilder().setNameFormat("retryClient-pool-").build()
 * commons-lang BasicThreadFactory: new BasicThreadFactory.Builder().namingPattern("BasicThreadFactory-").build()
 * <p>
 * The default policy when the queue is full is AbortPolicy.
 */
private static ThreadPoolManager threadPoolManager = new ThreadPoolManager();
private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
SIZE_CORE_POOL,
SIZE_MAX_POOL,
30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(SIZE_QUEUE),
new CustomizableThreadFactory("springThread-pool-"),
new ThreadPoolExecutor.AbortPolicy()
);
private void prepare() {
    if (pool.isShutdown() && !pool.prestartCoreThread()) {
        int coreSize = pool.prestartAllCoreThreads();
        System.out.println("Current Thread Pool core size: " + coreSize);
    }
}

public static ThreadPoolManager getInstance() {
    return threadPoolManager;
}
}
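To make the sizing formulas above concrete, here is a small worked check (my own sketch; the 8-core count and the PoolSizeCheck class name are assumptions, the constants are the ones defined above):

public class PoolSizeCheck {
    public static void main(String[] args) {
        // Assumed values: 8 cores, ALL_COST_AVG = 2s, IO_COST_AVG = 1.5s (same constants as above)
        int processors = 8;
        int corePool = processors + 1;                          // N + 1 rule -> 9
        int maxPool = (int) (processors * (2F / (2F - 1.5F)));  // "4N" rule -> 32
        int queue = (int) (6 * (maxPool / 1.5F));               // ~21 tasks per second * 6 -> 128
        System.out.println(corePool + " / " + maxPool + " / " + queue);
    }
}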
3. Core code
3.1 Parallel streams
parallel() is the core of parallel streams: the work is spread across multiple threads internally, but the list produced by collect() still follows the original encounter order, so ordering is not a concern. It is fine for small projects; for large projects it is better to use your own thread pool, because the JDK's built-in fork/join common pool is not a good fit for business workloads (a sketch of running a parallel stream in a custom pool follows the example below).
System.out.println(Stream.of(1, 2, 3, 4, 5, 6).parallel().map(l -> {
    System.out.println(l);
    return l;
}).collect(Collectors.toList()));
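If a parallel stream must stay off the shared common pool, a frequently used workaround, sketched here as an illustration rather than as part of the original example, is to submit the terminal operation from inside your own ForkJoinPool (the pool size 4 is arbitrary):

ForkJoinPool customPool = new ForkJoinPool(4);  // hypothetical size, tune for your workload
List<Integer> doubled = customPool.submit(() ->
        Stream.of(1, 2, 3, 4, 5, 6).parallel()
                .map(i -> i * 2)
                .collect(Collectors.toList())
).join();
customPool.shutdown();
System.out.println(doubled);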
3.2 Synchronous Code
This can be done without implementing the Runnable/Thread interface yourself, but you still need to consider how tasks are discarded when the queue is full
List<ThreadEntity> listEntity = IntStream.range(0, 10).mapToObj(x -> new ThreadEntity(x)).collect(Collectors.toList());
List<CompletableFuture<Integer>> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        return CompletableFuture.supplyAsync(() -> x.countPrice(),
                ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject " + x);
        log.error("", e);
        return null;
    }
}).collect(Collectors.toList());
// skip futures that were rejected (null) before blocking on join
List<Integer> result = listCompletableFuture.stream().filter(Objects::nonNull).map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(result);
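If dropping rejected work is not acceptable, one alternative sketch (not part of the original ThreadPoolManager) is to build the pool with CallerRunsPolicy, so the submitting thread runs overflow tasks itself and you get natural back-pressure:

// Same pool as above, but the caller executes the task when the queue is full,
// instead of a RejectedExecutionException being thrown by AbortPolicy.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        SIZE_CORE_POOL,
        SIZE_MAX_POOL,
        30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(SIZE_QUEUE),
        new CustomizableThreadFactory("springThread-pool-"),
        new ThreadPoolExecutor.CallerRunsPolicy()
);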
3.3 Asynchronous Code
The code below can be shortened to a single line; for fire-and-forget asynchronous tasks, CompletableFuture.runAsync(() -> fun()) is extremely convenient
List<ThreadEntity> listEntity = IntStream.range(0, 500).mapToObj(x -> new ThreadEntity(x)).collect(Collectors.toList());
List<CompletableFuture<Void>> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        return CompletableFuture.runAsync(() -> x.countPrice(), ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject " + x);
        return null;
    }
}).collect(Collectors.toList());
// no terminal operation here, so the stream below is never evaluated and nothing blocks;
// the main thread prints immediately while the tasks keep running in the pool
listCompletableFuture.stream().map(CompletableFuture::join);
System.out.println("1234");
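Should you later need to block until every fire-and-forget task has finished, a minimal sketch using CompletableFuture.allOf (my addition; it assumes the null entries from rejected submissions are filtered out first) looks like this:

// Wait for all submitted tasks; rejected submissions are null and must be skipped.
CompletableFuture<Void> all = CompletableFuture.allOf(
        listCompletableFuture.stream()
                .filter(Objects::nonNull)
                .toArray(CompletableFuture[]::new));
all.join();
System.out.println("all async tasks finished");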