The premise

In a recent project involving file upload and download, a JUC thread pool (ThreadPoolExecutor) hit full load at some point in the production environment. Because the rejection policy was CallerRunsPolicy, the saturated pool made the calling interface threads run tasks themselves, so interface calls stopped responding and the application appeared to hang. Since we had previously built a monitoring stack with Micrometer + Prometheus + Grafana, we decided to use Micrometer to actively collect thread pool metrics, so that they could ultimately be displayed in near real time on a Grafana panel.

Practice process

The following walks through a simulated example that replays the real troubleshooting process.

Code transformation

First we need to sort out the mapping between the metrics items provided in ThreadPoolExecutor and micrometer tags:

  • Thread pool name, Tag: thread.pool.name. If the thread pool is managed by the IoC container, the bean name can be used instead.
  • int getCorePoolSize(): number of core threads, Tag: thread.pool.core.size.
  • int getLargestPoolSize(): historical peak number of threads, Tag: thread.pool.largest.size.
  • int getMaximumPoolSize(): maximum number of threads (the pool's thread capacity), Tag: thread.pool.max.size.
  • int getActiveCount(): number of active threads, Tag: thread.pool.active.size.
  • int getPoolSize(): total number of threads (core and non-core) currently in the pool, Tag: thread.pool.thread.count.
  • Total number of backlogged tasks in the task queue, Tag: thread.pool.queue.size. This one has to be computed dynamically from the queue.
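As a quick sanity check of what these getters return, here is a minimal stdlib-only sketch (the pool sizes and task counts are arbitrary illustration values, not the article's configuration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MetricGettersDemo {
    public static void main(String[] args) throws Exception {
        // Small pool: 2 core threads, 2 max threads, bounded queue of 10
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10));
        // Submit 4 tasks that each block briefly: 2 run immediately, 2 wait in the queue
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            });
        }
        Thread.sleep(100); // give the workers time to start
        System.out.println("core=" + pool.getCorePoolSize());       // 2
        System.out.println("max=" + pool.getMaximumPoolSize());     // 2
        System.out.println("active=" + pool.getActiveCount());      // 2
        System.out.println("poolSize=" + pool.getPoolSize());       // 2
        System.out.println("largest=" + pool.getLargestPoolSize()); // 2
        System.out.println("queued=" + pool.getQueue().size());     // 2
        pool.shutdownNow();
    }
}
```

Note that thread.pool.queue.size is the only item with no direct getter; it is read dynamically via getQueue().size().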

Then write specific code to achieve the following functions:

  • 1. Build a ThreadPoolExecutor instance with 10 core threads, 10 maximum threads, a task queue of length 10, and the AbortPolicy rejection policy.
  • 2. Provide two methods that use the thread pool instance to simulate short-running and long-running tasks respectively.
  • 3. Provide a method that empties the task queue of the thread pool instance.
  • 4. Provide a single-threaded scheduled pool that periodically collects the ThreadPoolExecutor metrics listed above and saves them into Micrometer's in-memory state collector.

Since the values of these statistics fluctuate with time, the Gauge Meter can be used for recording.

// ThreadPoolMonitor
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author throwable
 * @version V1.0
 * @description
 * @since 2019/4/7 21:02
 */
@Service
public class ThreadPoolMonitor implements InitializingBean {

	private static final String EXECUTOR_NAME = "ThreadPoolMonitorSample";
	private static final Iterable<Tag> TAG = Collections.singletonList(Tag.of("thread.pool.name", EXECUTOR_NAME));
	private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

	private final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS,
			new ArrayBlockingQueue<>(10), new ThreadFactory() {

		private final AtomicInteger counter = new AtomicInteger();

		@Override
		public Thread newThread(Runnable r) {
			Thread thread = new Thread(r);
			thread.setDaemon(true);
			thread.setName("thread-pool-" + counter.getAndIncrement());
			return thread;
		}
	}, new ThreadPoolExecutor.AbortPolicy());

	private final Runnable monitor = () -> {
		// Catch any exception: even though none is expected here, an uncaught one
		// would silently cancel the scheduled collection task
		try {
			Metrics.gauge("thread.pool.core.size", TAG, executor, ThreadPoolExecutor::getCorePoolSize);
			Metrics.gauge("thread.pool.largest.size", TAG, executor, ThreadPoolExecutor::getLargestPoolSize);
			Metrics.gauge("thread.pool.max.size", TAG, executor, ThreadPoolExecutor::getMaximumPoolSize);
			Metrics.gauge("thread.pool.active.size", TAG, executor, ThreadPoolExecutor::getActiveCount);
			Metrics.gauge("thread.pool.thread.count", TAG, executor, ThreadPoolExecutor::getPoolSize);
			// Note: if an unbounded blocking queue were used, size() could not be taken directly like this
			Metrics.gauge("thread.pool.queue.size", TAG, executor, e -> e.getQueue().size());
		} catch (Exception e) {
			// ignore
		}
	};

	@Override
	public void afterPropertiesSet() throws Exception {
		// Collect every 5 seconds
		scheduledExecutor.scheduleWithFixedDelay(monitor, 0, 5, TimeUnit.SECONDS);
	}

	public void shortTimeWork() {
		executor.execute(() -> {
			try {
				// 5 seconds
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				// ignore
			}
		});
	}

	public void longTimeWork() {
		executor.execute(() -> {
			try {
				// 500 seconds
				Thread.sleep(5000 * 100);
			} catch (InterruptedException e) {
				// ignore
			}
		});
	}

	public void clearTaskQueue() {
		executor.getQueue().clear();
	}
}

// ThreadPoolMonitorController
import club.throwable.smp.service.ThreadPoolMonitor;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author throwable
 * @version V1.0
 * @description
 * @since 2019/4/7
 */
@RequiredArgsConstructor
@RestController
public class ThreadPoolMonitorController {

	private final ThreadPoolMonitor threadPoolMonitor;

	@GetMapping(value = "/shortTimeWork")
	public ResponseEntity<String> shortTimeWork() {
		threadPoolMonitor.shortTimeWork();
		return ResponseEntity.ok("success");
	}

	@GetMapping(value = "/longTimeWork")
	public ResponseEntity<String> longTimeWork() {
		threadPoolMonitor.longTimeWork();
		return ResponseEntity.ok("success");
	}

	@GetMapping(value = "/clearTaskQueue")
	public ResponseEntity<String> clearTaskQueue() {
		threadPoolMonitor.clearTaskQueue();
		return ResponseEntity.ok("success");
	}
}
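Because the example pool uses AbortPolicy, any task submitted beyond maxPoolSize plus the queue capacity is rejected with a RejectedExecutionException, and getQueue().clear() drops the backlog, as clearTaskQueue() does. A minimal stdlib-only sketch of that behavior (the pool shape mirrors the example; the sleep duration is illustrative):

```java
import java.util.concurrent.*;

public class AbortPolicyDemo {
    public static void main(String[] args) {
        // Same shape as the example pool: 10 core, 10 max, queue of 10, AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        // 10 running + 10 queued = 20 accepted; the 21st submission is rejected
        for (int i = 0; i < 21; i++) {
            try {
                pool.execute(() -> {
                    try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        System.out.println("rejected=" + rejected); // 1
        pool.getQueue().clear(); // same effect as clearTaskQueue(): drop the backlog
        System.out.println("queued after clear=" + pool.getQueue().size()); // 0
        pool.shutdownNow();
    }
}
```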

The configuration is as follows:

server:
  port: 9091
management:
  server:
    port: 9091
  endpoints:
    web:
      exposure:
        include: '*'
      base-path: /management
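For reference, a Prometheus scrape job matching the configuration above might look like the following sketch (the job name and the lowered 5-second interval are assumptions, not values from the project):

```yaml
# prometheus.yml -- illustrative scrape job
scrape_configs:
  - job_name: 'spring-boot-thread-pool'    # assumed job name
    scrape_interval: 5s                    # default is 15s; lower it for finer-grained panels
    metrics_path: '/management/prometheus' # actuator base-path /management, as configured above
    static_configs:
      - targets: ['localhost:9091']        # matches server.port above
```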

The Prometheus scrape job can also be tuned to a higher frequency; by default it pulls the /prometheus endpoint every 15 seconds, which means each scrape covers three of our 5-second collection cycles. After the project has started, call /management/prometheus to inspect the data exposed by the endpoint:

Since ThreadPoolMonitorSample is the custom value of our thread.pool.name tag, seeing it in the output indicates that data collection is working. If the Prometheus job is configured correctly, check the Prometheus console after the local Spring Boot project starts:

OK, perfect, ready for the next step.

Grafana panel configuration

The next important step is to configure the Grafana panel, after making sure the JVM application and the Prometheus scrape jobs are healthy. If you don't want to learn PromQL, you can copy sample expressions from Prometheus's /graph panel into the Grafana configuration; otherwise, read the Prometheus documentation to learn how to write PromQL.

  • Basic configuration:

  • For visual configuration, check the label on the right and increase the width as much as possible:

  • Query configuration, which is the most important, is how the final diagram is presented:

The query configuration is as follows:

  • A: thread_pool_active_size, Legend: {{instance}}-{{thread_pool_name}} — number of active threads in the thread pool.
  • B: thread_pool_largest_size, Legend: {{instance}}-{{thread_pool_name}} — historical peak number of threads in the thread pool.
  • C: thread_pool_max_size, Legend: {{instance}}-{{thread_pool_name}} — thread pool capacity.
  • D: thread_pool_core_size, Legend: {{instance}}-{{thread_pool_name}} — number of core threads in the thread pool.
  • E: thread_pool_thread_count, Legend: {{instance}}-{{thread_pool_name}} — number of running threads in the thread pool.
  • F: thread_pool_queue_size, Legend: {{instance}}-{{thread_pool_name}} — number of backlogged tasks in the thread pool.
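Beyond dashboards, the same series can drive alerting. A hypothetical Prometheus alert rule on the queue backlog might look like this (the rule name, threshold of 8, and duration are all assumptions chosen to suit the example's queue length of 10):

```yaml
# Illustrative alert rule; thresholds are assumptions, not values from the project
groups:
  - name: thread-pool
    rules:
      - alert: ThreadPoolQueueBacklog
        expr: thread_pool_queue_size{thread_pool_name="ThreadPoolMonitorSample"} >= 8
        for: 1m
        annotations:
          summary: "Thread pool task queue is close to full"
```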

The final result

Calling the interfaces provided in the example a few more times yields a chart of the monitored thread pool:

Summary

Monitoring ThreadPoolExecutor metrics helps detect anomalies in the interfaces that use the pool in time. When the pool is saturated, the quickest way to recover is often to clear the backlogged tasks in the task queue; to make that possible, register the ThreadPoolExecutor with the IoC container and expose its queue-clearing method as a REST endpoint. Monitoring of HTTP client connection pools such as Apache HttpClient or OkHttp can be implemented in a similar way. Data collection incurs a small performance cost, partly because the getters take the pool's internal lock, but this is usually negligible. If it ever has a real performance impact, you could try using the reflection API to read the property values inside the ThreadPoolExecutor instance directly and avoid the locking.
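To illustrate the reflection idea: on OpenJDK, ThreadPoolExecutor packs the worker count into the low 29 bits of a private AtomicInteger field named ctl, so it can be read without taking the pool's main lock. This is an implementation detail, not public API — it may break across JDK versions, and on Java 16+ the module system blocks the access unless the JVM is started with --add-opens, so the sketch below falls back to the public (locking) getter:

```java
import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReflectiveWorkerCount {
    // Try to read the private ctl field (an OpenJDK implementation detail);
    // fall back to the locking getPoolSize() if reflective access is denied.
    static int workerCount(ThreadPoolExecutor pool) {
        try {
            Field ctlField = ThreadPoolExecutor.class.getDeclaredField("ctl");
            ctlField.setAccessible(true);
            int ctl = ((AtomicInteger) ctlField.get(pool)).get();
            int capacity = (1 << (Integer.SIZE - 3)) - 1; // worker count lives in the low 29 bits
            return ctl & capacity;
        } catch (ReflectiveOperationException | RuntimeException e) {
            return pool.getPoolSize(); // locking fallback
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10));
        pool.prestartAllCoreThreads(); // spin up both core workers
        System.out.println("workers=" + workerCount(pool)); // 2
        pool.shutdownNow();
    }
}
```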

  • Personal blog links: www.throwable.club/2019/04/14/…

(C-2-D 20190414)

Technical official account (Throwable Digest), which pushes the author's original technical articles from time to time (never plagiarized or reprinted):