Follow the public account "JAVA Front" for more articles on source code analysis, practical application, architecture, workplace experience, and product thinking. You are also welcome to add my WeChat "JAVA_front" to discuss and learn together.


1 Article Overview

The Alibaba Java Development Manual states that thread pools must not be created with Executors. They should be created with ThreadPoolExecutor instead, which makes the pool's operating rules explicit and avoids the risk of resource exhaustion. This article analyzes the rule from two perspectives, resources and troubleshooting, and then creates a compliant thread pool modeled on the way DUBBO declares its thread pools.


2 Resource Perspective

The Alibaba Java Development Manual explains the rule mainly from the resource perspective:

FixedThreadPool and SingleThreadPool: the allowed request queue length is Integer.MAX_VALUE, so a large number of requests may pile up and cause an OOM.
CachedThreadPool and ScheduledThreadPool: the allowed number of threads is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM.

The source code of FixedThreadPool and SingleThreadPool shows that both use a LinkedBlockingQueue created without a capacity argument, which defaults to a capacity of Integer.MAX_VALUE:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
}
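
To make the unbounded queue concrete, the following sketch inspects the work queue behind Executors.newFixedThreadPool. It assumes the returned object can be cast to ThreadPoolExecutor, which matches the JDK source quoted above but is an implementation detail rather than a documented guarantee. The class name QueueCapacityProbe is made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class QueueCapacityProbe {

    // Returns how many more tasks the pool's work queue can still accept.
    // Assumption: newFixedThreadPool returns a plain ThreadPoolExecutor,
    // as in the JDK source quoted above.
    static int fixedPoolQueueCapacity(int nThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // An empty queue with no capacity limit reports Integer.MAX_VALUE.
        System.out.println(fixedPoolQueueCapacity(10) == Integer.MAX_VALUE);
    }
}
```

A queue that reports Integer.MAX_VALUE as remaining capacity never rejects a task, so pending tasks can keep accumulating on the heap if producers outpace the workers.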

The source code of CachedThreadPool and ScheduledThreadPool shows that the maximum number of threads is Integer.MAX_VALUE:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
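
As a contrast, here is a minimal sketch of what explicit limits buy us. It first reads the maximum pool size of a cached pool, then builds a ThreadPoolExecutor with one thread and a queue of one slot and counts how many tasks are rejected once both are occupied. The class and method names are hypothetical, and the sizes are chosen only to keep the demonstration small.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {

    // Maximum pool size of the executor returned by newCachedThreadPool.
    // Assumption: the returned object is a plain ThreadPoolExecutor.
    static int cachedPoolMaxSize() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    // Submits `tasks` blocking jobs to a pool with 1 thread and a queue of 1,
    // and returns how many submissions were rejected.
    static int countRejected(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // thread and queue are both full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With five submissions, one task runs, one waits in the queue, and the remaining three are rejected immediately, so overload shows up as an exception at the call site instead of as unbounded memory growth.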


3 Troubleshooting Perspective

Suppose we create a thread pool with Executors as follows:

public void testThread() throws Exception {
    ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 1000; i++) {
        fixedExecutor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Public account: JAVA Front");
            }
        });
    }
}

The code above is functionally fine. In a production environment, however, problems such as CPU spikes, a growing thread count, or memory leaks may occur, and we usually need thread snapshots to investigate them. Let's look at the thread snapshot of the code above with the jstack command:

"pool-1-thread-2" #525 prio=5 os_prio=0 tid=0x00006f6561039100 nid=0xdaa waiting on condition [0x00006f64e646d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006e6f3e230> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:165)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1126)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:616)
at java.lang.Thread.run(Thread.java:645)

The thread snapshot carries no business information. It only shows a numbered name such as pool-1-thread-2, which does not help troubleshooting, so the threads need to be named.
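
The generic name comes from the JDK's default thread factory, which Executors uses when no factory is supplied. The short sketch below asks that factory for a thread and returns its name without starting it. The class name DefaultNameProbe is hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class DefaultNameProbe {

    // Name that the JDK default factory assigns to the first thread it creates.
    // The thread is only constructed, never started.
    static String firstDefaultThreadName() {
        ThreadFactory factory = Executors.defaultThreadFactory();
        Thread thread = factory.newThread(() -> { });
        return thread.getName();
    }

    public static void main(String[] args) {
        // The name has the form pool-N-thread-1, where N depends on how many
        // default factories the JVM has created so far.
        System.out.println(firstDefaultThreadName());
    }
}
```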


4 Name the thread

Taking FixedThreadPool as an example, DUBBO uses a named thread factory to name both producer and consumer threads:

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {

        // The thread name
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

        // The default number of threads is 200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);

        // The queue capacity defaults to 0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

        // Queue capacity equal to 0 uses SynchronousQueue
        // Queue capacity less than 0 uses the unbounded queue LinkedBlockingQueue
        // Queue capacity greater than 0 uses the bounded blocking queue LinkedBlockingQueue
        // NamedInternalThreadFactory names the threads
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                                      queues == 0 ? new SynchronousQueue<Runnable>()
                                      : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                         : new LinkedBlockingQueue<Runnable>(queues)),
                                      new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
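
The three-way queue choice in the DUBBO code above can be isolated into a small helper, which makes the effect of each setting easy to check. This is a sketch that depends only on the JDK. The class QueueSelector and the method select are hypothetical names, not part of DUBBO.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

final class QueueSelector {

    // Mirrors the ternary expression in the DUBBO snippet:
    //   queues == 0 -> direct hand-off, no buffering
    //   queues <  0 -> unbounded LinkedBlockingQueue
    //   queues >  0 -> LinkedBlockingQueue bounded to `queues` elements
    static BlockingQueue<Runnable> select(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<Runnable>();
        }
        if (queues < 0) {
            return new LinkedBlockingQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(queues);
    }

    public static void main(String[] args) {
        System.out.println(select(0).remainingCapacity());   // SynchronousQueue always reports 0
        System.out.println(select(100).remainingCapacity()); // bounded queue reports its capacity
    }
}
```

With the default of 0, a task is either handed to a free thread immediately or rejected, so a saturated pool is reported by the rejection handler rather than hidden in a growing backlog.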

The default producer thread name is DubboServerHandler:

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
}

The producer thread snapshot shows DubboServerHandler:

"DubboServerHandler-1.1.1.1:20881-thread-20" #511 daemon prio=5 os_prio=0 tid=0x00001f153121f200 nid=0xd1a waiting on condition [0x00001f14edcdf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000001e1f3abc0> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:115)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:452)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:312)
at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:924)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1011)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1121)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:111)

The default consumer thread name is DubboClientHandler:

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
}

The consumer thread snapshot shows DubboClientHandler:

"DubboClientHandler-1.1.1.1:20881-thread-10" # daemon prio=1 os_prio=0 tid=0x00001f6114004800 nid=0x14d8 waiting on condition [0x00001f63e131a000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006e21df0d0> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:111)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:361)
at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:141)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1111)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:611)
at java.lang.Thread.run(Thread.java:141)


5 Create a canonical thread pool

Following the DUBBO thread pool definition, we write the naming factory:

public class NamedInternalThreadFactory extends NamedThreadFactory {
    public NamedInternalThreadFactory() {
        super();
    }

    public NamedInternalThreadFactory(String prefix) {
        super(prefix, false);
    }

    public NamedInternalThreadFactory(String prefix, boolean daemon) {
        super(prefix, daemon);
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }
}

public class NamedThreadFactory implements ThreadFactory {
    protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
    protected final AtomicInteger mThreadNum = new AtomicInteger(1);
    protected final String mPrefix;
    protected final boolean mDaemon;
    protected final ThreadGroup mGroup;

    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return mGroup;
    }
}
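
To try the naming idea without the DUBBO classes, the following is a simplified stand-in for the factory above that depends only on the JDK. SimpleNamedThreadFactory and runAndGetThreadName are hypothetical names, and the pool and queue sizes are arbitrary values chosen for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SimpleNamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNum = new AtomicInteger(1);
    private final String prefix;
    private final boolean daemon;

    SimpleNamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix + "-thread-";
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        // Every thread gets "<prefix>-thread-<sequence>" as its name.
        Thread thread = new Thread(runnable, prefix + threadNum.getAndIncrement());
        thread.setDaemon(daemon);
        return thread;
    }

    // Runs one task on a single-thread pool built with this factory and
    // returns the name of the thread that executed it.
    static String runAndGetThreadName(String prefix) {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(16),
                new SimpleNamedThreadFactory(prefix, true));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling runAndGetThreadName("BizName") returns "BizName-thread-1", the same naming pattern that the DUBBO factory produces.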

Define the thread pool, and expose a business-name parameter on the execution methods for the caller to set:

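import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolStarter {
    // The constant values are not given in the original; these are assumed placeholders
    private static final int QUEUE_SIZE = 1024;
    private static final int MAX_POOL_SIZE = 200;
    private static final long KEEP_ALIVE_SECONDS = 60L;

    // volatile is required for double-checked locking to be safe
    private static volatile ThreadPoolExecutor executor;

    // The pool is created only once, so the name passed by the first caller names all threads
    public static ThreadPoolExecutor getExecutor(String threadName) {
        if (executor == null) {
            synchronized (ThreadPoolStarter.class) {
                if (executor == null) {
                    int coreSize = Runtime.getRuntime().availableProcessors();
                    // maximumPoolSize must not be smaller than corePoolSize
                    int maxSize = Math.max(coreSize, MAX_POOL_SIZE);
                    BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>(QUEUE_SIZE);
                    // AbortPolicyDoReport is not shown in this article; presumably a custom RejectedExecutionHandler
                    executor = new ThreadPoolExecutor(coreSize, maxSize, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, queueToUse,
                            new NamedInternalThreadFactory(threadName, true), new AbortPolicyDoReport(threadName));
                }
            }
        }
        return executor;
    }
}

public class ThreadExecutor {
    public static void execute(String bizName, Runnable job) {
        ThreadPoolStarter.getExecutor(bizName).execute(job);
    }

    public static Future<?> submit(String bizName, Runnable job) {
        return ThreadPoolStarter.getExecutor(bizName).submit(job);
    }
}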
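
Because ThreadPoolStarter keeps a single executor, only the first business name passed in ends up in the thread names. If several business names must be distinguishable in a snapshot, one possible variation is to keep one pool per name. The sketch below is an alternative design, not the author's implementation. NamedPoolRegistry, the queue size, and the keep-alive time are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedPoolRegistry {
    private static final int QUEUE_SIZE = 1024; // assumed value
    private static final ConcurrentMap<String, ThreadPoolExecutor> POOLS =
            new ConcurrentHashMap<>();

    // Returns the pool for this business name, creating it on first use.
    static ThreadPoolExecutor getExecutor(String bizName) {
        return POOLS.computeIfAbsent(bizName, NamedPoolRegistry::newPool);
    }

    private static ThreadPoolExecutor newPool(String bizName) {
        int size = Runtime.getRuntime().availableProcessors();
        AtomicInteger seq = new AtomicInteger(1);
        return new ThreadPoolExecutor(
                size, size, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(QUEUE_SIZE),
                runnable -> {
                    // Same "<name>-thread-<n>" pattern as the naming factory above.
                    Thread thread = new Thread(runnable, bizName + "-thread-" + seq.getAndIncrement());
                    thread.setDaemon(true);
                    return thread;
                },
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Runs a task on the pool for bizName and reports which thread executed it.
    static String threadNameFor(String bizName) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return getExecutor(bizName).submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The trade-off is that every distinct name now owns its own threads and queue, so this variation only fits a small, fixed set of business names.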

Write an example to test:

public void testThread() throws Exception {
    for (int i = 0; i < 10000; i++) {
        ThreadExecutor.execute("BizName", new Runnable() {
            @Override
            public void run() {
                System.out.println("Public account: JAVA Front");
            }
        });
        Thread.sleep(1000L);
    }
}

The thread snapshot now clearly shows the business name:

"BizName-thread-8" #262 daemon prio=5 os_prio=0 tid=0x0000000023b5c000 nid=0x31d4 waiting on condition [0x000000003c0be000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c35781f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
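
Named threads can also be located from inside the JVM, which is handy in a health check or a quick test. The sketch below counts live threads by name prefix using Thread.getAllStackTraces. ThreadNameCounter and its methods are hypothetical names, and the parked thread exists only to give the counter something to find.

```java
import java.util.concurrent.CountDownLatch;

final class ThreadNameCounter {

    // Counts live threads whose name starts with the given prefix.
    static long countLiveThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(thread -> thread.getName().startsWith(prefix))
                .count();
    }

    // Starts one parked daemon thread named "<prefix>1", counts the matching
    // threads while it is alive, then releases it.
    static long demo(String prefix) {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, prefix + "1");
        worker.setDaemon(true);
        worker.start();
        try {
            return countLiveThreadsWithPrefix(prefix);
        } finally {
            release.countDown();
        }
    }
}
```

The same filter with the prefix "pool-" would match every pool created through Executors, which is exactly why the default names are of little use when troubleshooting.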


6 Article Summary

This article first introduced the Alibaba Java Development Manual rule that thread pools must not be created with Executors, then analyzed the reasons from two perspectives, resources and troubleshooting. Finally, following the way DUBBO declares its thread pools, we created a compliant thread pool whose named threads help locate and troubleshoot problems quickly.
