Welcome to pay attention to the public account “JAVA Front” to view more wonderful sharing articles, mainly including source code analysis, practical application, architecture thinking, workplace sharing, product thinking and so on, at the same time, welcome to add my wechat “JAVA_front” to communicate and learn together


0 Article Overview

Traffic peak is often encountered in the Internet production environment, such as commodity buying activities at a certain point in time, or centralized triggering of scheduled tasks at a certain point in time, these scenarios may cause traffic peak, so how to deal with traffic peak we must face the problem.

In the vertical dimension, we can think from the proxy layer, WEB layer, service layer, cache layer and data layer. In the horizontal dimension, we can think from the directions of high frequency detection, cache preposition, node redundancy and service degradation. In this article, we think about dealing with peak traffic from the perspective of dynamic thread pools at the service layer.

Dynamic thread pool means that we can adjust some parameters of the thread pool according to different traffic. For example, we can lower the number of threads in the low peak period and increase the number of threads in the peak period to cope with the traffic peak. In this article, we combine Apollo and thread pools to implement a dynamic thread pool.


1 Thread pool basics

1.1 Seven Parameters

Let’s start with a review of the seven Java thread pool parameters, which will help you set up thread pool parameters later. We look at the ThreadPoolExecutor constructor as follows:


public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                   null :
                   AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler; }}Copy the code

corePoolSize

The number of core threads in a thread pool, analogous to a fixed window in a business lobby. For example, if the business lobby has two fixed Windows, the two Windows will not be closed and business will be handled all day long

workQueue

Store tasks that have been submitted but not yet executed, analogous to a business lobby waiting area. For example, when the business hall opens, many customers come in. There are two fixed Windows for business handling, and other customers go to the waiting area

maximumPoolSize

A thread pool can hold the maximum number of concurrent threads, similar to the maximum number of Windows in a business lobby. For example, the maximum number of Windows in the business hall is 5. When the salesman sees that the two fixed Windows and the waiting area are full, he can add 3 Windows temporarily

keepAliveTime

Non-core thread lifetime. When the service is not busy, the three new Windows need to be closed. If the idle time exceeds keepAliveTime, the three new Windows will be closed

unit

KeepAliveTime unit of keepAliveTime

threadFactory

Thread factories can be used to specify thread names

handler

The reject policy is executed when the number of threads in the thread pool reaches maximumPoolSize and the queue is full. For example, if all five Windows in the service lobby are busy and the waiting area is full, the attendant selects a rejection policy based on the actual situation


1.2 Four rejection strategies

(1) AbortPolicy

The default policy throws RejectExecutionException to prevent the system from running properly

/**
 * AbortPolicy
 *
 * @authorWechat official account "JAVA Front" * */
public class AbortPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), abortPolicy);
        for (int i = 0; i < 100; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run(a) {
                    System.out.println(Thread.currentThread().getName() + " -> run"); }}); }}}Copy the code

Program execution result:

pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.xy.juc.threadpool.reject.AbortPolicyTest$1@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.xy.juc.threadpool.reject.AbortPolicyTest.main(AbortPolicyTest.java:21)
Copy the code


(2) CallerRunsPolicy

The task falls back to the caller to run itself

/**
 * CallerRunsPolicy
 *
 * @authorWechat official account "JAVA Front" * */
public class CallerRunsPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), callerRunsPolicy);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run(a) {
                    System.out.println(Thread.currentThread().getName() + " -> run"); }}); }}}Copy the code

Program execution result:

main -> run pool-1-thread-1 -> run pool-1-thread-2 -> run pool-1-thread-1 -> run main -> run main -> run pool-1-thread-2  -> run pool-1-thread-1 -> run main -> run pool-1-thread-2 -> runCopy the code


(3) DiscardOldestPolicy

Discarding the longest waiting task in the queue does not throw an exception

/**
 * DiscardOldestPolicy
 *
 * @authorWechat official account "JAVA Front" * */
public class DiscardOldestPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardOldestPolicy);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run(a) {
                    System.out.println(Thread.currentThread().getName() + " -> run"); }}); }}}Copy the code

Program execution result:

pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
Copy the code


(4) DiscardPolicy

Dropping a task directly does not throw an exception

/**
 * DiscardPolicy
 *
 * @authorWechat official account "JAVA Front" * */
public class DiscardPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardPolicy);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run(a) {
                    System.out.println(Thread.currentThread().getName() + " -> run"); }}); }}}Copy the code

Program execution result:

pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
Copy the code


1.3 Modifying Parameters

Can we modify some of the thread pool parameters after initializing the thread pool? The answer is yes. We chose the four modification methods provided by the thread pool for source analysis.

(1) setCorePoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        // The number of new core threads minus the number of old core threads
        int delta = corePoolSize - this.corePoolSize;
        // New core thread count assignment
        this.corePoolSize = corePoolSize;
        // If the number of current threads is greater than the number of new core threads
        if (workerCountOf(ctl.get()) > corePoolSize)
            // Interrupt the idle thread
            interruptIdleWorkers();
        // If you need to add a new thread, add a worker thread with addWorker
        else if (delta > 0) {
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null.true)) {
                if (workQueue.isEmpty())
                    break; }}}}Copy the code


(2) setMaximumPoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
		// If the current number of threads is greater than the new maximum number of threads
        if (workerCountOf(ctl.get()) > maximumPoolSize)
			// Interrupt the idle threadinterruptIdleWorkers(); }}Copy the code


(3) setKeepAliveTime

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        // New timeout minus old timeout
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        // If the new timeout is smaller than the old one
        if (delta < 0)
            // Interrupt the idle threadinterruptIdleWorkers(); }}Copy the code


(4) setRejectedExecutionHandler

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        // Set the rejection policy
        this.handler = handler; }}Copy the code

Now we know how the thread pool system adjusts parameters above, but this analysis is not enough, because if there is no way to adjust parameters dynamically, each change must be republished to take effect, is there a way to dynamically adjust thread pool parameters without publishing?


2 Apollo Configuration center

2.1 Core Principles

Apollo is a distributed configuration center developed by the Framework department of Ctrip. It can centrally manage configurations in different environments and clusters, and push configurations to applications in real time after modification. It has standardized permissions and process governance features, and is suitable for micro-service configuration management scenarios. Apollo open source address:

https://github.com/ctripcorp/apollo
Copy the code

When we use the configuration center, in the first step, users modify configuration items in the configuration center. In the second step, the configuration center notifies The Apollo client of configuration updates. In the third step, the Apollo client pulls the latest configuration from the configuration center, updates the local configuration and notifies the application.


How does the client sense when a configuration item in the configuration center changes? Divided into push and pull two ways. The push dependent client maintains a long connection with the server. When data changes, the server pushes information to the client. This is the long polling mechanism. Pull depends on the client to periodically pull the latest application configuration from the configuration center server. This is a fallback mechanism. The design drawing of the official website client is as follows:


This article focuses on the analysis of configuration update push mode, we first look at the official website server design:



The ConfigService module provides functions such as read and push configurations. The service object is Apollo client. The AdminService module provides functions such as modifying and publishing configurations. The service object is the Portal module, that is, the management interface. Note that Apollo does not refer to messaging middleware. In the figure, sending asynchronous messages means that ConfigService periodically scans the data table of asynchronous messages:

The message data is stored in the MySQL message table:

CREATE TABLE `releasemessage` (
  `Id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Increment primary key',
  `Message` varchar(1024) NOT NULL DEFAULT ' ' COMMENT 'Published message content',
  `DataChange_LastTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last modified time'.PRIMARY KEY (`Id`),
  KEY `DataChange_LastTime` (`DataChange_LastTime`),
  KEY `IX_Message` (`Message`(191))
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='Post a message'
Copy the code

The core principles of Apollo are discussed here for the moment, and I will write a follow-up article to analyze Apollo’s long polling mechanism through source code. Please stay tuned.


2.2 Case Analysis

2.2.1 Server Installation

The key steps for the server are to import the database and change the port number. For details, see the official website:

https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start
Copy the code

Address to access after successful startup:

http://localhost:8070
Copy the code



Enter the user name Apollo and password admin to log in:



The DEV environment, the default cluster, and the application namespace contain a timeout configuration item. 100 is the value of this configuration item.



2.2.2 Applications

(1) Introducing dependencies

<dependencies>
    <dependency>
	<groupId>com.ctrip.framework.apollo</groupId>
	<artifactId>apollo-client</artifactId>
	<version>1.7.0</version>
    </dependency>
</dependencies>	
Copy the code

(2) Simple examples

public class GetApolloConfigTest extends BaseTest {

    /** * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080 * * myApp+DEV+default+application */
    @Test
    public void testGet(a) throws InterruptedException {
        Config appConfig = ConfigService.getAppConfig();
        while (true) {
            String value = appConfig.getProperty("timeout"."200");
            System.out.println("timeout=" + value);
            TimeUnit.SECONDS.sleep(1); }}}Copy the code

Since the above program continuously fetches the value of the configuration item through while(true), the program outputs the following:

timeout=100
timeout=100
timeout=100
timeout=100
timeout=100
timeout=100
Copy the code

We now change the value of the configuration item to 200 and the program outputs the following:

timeout=100
timeout=100
timeout=100
timeout=100
timeout=200
timeout=200
timeout=200
Copy the code

(3) Listening instances

In production, we don’t normally listen for changes while(true). Instead, we register listeners to sense changes:

public class GetApolloConfigTest extends BaseTest {

    /** * Listen for namespace changes ** -dapp. id= myapp-denv = dev-dapollo. cluster= default-ddev_meta =http://localhost:8080 ** myApp+DEV+default+application */
    @Test
    public void testListen(a) throws InterruptedException {
        Config config = ConfigService.getConfig("application");
        config.addChangeListener(new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
                System.out.println("Change namespace =" + changeEvent.getNamespace());
                for (String key : changeEvent.changedKeys()) {
                    ConfigChange change = changeEvent.getChange(key);
                    System.out.println(String.format(Change key=%s,oldValue=%s,newValue=%s,changeType=%s, change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType())); }}}); Thread.sleep(1000000L); }}Copy the code

We now change the timeout value from 200 to 300, and the program outputs:

Namespace changed = Application changed Key =timeout,oldValue=200,newValue=300,changeType=MODIFIEDCopy the code


Dynamic thread pools

Now let’s combine a thread pool with Apollo to build a dynamic thread pool. It’s not too complicated to write with this knowledge. We first build a thread pool with default values, and the pool listens to Apollo for configuration items and flushes parameters if the configuration changes. The first step is to set three thread pool parameters in the Apollo configuration center (the principle is the same without setting rejection policy in this article) :



Step 2 Write core code:

/** * Dynamic thread pool factory **@authorWechat official account "JAVA Front" * */
@Slf4j
@Component
public class DynamicThreadPoolFactory {
    private static final String NAME_SPACE = "threadpool-config";

    /** thread executor **/
    private volatile ThreadPoolExecutor executor;

    /** Number of core threads **/
    private Integer CORE_SIZE = 10;

    /** Maximum number of threads **/
    private Integer MAX_SIZE = 20;

    /** Queue length **/
    private Integer QUEUE_SIZE = 2000;

    /** Thread lifetime **/
    private Long KEEP_ALIVE_TIME = 1000L;

    /** Thread name **/
    private String threadName;

    public DynamicThreadPoolFactory(a) {
        Config config = ConfigService.getConfig(NAME_SPACE);
        init(config);
        listen(config);
    }

    /** * initializes */
    private void init(Config config) {
        if (executor == null) {
            synchronized (DynamicThreadPoolFactory.class) {
                if (executor == null) {
                    String coreSize = config.getProperty(KeysEnum.CORE_SIZE.getNodeKey(), CORE_SIZE.toString());
                    String maxSize = config.getProperty(KeysEnum.MAX_SIZE.getNodeKey(), MAX_SIZE.toString());
                    String keepAliveTIme = config.getProperty(KeysEnum.KEEP_ALIVE_TIME.getNodeKey(), KEEP_ALIVE_TIME.toString());
                    BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>(QUEUE_SIZE);
                    executor = new ThreadPoolExecutor(Integer.valueOf(coreSize), Integer.valueOf(maxSize), Long.valueOf(keepAliveTIme), TimeUnit.MILLISECONDS, queueToUse, new NamedThreadFactory(threadName, true), newAbortPolicyDoReport(threadName)); }}}}/** * listener */
    private void listen(Config config) {
        config.addChangeListener(new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
                log.info("Namespace changes ={}", changeEvent.getNamespace());
                for (String key : changeEvent.changedKeys()) {
                    ConfigChange change = changeEvent.getChange(key);
                    String newValue = change.getNewValue();
                    refreshThreadPool(key, newValue);
                    log.info("Change the key = {}, oldValue = {}, newValue = {}, changeType = {}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()); }}}); }/** * refresh the thread pool */
    private void refreshThreadPool(String key, String newValue) {
        if (executor == null) {
            return;
        }
        if (KeysEnum.CORE_SIZE.getNodeKey().equals(key)) {
            executor.setCorePoolSize(Integer.valueOf(newValue));
            log.info(Key ={},value={}, key, newValue);
        }
        if (KeysEnum.MAX_SIZE.getNodeKey().equals(key)) {
            executor.setMaximumPoolSize(Integer.valueOf(newValue));
            log.info(Key ={},value={}", key, newValue);
        }
        if (KeysEnum.KEEP_ALIVE_TIME.getNodeKey().equals(key)) {
            executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
            log.info(Key ={},value={}", key, newValue); }}public ThreadPoolExecutor getExecutor(String threadName) {
        return executor;
    }

    enum KeysEnum {

        CORE_SIZE("coreSize".Number of core threads),

        MAX_SIZE("maxSize"."Maximum threads"),

        KEEP_ALIVE_TIME("keepAliveTime"."Thread active time");private String nodeKey;
        private String desc;

        KeysEnum(String nodeKey, String desc) {
            this.nodeKey = nodeKey;
            this.desc = desc;
        }

        public String getNodeKey(a) {
            return nodeKey;
        }

        public void setNodeKey(String nodeKey) {
            this.nodeKey = nodeKey;
        }

        public String getDesc(a) {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc; }}}/** * dynamic thread pool executor **@authorWechat official account "JAVA Front" * */
@Component
public class DynamicThreadExecutor {

	@Resource
	private DynamicThreadPoolFactory threadPoolFactory;

	public void execute(String bizName, Runnable job) {
		threadPoolFactory.getExecutor(bizName).execute(job);
	}

	publicFuture<? > sumbit(String bizName, Runnable job) {returnthreadPoolFactory.getExecutor(bizName).submit(job); }}Copy the code

Step 3 Run the test case and observe the thread count with VisualVM:

/** * Dynamic thread pool test **@authorWechat official account "JAVA Front" * */
public class DynamicThreadExecutorTest extends BaseTest {

    @Resource
    private DynamicThreadExecutor dynamicThreadExecutor;

    /** * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080 * * myApp+DEV+default+thread-pool */
    @Test
    public void testExecute(a) throws InterruptedException {
        while (true) {
            dynamicThreadExecutor.execute("bizName".new Runnable() {
                @Override
                public void run(a) {
                    System.out.println("bizInfo"); }}); TimeUnit.SECONDS.sleep(1); }}}Copy the code



We modify the configuration item in the configuration center to set the number of core threads to 50 and the maximum number of threads to 100:



A significant increase in the number of threads can be observed with VisualVM:



4 Article Summary

In this paper, we first introduce the basic knowledge of thread pool, including seven parameters and four rejection strategies, and then we introduce the principle and application of Apollo configuration center. Finally, we combine the thread pool and configuration center to achieve the effect of dynamic adjustment of the number of threads. I hope this paper is helpful to you.


Welcome to pay attention to the public account “JAVA Front” to view more wonderful sharing articles, mainly including source code analysis, practical application, architecture thinking, workplace sharing, product thinking and so on, at the same time, welcome to add my wechat “JAVA_front” to communicate and learn together