1. Overall design
The whole is divided into two parts: business service and management, including three main function points:
(1) Simplify thread pool configuration: corePoolSize, maximumPoolSize, and workQueue, which determine the maximum task allocation and thread allocation policy of the thread pool. Considering that in practical applications, there are mainly two scenarios for obtaining concurrency :(1) execute subtasks in parallel to improve the response speed. In this case, a synchronous queue should be used, and nothing should be cached, but should be executed immediately. (2) Execute a large number of tasks in parallel to improve throughput. In this case, a bounded queue should be used and a queue should be used to buffer a large number of tasks. The queue size must be declared to prevent unrestricted accumulation of tasks. So a thread pool can satisfy most business needs by providing only the configuration of these three key parameters and providing a choice of two queues
(2) Parameters can be dynamically configured: on the basis of the remaining Java thread pool, it encapsulates the thread pool, allows the thread pool to modify the configuration according to the change of external configuration, and develops the control interface in the management end for convenient development;
(3) Add thread pool monitoring: Add monitoring capability in the entire life cycle of thread pool tasks, such as the number of active tasks, the number of exceptions occurred in the task execution, the size of task queue and other indicators, to facilitate related development to understand the status of thread pool;
2. Overall architecture design
Dynamic parameter tuning: Provides a management interface to dynamically adjust thread pool parameters, including the number of core threads in the thread pool, the maximum number of threads, and the buffer queue length. The modified parameters take effect in time
Task monitoring: Supports transaction monitoring of application granularity, thread pool granularity, and task granularity. You can view the task execution status, maximum task execution time, and average task execution time of the thread pool
Load alarm: Supports the configuration of alarm rules to notify the development responsible person when the threshold is exceeded
Operation monitoring and Log: Management segment configures operation access audit logs
Permission verification: different application users
3. Main business processes
It can be divided into two parts: application (client) and management, using mysql and ES database. The monitoring page is configured with Kibana visual tool, and the thread pool configuration page is completed with simple front end.
4. Part of the core code
4.1 Client Implementation
4.1.1 Dynamic thread pool definition and implementation
Because only a ThreadPoolExecutor maxCorePoolSize corePoolSize/open
Only BlockingQueue interface can block the queue, and open the method of changing the queue capacity, so that the set method can be dynamically changed, and other parts of the implementation of ArrayBlockingQueue can be dynamically blocked;
/ / custom list blocking queue public class PhxResizeLinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { private volatile int capacity; Public void setCapacity(int capacity) {this.capacity = capacity; this.capacity = capacity; }}Copy the code
In addition to dynamic modification, it also needs to support status reporting, overload information reporting, task running time statistics, etc. Therefore, it is necessary to rewrite afterExecute method to monitor the status of the task, custom RejectHandler monitoring RejectHandler, as shown in the following.
// Override the method, Public class PhxThreadPool extends ThreadPoolExecutor {@override protected void afterExecute(Runnable r, Throwable throwable) { LocalDateTime startTime = LocalDateTime.now(); super.afterExecute(r, throwable); LocalDateTime endTime = LocalDateTime.now(); if (throwable == null && r instanceof Future) { try { ((Future) r).get(); } catch (CancellationException ce) { throwable = ce; } catch (ExecutionException ee) { throwable = ee.getCause(); } catch(InterruptedException ie){ Thread.currentThread().interrupt(); } } try { if (throwable ! = null) { publishEvent(EventEnums.POOL_RUNNABLE_EXECUTE_ERROR.name(), null); } else { Duration duration = Duration.between(startTime, endTime); Integer costTime = Integer.parseInt(String.valueOf(duration.getSeconds())); publishEvent(EventEnums.POOL_RUNTIME_STATISTICS.name(), costTime); }} catch (Exception ex){}} Public Class PhxRejectHandler Implements RejectedExecutionHandler {@override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { PhxThreadPool threadPool = (PhxThreadPool)executor; List metaDataDTOS = new ArrayList<>(); . publisher.publishEvent(metaDataDTOS); }}Copy the code
4.1.2 Dynamic Thread Pool Registration
To facilitate the management of thread pool instances, a thread pool container is defined
public class ThreadPoolContainer { private ConcurrentMap container = new ConcurrentHashMap<>(); public void put(String poolName, PhxThreadPool threadPool) { container.putIfAbsent(poolName, threadPool); } public PhxThreadPool get(String poolName) { return container.getOrDefault(poolName, null); }}Copy the code
The bean’s post-processor is used to scan and register threads, scanning the defined thread pool instance one by one at spring container startup, and creating a separate thread for registration in order not to block the startup process. You also need to set a custom rejection policy to the thread pool instance.
public class PhxThreadPoolBeanPostProcessor implements BeanPostProcessor { private final String url; private PhxThreadPoolConfig config; private ThreadPoolContainer container; private RejectedExecutionHandler rejectHandler; private ExecutorService executorService = Executors.newSingleThreadExecutor(); public PhxThreadPoolBeanPostProcessor(PhxThreadPoolConfig config, ThreadPoolContainer container) { this.config = config; url = config.getAdminUrl() + "/meta/register"; this.container = container; } @Override public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(final Object poolBean, final String beanName) throws BeansException { if (poolBean instanceof PhxThreadPool) { executorService.execute(() -> handler((PhxThreadPool) poolBean)); } return poolBean; } private void handler(final PhxThreadPool poolBean) { String poolName = poolBean.getName(); poolBean.setRejectedExecutionHandler(rejectHandler); container.put(poolName, poolBean); post(buildJsonParams(poolBean)); }}Copy the code
4.1.3 Reporting the thread pool running status
In order to increase the message throughput and save the application resources to the maximum extent, the client component uses the message queue for buffering, which includes events/event publishers/message queues/event handlers.
The main implementation code is as follows:
Public class MonitorEvent Implements Serializable {private List eventDTOs; public void clear () { eventDTOs = null; }} public class MonitorEventDTO {private String appName; private String ip; private String poolName; private Integer corePoolSize; private Integer maxPoolSize; private Integer queueSize; private Integer queueCapacity; } public class MonitorEventFactory implements EventFactory {@override public MonitorEvent newInstance() { return new MonitorEvent(); }} public class MonitorEventHandler implements EventHandler {private PhxThreadPoolConfig Config; public MonitorEventHandler(PhxThreadPoolConfig config) { this.config = config; } @Override public void onEvent(MonitorEvent monitorEvent, long l, boolean b) throws Exception { if (monitorEvent == null || monitorEvent.getEventDTOs() == null || monitorEvent.getEventDTOs().size() == 0) { return; } String path = config.getAdminUrl() + "/meta/upload"; List eventDTOs = monitorEvent.getEventDTOs(); PhxSender.upload(path, eventDTOs); monitorEvent.clear(); Public class MonitorEventPublisher {private Disruptor Disruptor; private MonitorEventHandler eventHandler; private PhxThreadPoolConfig config; public void publishEvent(final List events) { final RingBuffer ringBuffer = disruptor.getRingBuffer(); ringBuffer.publishEvent(new EventTranslator(), events); } public void destroy() { disruptor.shutdown(); }} public class EventTranslator implements EventTranslatorOneArg> {@override public void translateTo(MonitorEvent event, long l, List monitorEventDTOS) { event.setEventDTOs(monitorEventDTOS); }} public class PhxSender {public static void upload(String path, List metaDataDTOS) throws IOException { if (metaDataDTOS ! = null && metaDataDTOS.size() > 0) { String jsonBody = OkHttpTools.getInstance().getGosn().toJson(metaDataDTOS); OkHttpTools.getInstance().post(path, jsonBody); }}}Copy the code
4.1.4 Thread Pool Dynamic Configuration
There are two ways to implement dynamic configuration change of a thread pool. The first way is to use the Apollo client to listen to the configuration change event sent by the controller and then respond to the event, similar to the “push mode”. The second way is to start a thread on the client, continuously request the controller’s service, and then update the local thread pool configuration, similar to the “pull mode”. The second way is used here.
public class PhxClient { public void startPullPoolConfig() { pullConfigThread = new Thread(() -> { while(! toStop) { try { String path = config.getAdminUrl() + "/meta/" + config.getAppName(); String response = OkHttpTools.getInstance().get(path); / / analytical control end to return to the thread pool configuration information List pools = JSON. ParseArray (response, ThreadPoolConfigDto. Class); if (pools ! = null && pools.size() > 0) { pools.forEach(pool -> { PhxThreadPool threadPool = container.get(pool.getPoolName()); threadPool.setCorePoolSize(pool.getCorePoolSize()); threadPool.setMaximumPoolSize(pool.getMaxPoolSize()); PhxResizeLinkedBlockingQueue queue = (PhxResizeLinkedBlockingQueue)threadPool.getQueue(); queue.setCapacity(pool.getQueueCapacity()); container.put(pool.getPoolName(), threadPool); }); }} catch (Exception ex) {log.error(" [thread pool control client-pull configuration thread has an Exception] Exception :", ex); if (! toStop) { } } try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }}}); }Copy the code