Hello everyone. Today we are going to talk about a practical topic: building a dynamic, monitorable thread pool. The address of the new open source project (DynamicTp) is at the end of the article, and feedback and discussion are welcome.


Preface

If you have some experience with Java programming, you will know that much of the essence of Java concurrency lies in the JUC (java.util.concurrent) package, a masterpiece by Doug Lea. To some extent, a programmer's Java level can be judged by how well they have mastered the technologies in the JUC package. It is also a topic that almost always comes up in interviews.

The JUC package mainly includes:

1. AtomicXXX

2. Lock classes (XXXLock)

3. Thread synchronization classes (AQS, CountDownLatch, CyclicBarrier, Semaphore, Exchanger)

4. Task executor classes (the Executor framework, including today's protagonist, ThreadPoolExecutor)

5. Concurrent collection classes (ConcurrentXXX, CopyOnWriteXXX)

6. BlockingQueue classes

7. Future-related classes

8. Other auxiliary tools

In multithreaded programming, these classes are essential tools that help us write high-quality, high-performance code with fewer bugs. They are also among the harder parts of Java: mastering them takes persistence, applying what you learn, and experiencing their subtleties in real use.

This article focuses on dynamic, monitorable thread pools, so we will not go into these classes here; they will be covered separately later. Before reading on, you should have some experience with ThreadPoolExecutor, otherwise parts of this article may be confusing.

If you are not familiar with ThreadPoolExecutor, the following two articles are recommended:

Javadoop: www.javadoop.com/post/java-t…

Meituan technology blog: tech.meituan.com/2020/04/02/…


Background

Do you have any of the following pain points when using ThreadPoolExecutor?

1. You create a ThreadPoolExecutor but don't know what values the core parameters should have

2. You set parameter values based on experience, only to find they need adjusting after going live; changing the code and restarting the service is very troublesome

3. The thread pool is a black box to developers, and its running state goes unnoticed until something goes wrong

If you have any of these pain points, DynamicTp, the project introduced in this article, may help.

If you have read the ThreadPoolExecutor source code, you probably know that it provides setter methods that can modify its parameters at runtime:

public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
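As a quick illustration of those setters on a live pool, here is a minimal JDK-only sketch (the class name ResizeDemo is our own). One detail worth knowing: the order of the two size setters matters, because setMaximumPoolSize rejects a maximum below the current core size and, since JDK 9, setCorePoolSize rejects a core size above the current maximum. Note also that ThreadPoolExecutor has no setter for its work queue or the queue's capacity.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeDemo {

    /**
     * Changes the core and maximum sizes of a running pool. The setters are called in an
     * order that keeps corePoolSize <= maximumPoolSize after every single call.
     */
    public static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore > newMax) {
            throw new IllegalArgumentException("corePoolSize must not exceed maximumPoolSize");
        }
        if (newCore > pool.getMaximumPoolSize()) {
            // growing past the current max: raise max first, then core
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // new core fits under the current max: set core first, then max
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
        resize(pool, 6, 8);                          // e.g. after a configuration push
        pool.setKeepAliveTime(30, TimeUnit.SECONDS); // also adjustable at runtime
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 6/8
        pool.shutdown();
    }
}
```

Shrinking works through the same method; the pool retires surplus idle threads on its own once the new limits apply.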

Today most Internet projects are deployed as microservices and have their own service governance systems. Among the microservice components, the distributed configuration center lets configuration be modified dynamically and take effect in real time. So can we combine it with the configuration center to adjust thread pool parameters at runtime? The answer is yes. Configuration centers are generally highly available, so we don't need to worry much about the reliability of configuration pushes, and building on one also reduces the difficulty and effort of developing a dynamic thread pool component.

To sum up, the background is as follows:

  • Universality: thread pools are one of the most commonly used tools for improving system performance in Java development
  • Uncertainty: a project may create many thread pools, both IO-intensive and CPU-intensive, and their parameters are hard to get right up front; a mechanism is needed to adjust them dynamically at runtime
  • Lack of visibility: the metrics of a running thread pool are usually invisible; a monitoring and alarm mechanism is needed so that developers can see the pool's running state and act in time
  • High availability: configuration changes must be pushed to clients promptly, which requires a highly available configuration management and push service. Most Internet systems already use a configuration center, and building on it greatly reduces the development effort and integration difficulty

Introduction

Building on the configuration center, we extend ThreadPoolExecutor so that the parameters of a running thread pool can be changed dynamically. The running state of each thread pool is also monitored in real time: when an alarm policy is triggered, an alarm message is pushed to an office platform (DingTalk, WeCom, etc.). Alarm dimensions include queue capacity, thread pool liveness, rejection triggers, and so on. In addition, thread pool metrics are collected periodically for visualization on a monitoring platform. This keeps us aware of each thread pool's load at all times, so we can adjust it promptly and keep problems from affecting online services.

(ASCII-art startup banner: :: Dynamic Thread Pool ::)

Features

  • Inspired by Meituan's thread pool practice, it manages thread pool parameters dynamically and adds monitoring and alarm functions
  • Built on the Spring framework; currently only Spring Boot projects are supported. It is lightweight: just add the starter and it is ready to use
  • Thread pool parameters are adjusted dynamically through the configuration center and take effect in real time. Mainstream configuration centers are integrated: Nacos and Apollo are supported by default, and an SPI interface is provided for custom extensions
  • Built-in notification and alarm functions with several alarm dimensions (configuration change notification, liveness alarm, capacity threshold alarm, rejection policy alarm). WeCom and DingTalk are supported by default, and an SPI interface is provided for custom extensions
  • Built-in thread pool metrics collection via Micrometer, JSON log output, or an Endpoint, extensible through the SPI interface
  • Manages the thread pools of commonly used third-party components; the thread pools of Spring Boot's built-in web servers (Tomcat, Undertow, and Jetty) are already integrated
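Several of the features above are extensible through an SPI interface. The article does not show DynamicTp's SPI types, so the sketch below only illustrates the standard JDK mechanism such extension points are commonly built on, java.util.ServiceLoader, using a hypothetical AlarmPlatform interface. Whether DynamicTp loads its extensions exactly this way is an assumption.

```java
import java.util.ServiceLoader;

public class SpiDemo {

    /** Hypothetical extension point for alarm platforms (not a DynamicTp type). */
    public interface AlarmPlatform {
        String name();              // e.g. "ding" or "wechat"

        void send(String message);
    }

    /** Built-in fallback used when no registered provider matches. */
    static final class ConsolePlatform implements AlarmPlatform {
        @Override
        public String name() {
            return "console";
        }

        @Override
        public void send(String message) {
            System.out.println("[alarm] " + message);
        }
    }

    /** Returns the first provider on the classpath whose name matches, else the console fallback. */
    public static AlarmPlatform find(String platform) {
        for (AlarmPlatform provider : ServiceLoader.load(AlarmPlatform.class)) {
            if (provider.name().equalsIgnoreCase(platform)) {
                return provider;
            }
        }
        return new ConsolePlatform();
    }
}
```

A custom platform would implement SpiDemo.AlarmPlatform and list its fully qualified class name in a classpath file named META-INF/services/SpiDemo$AlarmPlatform. With no such file present, find falls back to the console implementation.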

Architecture design

It is mainly divided into four modules:

  • Configuration change listening module:

    1. Listens to the specified configuration file in a specific configuration center (Nacos and Apollo by default); other implementations can be added through the internal SPI interface

    2. Parses the configuration file content, with built-in parsing of YAML and properties files; other formats can be added through the internal SPI interface

    3. Notifies the thread pool management module to refresh

  • Thread pool management module:

    1. When the service starts, it pulls the configuration from the configuration center, creates the thread pool instances, and registers them in the internal thread pool registry

    2. When the configuration change module detects a change, it passes the change to the management module, which refreshes the thread pool parameters

    3. In code, a thread pool instance can be obtained by name through the getExecutor() method

  • Monitoring module:

    Collects and outputs monitoring metrics. Three methods are provided by default, and other implementations can be added through the internal SPI interface:

    1. JSON logs written to disk (the default)

    2. Micrometer collection, after adding the Micrometer-related dependencies

    3. An exposed Endpoint that can be queried over HTTP

  • Notification and alarm module:

    Integrates with office platforms to send alarm notifications. DingTalk and WeCom are implemented by default, and other implementations can be added through the internal SPI interface. Notifications are sent when:

    1. Thread pool parameters change

    2. The blocking queue's usage reaches the threshold

    3. The thread pool's liveness reaches the threshold

    4. The rejection policy is triggered
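The management module's flow (register at startup, look up by name, refresh on change) can be sketched with plain JDK types. MiniDtpRegistry below is a hypothetical, simplified stand-in, not DynamicTp's DtpRegistry: it accepts properties-format text for a single pool, applies only the size and keep-alive settings, and assumes keepAliveTime is given in seconds.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical name-keyed registry with a refresh step (not DynamicTp's implementation). */
public class MiniDtpRegistry {
    private static final Map<String, ThreadPoolExecutor> POOLS = new ConcurrentHashMap<>();

    /** Called once at startup for each configured pool. */
    public static void register(String name, ThreadPoolExecutor pool) {
        POOLS.put(name, pool);
    }

    /** Looks a pool up by name, failing loudly if it was never registered. */
    public static ThreadPoolExecutor getExecutor(String name) {
        ThreadPoolExecutor pool = POOLS.get(name);
        if (pool == null) {
            throw new IllegalStateException("No thread pool registered under " + name);
        }
        return pool;
    }

    /** Applies properties-format text to the named pool and returns the keys that changed. */
    public static synchronized List<String> refresh(String name, String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        ThreadPoolExecutor pool = getExecutor(name);
        int oldCore = pool.getCorePoolSize();
        int oldMax = pool.getMaximumPoolSize();
        long oldKeepAlive = pool.getKeepAliveTime(TimeUnit.SECONDS);

        // keys missing from the pushed content keep their current values
        int newCore = Integer.parseInt(props.getProperty("corePoolSize", String.valueOf(oldCore)));
        int newMax = Integer.parseInt(props.getProperty("maximumPoolSize", String.valueOf(oldMax)));
        long newKeepAlive = Long.parseLong(props.getProperty("keepAliveTime", String.valueOf(oldKeepAlive)));

        List<String> changed = new ArrayList<>();
        if (newCore != oldCore) changed.add("corePoolSize");
        if (newMax != oldMax) changed.add("maximumPoolSize");
        if (newKeepAlive != oldKeepAlive) changed.add("keepAliveTime");
        if (changed.isEmpty()) {
            return changed;
        }
        if (newCore > newMax) {
            throw new IllegalArgumentException("corePoolSize must not exceed maximumPoolSize");
        }
        // raise max before core when growing past the old max, otherwise set core first
        if (newCore > oldMax) {
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
        pool.setKeepAliveTime(newKeepAlive, TimeUnit.SECONDS);
        return changed;
    }
}
```

A configuration-center listener would call refresh with the pushed content; the returned keys play the same role as the "changed keys" list in DynamicTp's refresh log.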


Usage

  • Maven dependencies
  1. For Apollo applications, use this dependency:
        <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
            <version>1.0.0</version>
        </dependency>
  2. For Nacos applications in Spring Cloud scenarios, use this dependency:
        <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
            <version>1.0.0</version>
        </dependency>
  3. For Nacos applications in non-Spring Cloud scenarios, use this dependency:
        <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
            <version>1.0.0</version>
        </dependency>
  • Thread pool configuration

    spring:
      dynamic:
        tp:
          enabled: true
          enabledBanner: true        # whether to enable banner printing. Default is true
          enabledCollect: false      # whether to enable metrics collection, default false
          collectorType: logging     # metrics collector type (JsonLog | MicroMeter), default logging
          logPath: /home/logs        # log output path, default ${user.home}/logs
          monitorInterval: 5         # Monitoring interval (alarm judgment, indicator collection), 5s by default
          nacos:                     # Nacos configuration, no default value (naming convention: name-dev.yml)
            dataId: dynamic-tp-demo-dev.yml
            group: DEFAULT_GROUP
          apollo:                    # Apollo configuration; if omitted, the first Apollo namespace is used
            namespace: dynamic-tp-demo-dev.yml
          configType: yml            # configuration file type
          platforms:                 # notification/alarm platform configuration
            - platform: wechat
              urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # replace with your own
              receivers: test1,test2                   # recipients' WeCom names
            - platform: ding
              urlKey: f80dad441fcd655438f4a08dcd6a     # replace with your own
              secret: SECb5441fa6f375d5b9d21           # replace; omit in non-signature mode
              receivers: 15810119805                   # recipients' DingTalk phone numbers
          tomcatTp:                                    # Tomcat web server thread pool configuration
              minSpare: 100
              max: 400      
          jettyTp:                                     # Jetty Web Server thread pool configuration
              min: 100
              max: 400     
          undertowTp:                                  # Undertow Web Server thread pool configuration
              ioThreads: 100
              workerThreads: 400      
          executors:                                   # dynamic thread pool configuration
            - threadPoolName: dynamic-tp-test-1
              corePoolSize: 6
              maximumPoolSize: 8
              queueCapacity: 200
              queueType: VariableLinkedBlockingQueue   # see the QueueTypeEnum enum class
              rejectedHandlerType: CallerRunsPolicy    # see the RejectedTypeEnum enum class
              keepAliveTime: 50
              allowCoreThreadTimeOut: false
              threadNamePrefix: test           # thread name prefix
              notifyItems:                     # alarm items; if omitted, they are configured automatically (change notification, capacity alarm, liveness alarm, reject alarm)
                - type: capacity               # see the NotifyTypeEnum enum class
                  enabled: true
                  threshold: 80                # alarm threshold
                  platforms: [ding,wechat]     # optional; defaults to the platforms configured above
                  interval: 120                # alarm interval (unit: s)
                - type: change
                  enabled: true
                - type: liveness
                  enabled: true
                  threshold: 80
                - type: reject
                  enabled: true
                  threshold: 1
  • Thread pools can also be created in code; they are registered automatically when the service starts

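    import java.util.concurrent.ThreadPoolExecutor;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // DynamicTp imports (DtpExecutor, DtpCreator, ThreadPoolBuilder, QueueTypeEnum,
    // RejectedTypeEnum) are omitted here; their packages depend on the starter version

    @Configuration
    public class DtpConfig {

        @Bean
        public DtpExecutor demo1Executor() {
            return DtpCreator.createDynamicFast("demo1-executor");
        }

        @Bean
        public ThreadPoolExecutor demo2Executor() {
            return ThreadPoolBuilder.newBuilder()
                    .threadPoolName("demo2-executor")
                    .corePoolSize(8)
                    .maximumPoolSize(16)
                    .keepAliveTime(50)
                    .allowCoreThreadTimeOut(true)
                    // queue type, capacity (null: a SynchronousQueue holds no elements), fairness
                    .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                    .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
                    .buildDynamic();
        }
    }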
  • Calling from code: obtain a thread pool by its name

    public static void main(String[] args) {
        // assumes the Spring context has started, so "dynamic-tp-test-1" is already registered
        DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
        dtpExecutor.execute(() -> System.out.println("test"));
    }
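To make the YAML fields above concrete, this sketch builds a plain ThreadPoolExecutor with the same values as the dynamic-tp-test-1 entry. It is only an approximation for orientation: it has no dynamic refresh or alarms, uses a fixed-capacity LinkedBlockingQueue in place of VariableLinkedBlockingQueue, assumes keepAliveTime is in seconds (the refresh log in the Notes shows 50s), and its prefix-N thread naming is our own choice. The class name is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PlainPoolFromConfig {

    /** Names threads prefix-1, prefix-2, ... (the numbering scheme is an assumption of this sketch). */
    static ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + seq.incrementAndGet());
    }

    /** JDK-only equivalent of the dynamic-tp-test-1 entry, without dynamic refresh or alarms. */
    public static ThreadPoolExecutor dynamicTpTest1() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                6,                                            // corePoolSize
                8,                                            // maximumPoolSize
                50, TimeUnit.SECONDS,                         // keepAliveTime
                new LinkedBlockingQueue<Runnable>(200),       // queueCapacity (fixed here)
                namedThreadFactory("test"),                   // threadNamePrefix
                new ThreadPoolExecutor.CallerRunsPolicy());   // rejectedHandlerType
        pool.allowCoreThreadTimeOut(false);                   // allowCoreThreadTimeOut
        return pool;
    }
}
```

The notifyItems section has no JDK counterpart; alarms sit outside ThreadPoolExecutor entirely.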

Notes

  1. Parameters set in the configuration file override those set when the thread pool is created in code

  2. Among the blocking queues, only VariableLinkedBlockingQueue allows its capacity to be modified. It works like LinkedBlockingQueue, except that its capacity is not final and can be changed. The VariableLinkedBlockingQueue implementation is based on the one in RabbitMQ.

  3. If the following log output appears, the integration succeeded:

    (ASCII-art startup banner: :: Dynamic Thread Pool ::)
    DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
  4. When the configuration changes, a notification message is pushed with the changed fields highlighted, and a refresh log like the following is printed:

    
    DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
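Why does capacity need a special queue? In the JDK, LinkedBlockingQueue fixes its capacity at construction, and ThreadPoolExecutor offers no way to swap its queue. The sketch below is not the RabbitMQ-derived VariableLinkedBlockingQueue; it uses a different, simpler technique: an unbounded LinkedBlockingQueue whose offer(E) enforces a mutable cap. That covers the path ThreadPoolExecutor.execute uses to enqueue tasks, but put and the timed offer are not capped in this sketch. The class name is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical queue whose capacity can be changed at runtime (simplified technique). */
public class ResizableCapacityQueue<E> extends LinkedBlockingQueue<E> {
    private volatile int capacity;

    public ResizableCapacityQueue(int capacity) {
        super();                 // unbounded underneath; the cap is enforced in offer(E)
        setCapacity(capacity);
    }

    /** Takes effect for the next offer; elements already queued are kept. */
    public void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    public int getCapacity() {
        return capacity;
    }

    /**
     * ThreadPoolExecutor.execute enqueues via offer(E); returning false makes the pool try a
     * non-core thread and, if none can be added, run the rejection policy.
     */
    @Override
    public synchronized boolean offer(E e) {
        // synchronized so concurrent producers cannot overshoot the cap between size() and offer
        if (size() >= capacity) {
            return false;
        }
        return super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }
}
```

For example, a pool created with new ResizableCapacityQueue<Runnable>(200) as its work queue could later be raised to 2000 with ((ResizableCapacityQueue<Runnable>) pool.getQueue()).setCapacity(2000). Shrinking below the current size rejects new tasks until the queue drains; nothing already queued is dropped.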

Notifications and alarms

When an alarm threshold is reached, the corresponding alarm message is pushed with the relevant fields highlighted. Alarm types include liveness alarms, capacity alarms, and rejection alarms.
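The article does not spell out how the thresholds are evaluated. Here is a minimal sketch under assumed formulas: capacity usage as queue size over total queue capacity, and liveness as active threads over maximumPoolSize, both in percent and compared against a threshold like notifyItems.threshold, with repeat alarms suppressed for a window like notifyItems.interval. AlarmChecker and both formulas are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical threshold alarm check with a silence interval; formulas are assumptions. */
public class AlarmChecker {
    private final int thresholdPercent;   // e.g. 80, as in notifyItems.threshold
    private final long intervalMillis;    // e.g. 120 s, as in notifyItems.interval
    private long lastAlarmAt = -1;        // -1 means "never alarmed"

    public AlarmChecker(int thresholdPercent, long intervalMillis) {
        this.thresholdPercent = thresholdPercent;
        this.intervalMillis = intervalMillis;
    }

    /** Queue usage in percent; 0 for queues with no capacity, such as SynchronousQueue. */
    public static int queueUsagePercent(BlockingQueue<?> queue) {
        long size = queue.size();
        long capacity = size + queue.remainingCapacity();
        return capacity == 0 ? 0 : (int) (size * 100 / capacity);
    }

    /** Active threads as a percentage of maximumPoolSize (one plausible liveness definition). */
    public static int livenessPercent(ThreadPoolExecutor pool) {
        return pool.getActiveCount() * 100 / pool.getMaximumPoolSize();
    }

    /** True if the value reached the threshold and no alarm was sent within the interval. */
    public synchronized boolean shouldAlarm(int percent, long nowMillis) {
        if (percent < thresholdPercent) {
            return false;
        }
        if (lastAlarmAt >= 0 && nowMillis - lastAlarmAt < intervalMillis) {
            return false;              // still inside the silence window
        }
        lastAlarmAt = nowMillis;
        return true;
    }
}
```

A monitor thread would compute these percentages every monitorInterval seconds and push a message only when shouldAlarm returns true; passing the clock in as nowMillis keeps the check easy to test.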

Configuration changes likewise push a notification message with the changed fields highlighted.


Monitoring

The metrics collection type is set by the collectorType property in the main configuration file; the default is logging.

  • Micrometer: after adding the Micrometer dependencies, metrics can be collected by monitoring platforms (e.g. Prometheus, InfluxDB…)

  • Logging: metrics are written to disk as JSON logs, at ${logPath}/dynamictp/${appName}.monitor.log

    2022-01-16 15:25:20.599 INFO [DTP-monitor-thread-1:D.M.Log] {"activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:25.603 INFO [DTP-monitor-thread-1:D.M.Log] {"activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:30.609 INFO [DTP-monitor-thread-1:D.M.Log] {"activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:35.613 INFO [DTP-monitor-thread-1:D.M.Log] {"activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:40.616 INFO [DTP-monitor-thread-1:D.M.Log] {"activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8}
  • Endpoint: an Endpoint (dynamic-tp) is exposed and can be requested over HTTP

    [
        {
            "dtp_name": "remoting-call",
            "core_pool_size": 8,
            "maximum_pool_size": 16,
            "queue_type": "SynchronousQueue",
            "queue_capacity": 0,
            "queue_size": 0,
            "fair": false,
            "queue_remaining_capacity": 0,
            "active_count": 2,
            "task_count": 2760,
            "completed_task_count": 2760,
            "largest_pool_size": 16,
            "pool_size": 8,
            "wait_task_count": 0,
            "reject_count": 12462,
            "reject_handler_name": "CallerRunsPolicy"
        },
        {
            "max_memory": "220 MB",
            "total_memory": "140 MB",
            "free_memory": "44 MB",
            "usable_memory": "125 MB"
        }
    ]
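Most fields in the JSON log and the Endpoint output map directly onto ThreadPoolExecutor's own getters. The sketch below (class names hypothetical) assembles such a snapshot and counts rejections by wrapping a RejectedExecutionHandler. The RejectedCountableCallerRunsPolicy name in the startup log suggests DynamicTp counts rejections inside its handlers, but this wrapper is our own. waitTaskCount and the memory figures are left out because the article does not define them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class PoolMetrics {

    /** Delegating handler that counts how often tasks were rejected. */
    public static class CountingRejectedHandler implements RejectedExecutionHandler {
        private final RejectedExecutionHandler delegate;
        private final LongAdder rejectCount = new LongAdder();

        public CountingRejectedHandler(RejectedExecutionHandler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            rejectCount.increment();                 // count first, then apply the real policy
            delegate.rejectedExecution(task, executor);
        }

        public long getRejectCount() {
            return rejectCount.sum();
        }

        public String getDelegateName() {
            return delegate.getClass().getSimpleName();
        }
    }

    /** Point-in-time view of a pool, keyed like the JSON log fields. */
    public static Map<String, Object> snapshot(String name, ThreadPoolExecutor pool) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("dtpName", name);
        m.put("corePoolSize", pool.getCorePoolSize());
        m.put("maximumPoolSize", pool.getMaximumPoolSize());
        m.put("poolSize", pool.getPoolSize());
        m.put("activeCount", pool.getActiveCount());
        m.put("largestPoolSize", pool.getLargestPoolSize());
        m.put("taskCount", pool.getTaskCount());
        m.put("completedTaskCount", pool.getCompletedTaskCount());
        BlockingQueue<Runnable> queue = pool.getQueue();
        m.put("queueType", queue.getClass().getSimpleName());
        m.put("queueSize", queue.size());
        m.put("queueRemainingCapacity", queue.remainingCapacity());
        RejectedExecutionHandler handler = pool.getRejectedExecutionHandler();
        if (handler instanceof CountingRejectedHandler) {
            CountingRejectedHandler counting = (CountingRejectedHandler) handler;
            m.put("rejectHandlerName", counting.getDelegateName());
            m.put("rejectCount", counting.getRejectCount());
        } else {
            m.put("rejectHandlerName", handler.getClass().getSimpleName());
        }
        return m;
    }

    /** Prints a snapshot every intervalSeconds; the caller must shut the scheduler down. */
    public static ScheduledExecutorService startLogging(String name, ThreadPoolExecutor pool, long intervalSeconds) {
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> System.out.println(snapshot(name, pool)),
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        return monitor;
    }
}
```

PoolMetrics.startLogging("remoting-call", pool, 5) prints a snapshot every 5 seconds, mirroring the default monitorInterval. A real JSON log would serialize the map with a JSON library rather than rely on Map.toString().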

Project address

Gitee address: gitee.com/yanhom/dyna…

Github address: github.com/lyh200/dyna…


Contact me

If you have any ideas or suggestions about the project, feel free to add me on WeChat, or open issues so we can improve the project together.

WeChat official account: CodeFox

WeChat: yanhom1314