Hello everyone. Today we look at a practical topic: dynamic, monitorable thread pools in practice. The address of the new open source project (DynamicTp) is at the end of the article; you are welcome to exchange ideas and learn together.
Preface
If you have some experience with Java programming, you will know that much of the essence of Java lies in the JUC (java.util.concurrent) package, the masterpiece of Doug Lea. To a certain extent, a programmer's Java level can be judged by how well they have mastered the technologies in the JUC package, which is also why it is one of the basic technical topics that interviews almost always cover.
The JUC package mainly includes:
1. AtomicXXX
2. Lock class (XXXLock)
3. Thread synchronization (AQS, CountDownLatch, CyclicBarrier, Semaphore, Exchanger)
4. Task executor classes (the Executor framework classes, including today's hero, ThreadPoolExecutor)
5. Concurrent collection classes (ConcurrentXXX, CopyOnWriteXXX)
6. BlockingQueue classes
7. Future-related classes
8. Other auxiliary tools
In multithreaded programming, these classes are essential skills that help us write high-quality, high-performance code with fewer bugs. They are also among the more difficult parts of Java; mastering them takes persistence, practice in applying them, and experience gained through real use.
This article focuses on dynamic, monitorable thread pools, so we will not go into the other topics here and will cover them separately later. Before reading on, it helps to have some experience with ThreadPoolExecutor; otherwise parts of the article may be confusing.
If you are not familiar with ThreadPoolExecutor, the following two articles are recommended:
Javadoop: www.javadoop.com/post/java-t…
Meituan technology blog: tech.meituan.com/2020/04/02/…
Background
Do you have any of the following pain points when using ThreadPoolExecutor?
1. You create a ThreadPoolExecutor in code but do not know what values are appropriate for its core parameters
2. You set the parameter values based on experience, find after going live that they need adjusting, and then have to change the code and restart the service, which is very troublesome
3. The thread pool is a black box to the developer; its running state cannot be observed until something goes wrong
If you have any of these pain points, DynamicTp, the project introduced in this article, may help.
If you have looked at ThreadPoolExecutor's source code, you probably know that it provides setter methods that let you modify its parameters dynamically at run time. These methods include:
public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
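As a minimal sketch of how these setters can be used on a live pool: the helper below (PoolResizer, a hypothetical name that is not part of DynamicTp) applies a new core size and maximum size in an order that keeps core <= max at every step, because the setters can reject an intermediate state where the core size exceeds the maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of DynamicTp.
final class PoolResizer {

    private PoolResizer() {
    }

    /** Applies new core/max sizes in an order that keeps core <= max at every step. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("require 0 <= core <= max and max > 0");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            // Growing (or same maximum): raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Shrinking: lower the core size first, then the ceiling.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        resize(pool, 8, 16);
        // prints 8/16
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Note that the queue capacity has no such setter in the JDK queues, which is why a special queue type is needed for that parameter.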
At present, most Internet projects are deployed as microservices and have their own service governance system. Among the microservice components, the distributed configuration center is responsible for modifying configuration dynamically and making changes take effect in real time. So can we combine the configuration center with these setters to adjust thread pool parameters dynamically at run time? The answer is yes. Configuration centers are also relatively highly available, so relying on one means we do not have to worry much about how configuration gets pushed, and it reduces the difficulty and effort of developing a dynamic thread pool component.
To sum up, the background is as follows:
- Universality: thread pools are a basic tool that more than 90% of Java developers reach for to improve system performance
- Uncertainty: a project may create many thread pools, some IO-intensive and some CPU-intensive, but their parameters are hard to determine up front, so a mechanism is needed to adjust them dynamically while the system is running
- Lack of visibility: the metrics of a running thread pool are generally not observable, so a monitoring and alarm mechanism is needed that lets developers see the state of the thread pool before a failure occurs and handle problems in time
- High availability: configuration changes need to be pushed to clients in a timely manner, which requires a highly available configuration management and push service. The configuration center is a component that most Internet systems already use, and building on it greatly reduces the development effort and the difficulty of adoption
Introduction
Based on the configuration center, we extend ThreadPoolExecutor so that the parameters of a running thread pool can be changed dynamically. The running state of the thread pool is monitored in real time; when a configured alarm policy is triggered, an alarm is raised and the alarm information is pushed to an office platform (DingTalk, WeCom, etc.). The alarm dimensions include queue capacity, thread pool liveness, and triggered rejections. In addition, thread pool metrics are collected periodically for visualization on a monitoring platform. This lets us stay aware of the load on the thread pool at all times and adjust it promptly, to avoid problems that affect online services.
(DynamicTp ASCII-art banner)
:: Dynamic Thread Pool ::
Features
- Drawing on Meituan's thread pool practice, it manages thread pool parameters dynamically and adds monitoring and alarm functions
- Based on the Spring framework; currently only Spring Boot projects are supported. It is lightweight: add the starter and it is ready to use
- Thread pool parameters are adjusted dynamically through the configuration center and take effect in real time. Mainstream configuration centers are integrated, with Nacos and Apollo supported by default, and an SPI interface is provided for custom extensions
- Built-in notification and alarm function with several alarm dimensions (configuration change notification, liveness alarm, capacity threshold alarm, rejection policy trigger alarm). WeCom (Enterprise WeChat) and DingTalk are supported by default, and an SPI interface is provided for custom extensions
- Built-in thread pool metrics collection, available by default through Micrometer, JsonLog log output, and an Endpoint; custom implementations can be added through the SPI interface
- Management of the thread pools of common third-party components; the thread pools of the web servers built into Spring Boot (Tomcat, Undertow, Jetty) are already integrated
Architecture design
It is mainly divided into four modules
-
Configuration change monitoring module:
1. Listens to the specified configuration file in a specific configuration center (Nacos and Apollo are implemented by default); other implementations can be added through the internally provided SPI interface
2. Parses the content of the configuration file (parsing of yml and properties files is built in); other implementations can be added through the internally provided SPI interface
3. Notifies the thread pool management module to refresh
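The listen, parse, notify flow of this module can be sketched roughly as follows. This is an illustration under assumptions, not DynamicTp's actual code: ConfigChangeDispatcher is a hypothetical class, the content is assumed to be in Properties format, and the real listener registration with Nacos or Apollo is left out.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.function.Consumer;

// Hypothetical sketch of "listen -> parse -> notify"; not DynamicTp's actual classes.
final class ConfigChangeDispatcher {

    // Stands in for the thread pool management module's refresh entry point.
    private final Consumer<Properties> refresher;

    ConfigChangeDispatcher(Consumer<Properties> refresher) {
        this.refresher = refresher;
    }

    /** Called by a config-center listener with the new file content (Properties format). */
    void onChange(String content) {
        Properties parsed = new Properties();
        try {
            parsed.load(new StringReader(content));
        } catch (IOException e) {
            throw new IllegalArgumentException("unparsable configuration", e);
        }
        // Hand the parsed key/value pairs to whoever refreshes the thread pools.
        refresher.accept(parsed);
    }
}
```

A caller would construct it with a refresh callback and invoke onChange from whatever change listener the chosen configuration center client provides.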
-
Thread pool management module:
1. When the service starts, it pulls the configuration from the configuration center, creates the thread pool instances, and registers them in the internal thread pool registry
2. When the configuration change monitoring module detects a change, it passes the change information to the management module, which refreshes the thread pool parameters
3. Code obtains a thread pool instance by thread pool name through the getExecutor() method
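The idea of a name-keyed registry can be sketched in a few lines. SimplePoolRegistry below is a hypothetical stand-in written for illustration; it only mirrors the getExecutor(name) lookup described above and says nothing about how DynamicTp's own registry is implemented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical registry for illustration; not DynamicTp's DtpRegistry.
final class SimplePoolRegistry {

    private static final Map<String, ThreadPoolExecutor> POOLS = new ConcurrentHashMap<>();

    private SimplePoolRegistry() {
    }

    /** Registers a pool under a unique name, typically once at service startup. */
    static void register(String name, ThreadPoolExecutor pool) {
        if (POOLS.putIfAbsent(name, pool) != null) {
            throw new IllegalStateException("duplicate thread pool name: " + name);
        }
    }

    /** Looks a pool up by name; fails loudly if the name is unknown. */
    static ThreadPoolExecutor getExecutor(String name) {
        ThreadPoolExecutor pool = POOLS.get(name);
        if (pool == null) {
            throw new IllegalArgumentException("no thread pool registered as: " + name);
        }
        return pool;
    }
}
```

Because every pool is reachable by name, a refresh triggered by a configuration change can find the right instance and call its setters without the business code holding any extra references.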
-
Monitoring module:
Collects and outputs monitoring metrics. The following three methods are provided by default; other implementations can be added through the internal SPI interface
1. JSON log output to disk (the default)
2. Micrometer collection, which requires adding the Micrometer-related dependencies
3. An exposed Endpoint that can be accessed over HTTP
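Whatever the output channel, the raw numbers come from getters that ThreadPoolExecutor already exposes. The sketch below gathers them into a map; PoolMetrics is a hypothetical helper, and the key names are simply borrowed from the JSON log sample later in this article rather than taken from DynamicTp's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical collector for illustration; not DynamicTp's actual implementation.
final class PoolMetrics {

    private PoolMetrics() {
    }

    /** Takes a point-in-time snapshot; values are approximate while tasks are running. */
    static Map<String, Object> snapshot(String name, ThreadPoolExecutor pool) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("dtpName", name);
        metrics.put("corePoolSize", pool.getCorePoolSize());
        metrics.put("maximumPoolSize", pool.getMaximumPoolSize());
        metrics.put("poolSize", pool.getPoolSize());
        metrics.put("largestPoolSize", pool.getLargestPoolSize());
        metrics.put("activeCount", pool.getActiveCount());
        metrics.put("taskCount", pool.getTaskCount());
        metrics.put("completedTaskCount", pool.getCompletedTaskCount());
        metrics.put("queueSize", pool.getQueue().size());
        metrics.put("queueRemainingCapacity", pool.getQueue().remainingCapacity());
        return metrics;
    }
}
```

A scheduled task could call snapshot at each monitoring interval and pass the result to a JSON logger, a Micrometer gauge, or an HTTP endpoint.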
-
Notification and alarm module:
Integrates with office platforms to deliver alarm notifications. DingTalk and WeCom are implemented by default; other implementations can be added through the internal SPI interface
1. Notification of thread pool parameter changes
2. Alarm when the blocking queue capacity reaches the threshold
3. Alarm when thread pool liveness reaches the threshold
4. Alarm when the rejection policy is triggered
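To make the capacity and liveness thresholds concrete, here is one plausible way to compute them. The formulas are assumptions made for this sketch (liveness as activeCount / maximumPoolSize, capacity usage as queueSize / total queue capacity, both in percent); the article does not state the exact formulas DynamicTp uses, and AlarmChecks is a hypothetical class.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical threshold checks; the formulas are assumptions, not DynamicTp's code.
final class AlarmChecks {

    private AlarmChecks() {
    }

    /** Integer percentage of used/total; returns 0 when total is not positive. */
    static int percent(long used, long total) {
        return total <= 0 ? 0 : (int) (used * 100 / total);
    }

    /** Assumed liveness: share of the maximum pool size that is actively running tasks. */
    static int livenessPercent(ThreadPoolExecutor pool) {
        return percent(pool.getActiveCount(), pool.getMaximumPoolSize());
    }

    /** Assumed capacity usage: queued tasks relative to the queue's total capacity. */
    static int capacityPercent(ThreadPoolExecutor pool) {
        // size and remainingCapacity are read separately, so the result is approximate.
        long size = pool.getQueue().size();
        long total = size + pool.getQueue().remainingCapacity();
        return percent(size, total);
    }

    /** An alarm fires when the measured percentage reaches the configured threshold. */
    static boolean shouldAlarm(int percent, int threshold) {
        return percent >= threshold;
    }
}
```

With a threshold of 80, a queue holding 160 of 200 possible tasks gives percent(160, 200) = 80, so shouldAlarm returns true.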
Usage
- Maven dependencies
- Applications using Apollo add this dependency
<dependency>
    <groupId>io.github.lyh200</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
    <version>1.0.0</version>
</dependency>
- Spring Cloud applications using Nacos add this dependency
<dependency>
    <groupId>io.github.lyh200</groupId>
    <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
    <version>1.0.0</version>
</dependency>
- Non-Spring-Cloud applications using Nacos add this dependency
<dependency>
    <groupId>io.github.lyh200</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
    <version>1.0.0</version>
</dependency>
-
Thread pool configuration
spring:
  dynamic:
    tp:
      enabled: true
      enabledBanner: true            # whether to print the banner, default true
      enabledCollect: false          # whether to enable metrics collection, default false
      collectorType: logging         # metrics collector type (JsonLog | MicroMeter), default logging
      logPath: /home/logs            # path for monitoring log data, default ${user.home}/logs
      monitorInterval: 5             # monitoring interval (alarm checks, metrics collection), default 5s
      nacos:                         # Nacos configuration; a default is used if not configured (naming rule: name-dev.yml)
        dataId: dynamic-tp-demo-dev.yml
        group: DEFAULT_GROUP
      apollo:                        # Apollo configuration; if not configured, the first namespace configured for Apollo is used
        namespace: dynamic-tp-demo-dev.yml
      configType: yml                # configuration file type
      platforms:                     # notification and alarm platform configuration
        - platform: wechat
          urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c   # replace
          receivers: test1,test2                    # recipients' WeCom names
        - platform: ding
          urlKey: f80dad441fcd655438f4a08dcd6a      # replace
          secret: SECb5441fa6f375d5b9d21            # replace; can be omitted in non-sign mode
          receivers: 15810119805                    # phone number of the DingTalk account
      tomcatTp:                      # Tomcat web server thread pool configuration
        minSpare: 100
        max: 400
      jettyTp:                       # Jetty web server thread pool configuration
        min: 100
        max: 400
      undertowTp:                    # Undertow web server thread pool configuration
        ioThreads: 100
        workerThreads: 400
      executors:                     # dynamic thread pool configuration
        - threadPoolName: dynamic-tp-test-1
          corePoolSize: 6
          maximumPoolSize: 8
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue   # task queue, see the QueueTypeEnum enum class
          rejectedHandlerType: CallerRunsPolicy    # rejection policy, see the RejectedTypeEnum enum class
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: test                   # thread name prefix
          notifyItems:                             # alarm items; if not configured, they are set up automatically (change notification, capacity alarm, liveness alarm, rejection alarm)
            - type: capacity                       # alarm item type, see the NotifyTypeEnum enum class
              enabled: true
              threshold: 80                        # alarm threshold
              platforms: [ding,wechat]             # optional; if not configured, all platforms from the upper-level platforms configuration are used
              interval: 120                        # alarm interval (unit: s)
            - type: change
              enabled: true
            - type: liveness
              enabled: true
              threshold: 80
            - type: reject
              enabled: true
              threshold: 1
-
Creating thread pools in code; they are registered automatically when the service starts
@Configuration
public class DtpConfig {

    // Shortcut: create a dynamic thread pool from just its name.
    @Bean
    public DtpExecutor demo1Executor() {
        return DtpCreator.createDynamicFast("demo1-executor");
    }

    // Builder: set each parameter explicitly.
    @Bean
    public ThreadPoolExecutor demo2Executor() {
        return ThreadPoolBuilder.newBuilder()
                .threadPoolName("demo2-executor")
                .corePoolSize(8)
                .maximumPoolSize(16)
                .keepAliveTime(50)
                .allowCoreThreadTimeOut(true)
                .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
                .buildDynamic();
    }
}
-
Calling from code: obtain the thread pool by its name
public static void main(String[] args) {
    // Look up the pool registered under this name, then submit a task to it.
    DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
    dtpExecutor.execute(() -> System.out.println("test"));
}
Notes
-
Parameters set in the configuration file override those set when the thread pool is created in code
-
Among blocking queues, only the VariableLinkedBlockingQueue type supports changing the capacity. It works like LinkedBlockingQueue, except that capacity is not final and can therefore be modified.
Its implementation is modeled on RabbitMQ's VariableLinkedBlockingQueue
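The point about capacity not being final can be illustrated with a deliberately simplified buffer. ResizableBoundedBuffer below is a hypothetical teaching example: it is not a BlockingQueue, cannot be plugged into ThreadPoolExecutor, and is far simpler than VariableLinkedBlockingQueue; it only shows that a bound stored in a mutable field can be changed while elements are queued.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified illustration; not the real VariableLinkedBlockingQueue.
final class ResizableBoundedBuffer<E> {

    private final Deque<E> items = new ArrayDeque<>();
    private int capacity; // deliberately not final, so it can change at run time

    ResizableBoundedBuffer(int capacity) {
        setCapacity(capacity);
    }

    /** Adds the item if there is room; returns false when the buffer is full. */
    synchronized boolean offer(E item) {
        if (items.size() >= capacity) {
            return false;
        }
        items.addLast(item);
        return true;
    }

    /** Removes and returns the oldest item, or null if the buffer is empty. */
    synchronized E poll() {
        return items.pollFirst();
    }

    /** Changes the bound; existing items are kept even if they exceed the new bound. */
    synchronized void setCapacity(int newCapacity) {
        if (newCapacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = newCapacity;
    }

    synchronized int remainingCapacity() {
        return Math.max(0, capacity - items.size());
    }
}
```

In the JDK's LinkedBlockingQueue the capacity field is final, which is why the bound of that queue cannot be adjusted after construction.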
-
If the following log output is displayed, the system is successfully connected
(DynamicTp ASCII-art banner)
:: Dynamic Thread Pool ::
DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
-
Configuration changes push notification messages and highlight the changed fields
DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
Notifications and alarms
When an alarm threshold is reached, the corresponding alarm message (liveness alarm, capacity alarm, or rejection alarm) is pushed, with the relevant fields highlighted
When the configuration changes, a notification message is pushed, with the changed fields highlighted
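A rejection alarm needs a count of how often the rejection policy has fired, which the JDK handlers do not track. One way to get it, sketched here as an assumption rather than as DynamicTp's implementation, is to wrap the real handler in a counting decorator (CountingRejectedHandler is a hypothetical name; the registration log above shows a RejectedCountableCallerRunsPolicy, which suggests a similar idea).

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical decorator for illustration; not DynamicTp's actual handler.
final class CountingRejectedHandler implements RejectedExecutionHandler {

    private final RejectedExecutionHandler delegate;
    private final AtomicLong rejectCount = new AtomicLong();

    CountingRejectedHandler(RejectedExecutionHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // Count first, then let the wrapped policy decide what happens to the task.
        rejectCount.incrementAndGet();
        delegate.rejectedExecution(task, executor);
    }

    long getRejectCount() {
        return rejectCount.get();
    }
}
```

For example, new CountingRejectedHandler(new ThreadPoolExecutor.CallerRunsPolicy()) can be passed to the ThreadPoolExecutor constructor, and a monitor can compare getRejectCount() between two checks to decide whether to send a rejection alarm.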
Monitoring logs
The metrics collection type is set with the collectorType property in the main configuration file; the default is logging
- Micrometer: add the Micrometer dependencies to ship metrics to a storage platform
(e.g. Prometheus, InfluxDb…)
-
Logging: metric data is written to disk as JSON-format logs, at ${logPath}/dynamictp/${appName}.monitor.log
2022-01-16 15:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8}
2022-01-16 15:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8}
-
Endpoint: an Endpoint (dynamic-tp) is exposed and can be requested over HTTP
[
  {
    "dtp_name": "remoting-call",
    "core_pool_size": 8,
    "maximum_pool_size": 16,
    "queue_type": "SynchronousQueue",
    "queue_capacity": 0,
    "queue_size": 0,
    "fair": false,
    "queue_remaining_capacity": 0,
    "active_count": 2,
    "task_count": 2760,
    "completed_task_count": 2760,
    "largest_pool_size": 16,
    "pool_size": 8,
    "wait_task_count": 0,
    "reject_count": 12462,
    "reject_handler_name": "CallerRunsPolicy"
  },
  {
    "max_memory": "220 MB",
    "total_memory": "140 MB",
    "free_memory": "44 MB",
    "usable_memory": "125 MB"
  }
]
Project address
Gitee address: gitee.com/yanhom/dyna…
Github address: github.com/lyh200/dyna…
Contact
If you have any ideas or suggestions about the project, feel free to add me on WeChat to discuss them, or open an issue so we can improve the project together
WeChat official account: CodeFox
WeChat: yanhom1314