Thread pool monitoring with dynamically modifiable configuration, using Actuator
1. Summary
Previously, thread pools in our company's services were frequently overloaded because of poor thread pool usage habits. The rejection policy was triggered and a large number of tasks were lost. Nothing monitored this, so the problem only surfaced as rejection exceptions after the service had already failed. Projects that use many thread pools therefore need monitoring, and ideally their thread pool parameters should be adjustable without stopping the service. The approach here is to use the thread pool's hook methods to record the status of the pool, expose that status through Actuator, and use a custom Endpoint to modify the pool's internal parameters dynamically.
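Adjusting a pool without stopping the service rests on setters that ThreadPoolExecutor already provides. The following is a minimal sketch on a plain JDK pool; the class name RuntimeResizeSketch and the helper resize are made up for illustration and are not part of this article's code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the article's classes.
class RuntimeResizeSketch {

    // Applies new sizes to a live pool. When growing, the maximum is raised
    // first; when shrinking, the core size is lowered first. This keeps
    // core <= maximum at every step, which the setters require on recent JDKs.
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core > max) {
            throw new IllegalArgumentException("core must not exceed max");
        }
        if (max >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(max);
            pool.setCorePoolSize(core);
        } else {
            pool.setCorePoolSize(core);
            pool.setMaximumPoolSize(max);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        resize(pool, 8, 16);
        // Prints 8/16
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

The order of the two setter calls matters because each one validates its argument against the other value currently held by the pool.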
2. Implementation
1. Import Maven dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
2. Write the ThreadPoolMonitor.java monitor class
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/** * Inherits the ThreadPoolExecutor class, overriding the shutdown(), shutdownNow(), beforeExecute(), and afterExecute() * methods to count thread pool execution */
public class ThreadPoolMonitor extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);
/** * Saves the start time of the task. When the task is complete, the end time is subtracted from the start time to calculate the task execution time */
private final ConcurrentHashMap<String, Date> startTimes;
/** * Thread pool name, usually named with the service name to distinguish */
private final String poolName;
private long totalDiff;
/**
* Calls the parent class constructor and initializes the HashMap and the thread pool name.
*
* @param corePoolSize number of core threads in the thread pool
* @param maximumPoolSize maximum number of threads in the thread pool
* @param keepAliveTime maximum idle time of a thread
* @param unit unit of the idle time
* @param workQueue queue that holds submitted tasks
* @param poolName name of the thread pool
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
new EventThreadFactory(poolName), poolName);
}
/**
* Calls the parent class constructor and initializes the HashMap and the thread pool name.
*
* @param corePoolSize number of core threads in the thread pool
* @param maximumPoolSize maximum number of threads in the thread pool
* @param keepAliveTime maximum idle time of a thread
* @param unit unit of the idle time
* @param workQueue queue that holds submitted tasks
* @param threadFactory factory that creates the pool's threads
* @param poolName name of the thread pool
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.startTimes = new ConcurrentHashMap<>();
this.poolName = poolName;
}
/** * Logs thread pool statistics on a delayed shutdown (one that waits for all tasks in the pool to complete) */
@Override
public void shutdown() {
// Count the number of executed tasks, executing tasks, and unexecuted tasks
LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
/** * Count thread pool status when the thread pool is immediately closed */
@Override
public List<Runnable> shutdownNow() {
// Count the number of executed tasks, executing tasks, and unexecuted tasks
LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
/** * Before executing the task, record the start time of the task */
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
}
/** * After the task is executed, the end time of the task is calculated */
@Override
protected void afterExecute(Runnable r, Throwable t) {
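Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
// Guard against a missing entry, for example when two tasks have colliding hashCode values
long diff = startDate == null ? 0L : finishDate.getTime() - startDate.getTime();
// Worker threads run this hook concurrently, so serialize the update
synchronized (this) {
totalDiff += diff;
}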
// Count the task time, number of initial threads, number of core threads, number of tasks being executed,
// Number of completed tasks, total tasks, number of cached tasks in the queue, maximum number of threads in the pool,
// Maximum number of threads allowed, idle time, whether the thread pool is closed, whether the thread pool is terminated
LOGGER.info("{}-pool-monitor: " +
"Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
"Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
"MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName,
diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}
/** * Thread factory that creates the pool's threads. It mirrors the default thread factory of the thread pool but takes the thread pool name, which makes problems easier to trace */
static class EventThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* Initializes the thread factory.
*
* @param poolName name of the thread pool
*/
EventThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
public long getTotalDiff() {
return totalDiff;
}
}
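The timing technique in the monitor class can also be sketched without a shared map. ThreadPoolExecutor invokes beforeExecute and afterExecute on the worker thread that runs the task, so a ThreadLocal can carry the start time, and LongAdder keeps the totals safe under concurrent updates. This is an alternative sketch, not the class above; TimedPoolSketch, finishedTasks and averageMillis are names invented for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical variant of the monitor class, for illustration only.
class TimedPoolSketch extends ThreadPoolExecutor {

    // Both hooks run on the worker thread that executes the task,
    // so the start time can live in a ThreadLocal instead of a map.
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final LongAdder totalNanos = new LongAdder();
    private final LongAdder finished = new LongAdder();

    TimedPoolSketch(int core, int max, int queueSize) {
        super(core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long start = startNanos.get();
            if (start != null) {
                totalNanos.add(System.nanoTime() - start);
                finished.increment();
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    // Number of tasks whose duration has been recorded.
    long finishedTasks() {
        return finished.sum();
    }

    // Average task duration in milliseconds, or 0 if nothing has finished yet.
    double averageMillis() {
        long n = finished.sum();
        return n == 0 ? 0.0 : totalNanos.sum() / 1_000_000.0 / n;
    }
}
```

System.nanoTime is used here because it is monotonic, whereas wall-clock timestamps can jump when the system clock is adjusted.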
3. Implement the resizable queue ResizeableBlockingQueue.java
Here we copy the LinkedBlockingQueue source code, change the capacity field from final to mutable, and add a getter and a setter for it.
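import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**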
* The type Resizeable blocking queue.
*
* @param <E> the type parameter
*/
public class ResizeableBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -1232131234709194L;
/* A blocking queue with a resizable capacity, based on the LinkedBlockingQueue implementation. */
/**
* The type Node.
*
* @param <E> the type parameter
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) {
item = x;
}
}
// volatile so that a capacity change made by setCapacity() is visible to all threads
private volatile int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
/**
* Gets capacity.
*
* @return the capacity
*/
public int getCapacity() {
return capacity;
}
/**
* Sets capacity.
*
* @param capacity the capacity
*/
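public void setCapacity(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
final int oldCapacity = this.capacity;
this.capacity = capacity;
// Wake a producer blocked in put() or a timed offer() if the queue now has room
if (capacity > oldCapacity && count.get() < capacity) {
signalNotFull();
}
}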
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
/** * Fully lock. */
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/** * Fully unlock. */
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
/** * Instantiates a new Resizeable blocking queue. */
public ResizeableBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Instantiates a new Resizeable blocking queue.
*
* @param capacity the capacity
*/
public ResizeableBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* Instantiates a new Resizeable blocking queue.
*
* @param c the c
*/
public ResizeableBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null) {
throw new NullPointerException();
}
if (n == capacity) {
throw new IllegalStateException("Queue full");
}
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
// this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
@Override
public int size() {
return count.get();
}
// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
@Override
public int remainingCapacity() {
return capacity - count.get();
}
@Override
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0) {
return false;
}
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
return true;
}
@Override
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
}
final AtomicInteger count = this.count;
if (count.get() == capacity) {
return false;
}
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
putLock.unlock();
}
if (c == 0) {
signalNotEmpty();
}
return c >= 0;
}
@Override
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0) {
return null;
}
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}
@Override
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1) {
notEmpty.signal();
}
}
} finally {
takeLock.unlock();
}
if (c == capacity) {
signalNotFull();
}
return x;
}
@Override
public E peek() {
if (count.get() == 0) {
return null;
}
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null) {
return null;
} else {
return first.item;
}
} finally {
takeLock.unlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p) {
last = trail;
}
if (count.getAndDecrement() == capacity) {
notFull.signal();
}
}
@Override
public boolean remove(Object o) {
if (o == null) {
return false;
}
fullyLock();
try {
for (Node<E> trail = head, p = trail.next; p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
@Override
public boolean contains(Object o) {
if (o == null) {
return false;
}
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next) {
if (o.equals(p.item)) {
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
@Override
public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next) {
a[k++] = p.item;
}
return a;
} finally {
fullyUnlock();
}
}
@Override
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
fullyLock();
try {
int size = count.get();
if (a.length < size) {
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
}
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next) {
a[k++] = (T)p.item;
}
if (a.length > k) {
a[k] = null;
}
return a;
} finally {
fullyUnlock();
}
}
@Override
public String toString() {
fullyLock();
try {
Node<E> p = head.next;
if (p == null) {
return "[]";
}
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;
sb.append(e == this ? "(this Collection)" : e);
p = p.next;
if (p == null) {
return sb.append(']').toString();
}
sb.append(',').append(' ');
}
} finally {
fullyUnlock();
}
}
@Override
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity) {
notFull.signal();
}
} finally {
fullyUnlock();
}
}
@Override
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
if (maxElements <= 0) {
return 0;
}
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull) {
signalNotFull();
}
}
}
@Override
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
/* * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
Itr() {
fullyLock();
try {
current = head.next;
if (current != null) {
currentElement = current.item;
}
} finally {
fullyUnlock();
}
}
@Override
public boolean hasNext() {
return current != null;
}
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
if (s == p) {
return head.next;
}
if (s == null || s.item != null) {
return s;
}
p = s;
}
}
@Override
public E next() {
fullyLock();
try {
if (current == null) {
throw new NoSuchElementException();
}
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
@Override
public void remove() {
if (lastRet == null) {
throw new IllegalStateException();
}
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next; p != null;
trail = p, p = p.next) {
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}
/**
* The type Lbq spliterator.
*
* @param <E> the type parameter
*/
static final class LBQSpliterator<E> implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
final ResizeableBlockingQueue<E> queue;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
long est; // size estimate
LBQSpliterator(ResizeableBlockingQueue<E> queue) {
this.queue = queue;
this.est = queue.size();
}
@Override
public long estimateSize() { return est; }
@Override
public Spliterator<E> trySplit() {
Node<E> h;
final ResizeableBlockingQueue<E> q = this.queue;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted && ((h = current) != null || (h = q.head.next) != null) && h.next != null) {
Object[] a = new Object[n];
int i = 0;
Node<E> p = current;
q.fullyLock();
try {
if (p != null || (p = q.head.next) != null) {
do {
if ((a[i] = p.item) != null) {
++i;
}
} while ((p = p.next) != null && i < n);
}
} finally {
q.fullyUnlock();
}
if ((current = p) == null) {
est = 0L;
exhausted = true;
}
else if ((est -= i) < 0L) {
est = 0L;
}
if (i > 0) {
batch = i;
return Spliterators.spliterator
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT);
}
}
return null;
}
@Override
public void forEachRemaining(Consumer<? super E> action) {
if (action == null) {
throw new NullPointerException();
}
final ResizeableBlockingQueue<E> q = this.queue;
if (!exhausted) {
exhausted = true;
Node<E> p = current;
do {
E e = null;
q.fullyLock();
try {
if (p == null) {
p = q.head.next;
}
while (p != null) {
e = p.item;
p = p.next;
if (e != null) {
break;
}
}
} finally {
q.fullyUnlock();
}
if (e != null) {
action.accept(e);
}
} while (p != null);
}
}
@Override
public boolean tryAdvance(Consumer<? super E> action) {
if (action == null) {
throw new NullPointerException();
}
final ResizeableBlockingQueue<E> q = this.queue;
if (!exhausted) {
E e = null;
q.fullyLock();
try {
if (current == null) {
current = q.head.next;
}
while (current != null) {
e = current.item;
current = current.next;
if (e != null) {
break;
}
}
} finally {
q.fullyUnlock();
}
if (current == null) {
exhausted = true;
}
if (e != null) {
action.accept(e);
return true;
}
}
return false;
}
@Override
public int characteristics() {
return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT;
}
}
public Spliterator<E> spliterator() {
return new LBQSpliterator<E>(this);
}
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
fullyLock();
try {
// Write out any hidden stuff, plus capacity
s.defaultWriteObject();
// Write out all elements in the proper order.
for (Node<E> p = head.next; p != null; p = p.next) {
s.writeObject(p.item);
}
// Use trailing null as sentinel
s.writeObject(null);
} finally {
fullyUnlock();
}
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
// Read in capacity, and any hidden stuff
s.defaultReadObject();
count.set(0);
last = head = new Node<E>(null);
// Read in all elements and place in queue
for (;;) {
@SuppressWarnings("unchecked")
E item = (E)s.readObject();
if (item == null) {
break;
}
add(item);
}
}
}
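A much smaller way to get an adjustable limit, shown only as a sketch of the idea, is to put a soft limit in front of an unbounded LinkedBlockingQueue. This is a different technique from the full copy above: it relies on ThreadPoolExecutor queuing tasks through offer(E), and it does not limit put or the timed offer. The class name AdjustableCapacityQueue is invented for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified alternative to ResizeableBlockingQueue.
class AdjustableCapacityQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 1L;

    // Soft limit; volatile so a change is visible to producer threads at once.
    private volatile int capacity;

    AdjustableCapacityQueue(int capacity) {
        super(); // the underlying queue itself stays unbounded
        setCapacity(capacity);
    }

    int getCapacity() {
        return capacity;
    }

    void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // synchronized makes the size check and the insert one step among producers,
    // so concurrent offers cannot push the queue past the limit.
    @Override
    public synchronized boolean offer(E e) {
        return size() < capacity && super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }
}
```

When offer returns false, a ThreadPoolExecutor tries to start a non-core thread and otherwise applies its rejection policy, so raising the limit at runtime directly reduces rejections. Lowering the limit below the current size does not drop queued tasks; it only refuses new ones until the queue drains.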
4. Implement ThreadPoolUtil.java
Write a thread pool utility class. Create every thread pool through this class, which stores each created pool in a HashMap keyed by pool name so that the pool can be looked up later.
/** * Thread pool utility class. */
@Component
public class ThreadPoolUtil {
/** * Maps each pool name to the created thread pool, which can then be retrieved via the HashMap */
private final HashMap<String, ThreadPoolMonitor> threadPoolExecutorHashMap = new HashMap<>();
/**
* Creates a monitored thread pool. Use this overload to supply a custom queue type.
*
* @param corePoolSize the core pool size
* @param maximumPoolSize the maximum pool size
* @param keepAliveTime the keep alive time
* @param unit the unit
* @param workQueue the work queue
* @param poolName the pool name
* @return the thread pool monitor
*/
public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName);
threadPoolExecutorHashMap.put(poolName, threadPoolExecutor);
return threadPoolExecutor;
}
/**
* Creates a monitored thread pool. This overload uses the default queue, a ResizeableBlockingQueue.
*
* @param corePoolSize the core pool size
* @param maximumPoolSize the maximum pool size
* @param keepAliveTime the keep alive time
* @param unit the unit
* @param queueSize the queue size
* @param poolName the pool name
* @return the thread pool monitor
*/
public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, int queueSize, String poolName) {
ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ResizeableBlockingQueue<>(queueSize), poolName);
threadPoolExecutorHashMap.put(poolName, threadPoolExecutor);
return threadPoolExecutor;
}
/**
* Gets thread pool executor hash map.
*
* @return the thread pool executor hash map
*/
public HashMap<String, ThreadPoolMonitor> getThreadPoolExecutorHashMap() {
return threadPoolExecutorHashMap;
}
}
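One thing to keep in mind with the registry above: HashMap is not thread-safe, and put silently replaces an existing pool registered under the same name, leaving the old pool running but unreachable. A possible variant, sketched here with plain JDK types and invented names (PoolRegistrySketch, getOrCreate, find, names), uses ConcurrentHashMap and creates a pool only when the name is unused.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Supplier;

// Hypothetical registry, for illustration only.
class PoolRegistrySketch {

    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    // Returns the pool registered under poolName, creating it with the
    // factory only if no pool has that name yet.
    ThreadPoolExecutor getOrCreate(String poolName, Supplier<ThreadPoolExecutor> factory) {
        return pools.computeIfAbsent(poolName, name -> factory.get());
    }

    // Returns the pool with that name, or null if none is registered.
    ThreadPoolExecutor find(String poolName) {
        return pools.get(poolName);
    }

    // Returns a sorted snapshot of the registered names.
    Set<String> names() {
        return new TreeSet<>(pools.keySet());
    }
}
```

Returning a snapshot of the names, rather than the map itself, keeps callers such as an endpoint from modifying the registry by accident.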
5. Implement the entity classes for thread pool information
These entity classes carry the data that the Endpoint returns.
ThreadPoolDetailInfo.java
/** * The type Thread pool detail info. */
public class ThreadPoolDetailInfo {
private String threadPoolName;
private Integer poolSize;
private Integer corePoolSize;
private Integer largestPoolSize;
private Integer maximumPoolSize;
private long completedTaskCount;
private Integer active;
private long task;
private long keepAliveTime;
private String activePercent;
private Integer queueCapacity;
private Integer queueSize;
private long avgDiff;
/**
* Instantiates a new Thread pool detail info.
*
* @param threadPoolName the thread pool name
* @param poolSize the pool size
* @param corePoolSize the core pool size
* @param largestPoolSize the largest pool size
* @param maximumPoolSize the maximum pool size
* @param completedTaskCount the completed task count
* @param active the active
* @param task the task
* @param keepAliveTime the keep alive time
* @param activePercent the active percent
* @param queueCapacity the queue capacity
* @param queueSize the queue size
* @param avgDiff the avg diff
*/
public ThreadPoolDetailInfo(String threadPoolName, Integer poolSize, Integer corePoolSize, Integer largestPoolSize, Integer maximumPoolSize, long completedTaskCount, Integer active, long task, long keepAliveTime, String activePercent, Integer queueCapacity, Integer queueSize, long avgDiff) {
this.threadPoolName = threadPoolName;
this.poolSize = poolSize;
this.corePoolSize = corePoolSize;
this.largestPoolSize = largestPoolSize;
this.maximumPoolSize = maximumPoolSize;
this.completedTaskCount = completedTaskCount;
this.active = active;
this.task = task;
this.keepAliveTime = keepAliveTime;
this.activePercent = activePercent;
this.queueCapacity = queueCapacity;
this.queueSize = queueSize;
this.avgDiff = avgDiff;
}
/**
* Gets thread pool name.
*
* @return the thread pool name
*/
public String getThreadPoolName() {
return threadPoolName;
}
/**
* Sets thread pool name.
*
* @param threadPoolName the thread pool name
*/
public void setThreadPoolName(String threadPoolName) {
this.threadPoolName = threadPoolName;
}
/**
* Gets pool size.
*
* @return the pool size
*/
public Integer getPoolSize() {
return poolSize;
}
/**
* Sets pool size.
*
* @param poolSize the pool size
*/
public void setPoolSize(Integer poolSize) {
this.poolSize = poolSize;
}
/**
* Gets core pool size.
*
* @return the core pool size
*/
public Integer getCorePoolSize() {
return corePoolSize;
}
/**
* Sets core pool size.
*
* @param corePoolSize the core pool size
*/
public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}
/**
* Gets largest pool size.
*
* @return the largest pool size
*/
public Integer getLargestPoolSize() {
return largestPoolSize;
}
/**
* Sets largest pool size.
*
* @param largestPoolSize the largest pool size
*/
public void setLargestPoolSize(Integer largestPoolSize) {
this.largestPoolSize = largestPoolSize;
}
/**
* Gets maximum pool size.
*
* @return the maximum pool size
*/
public Integer getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* Sets maximum pool size.
*
* @param maximumPoolSize the maximum pool size
*/
public void setMaximumPoolSize(Integer maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
/**
* Gets completed task count.
*
* @return the completed task count
*/
public long getCompletedTaskCount() {
return completedTaskCount;
}
/**
* Sets completed task count.
*
* @param completedTaskCount the completed task count
*/
public void setCompletedTaskCount(long completedTaskCount) {
this.completedTaskCount = completedTaskCount;
}
/**
* Gets active.
*
* @return the active
*/
public Integer getActive() {
return active;
}
/**
* Sets active.
*
* @param active the active
*/
public void setActive(Integer active) {
this.active = active;
}
/**
* Gets task.
*
* @return the task
*/
public long getTask() {
return task;
}
/**
* Sets task.
*
* @param task the task
*/
public void setTask(long task) {
this.task = task;
}
/**
* Gets keep alive time.
*
* @return the keep alive time
*/
public long getKeepAliveTime() {
return keepAliveTime;
}
/**
* Sets keep alive time.
*
* @param keepAliveTime the keep alive time
*/
public void setKeepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}
/**
* Gets active percent.
*
* @return the active percent
*/
public String getActivePercent() {
return activePercent;
}
/**
* Sets active percent.
*
* @param activePercent the active percent
*/
public void setActivePercent(String activePercent) {
this.activePercent = activePercent;
}
/**
* Gets queue capacity.
*
* @return the queue capacity
*/
public Integer getQueueCapacity() {
return queueCapacity;
}
/**
* Sets queue capacity.
*
* @param queueCapacity the queue capacity
*/
public void setQueueCapacity(Integer queueCapacity) {
this.queueCapacity = queueCapacity;
}
/**
* Gets queue size.
*
* @return the queue size
*/
public Integer getQueueSize() {
return queueSize;
}
/**
* Sets queue size.
*
* @param queueSize the queue size
*/
public void setQueueSize(Integer queueSize) {
this.queueSize = queueSize;
}
/**
* Gets avg diff.
*
* @return the avg diff
*/
public long getAvgDiff() {
return avgDiff;
}
/**
* Sets avg diff.
*
* @param avgDiff the avg diff
*/
public void setAvgDiff(long avgDiff) {
this.avgDiff = avgDiff;
}
}
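The derived fields activePercent, queueCapacity and avgDiff have to be computed from a live pool. The formulas below are assumptions made for illustration (active threads relative to the maximum pool size, total task time divided by completed tasks, and queue size plus remaining capacity), and the class PoolSnapshotSketch with its method names is hypothetical.

```java
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helpers for filling the derived fields of a pool report.
class PoolSnapshotSketch {

    // Share of the maximum pool size that is currently running tasks, e.g. "25.00%".
    static String activePercent(ThreadPoolExecutor pool) {
        double percent = 100.0 * pool.getActiveCount() / pool.getMaximumPoolSize();
        return String.format(Locale.ROOT, "%.2f", percent) + "%";
    }

    // Average task time in milliseconds; 0 when no task has completed yet.
    static long averageMillis(long totalMillis, long completedTasks) {
        return completedTasks == 0 ? 0L : totalMillis / completedTasks;
    }

    // Works for any BlockingQueue; the value is approximate while tasks are moving.
    static int queueCapacity(ThreadPoolExecutor pool) {
        BlockingQueue<Runnable> queue = pool.getQueue();
        return queue.size() + queue.remainingCapacity();
    }
}
```

Deriving the capacity from size plus remainingCapacity avoids casting the queue to a concrete type, at the cost of a value that can be briefly inconsistent under load.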
ThreadPoolInfo.java
/** * The type Thread pool info. */
public class ThreadPoolInfo {
private String threadPoolName;
private int corePoolSize;
private int maximumPoolSize;
private String queueType;
private int queueCapacity;
/**
* Instantiates a new Thread pool info.
*
* @param threadPoolName the thread pool name
* @param corePoolSize the core pool size
* @param maximumPoolSize the maximum pool size
* @param queueType the queue type
* @param queueCapacity the queue capacity
*/
public ThreadPoolInfo(String threadPoolName, int corePoolSize, int maximumPoolSize, String queueType, int queueCapacity) {
this.threadPoolName = threadPoolName;
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.queueType = queueType;
this.queueCapacity = queueCapacity;
}
/**
* Gets thread pool name.
*
* @return the thread pool name
*/
public String getThreadPoolName() {
return threadPoolName;
}
/**
* Sets thread pool name.
*
* @param threadPoolName the thread pool name
*/
public void setThreadPoolName(String threadPoolName) {
this.threadPoolName = threadPoolName;
}
/**
* Gets core pool size.
*
* @return the core pool size
*/
public int getCorePoolSize() {
return corePoolSize;
}
/**
* Sets core pool size.
*
* @param corePoolSize the core pool size
*/
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
/**
* Gets maximum pool size.
*
* @return the maximum pool size
*/
public int getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* Sets maximum pool size.
*
* @param maximumPoolSize the maximum pool size
*/
public void setMaximumPoolSize(int maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
/**
* Gets queue type.
*
* @return the queue type
*/
public String getQueueType() {
return queueType;
}
/**
* Sets queue type.
*
* @param queueType the queue type
*/
public void setQueueType(String queueType) {
this.queueType = queueType;
}
/**
* Gets capacity.
*
* @return the capacity
*/
public int getqueueCapacity() {
return queueCapacity;
}
/**
* Sets capacity.
*
* @param queueCapacity the queue capacity
*/
public void setqueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
}
6. Write the Endpoint
Endpoints can be added to Actuator with the @RestControllerEndpoint annotation. In essence it serves the same purpose as @Endpoint and @WebEndpoint: both add interfaces to a service and make it easier to manage while it is running. One notable difference is that @RestControllerEndpoint only supports HTTP access, not JMX access. It also uses the Spring MVC mapping annotations (@GetMapping, @PostMapping, @DeleteMapping, @RequestMapping) rather than @ReadOperation, @WriteOperation and @DeleteOperation, and it returns data as application/json.
We take this approach because our monitoring system only supports JSON. In practice it is better to use Metrics with Grafana for monitoring.
/**
* The type Thread pool endpoint.
*
* @author newrank
*/
@RestControllerEndpoint(id = "threadpool")
@Component
public class ThreadPoolEndpoint {
@Autowired
private ThreadPoolUtil threadPoolUtil;
private static final ReentrantLock LOCK = new ReentrantLock();
private static final String RESIZEABLE_BLOCKING_QUEUE = "ResizeableBlockingQueue";
/**
* Gets the names of all current thread pools.
*/
@GetMapping("getThreadPools")
private List<String> getThreadPools() {
List<String> threadPools = new ArrayList<>();
if (!threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()) {
for (Map.Entry<String, ThreadPoolMonitor> entry : threadPoolUtil.getThreadPoolExecutorHashMap().entrySet()) {
threadPools.add(entry.getKey());
}
}
return threadPools;
}
/**
* Gets the modifiable parameters of a thread pool.
*
* @param threadPoolName the thread pool name
* @return the thread pool parameters, or null if no pool has that name
*/
@GetMapping("getThreadPoolFixInfo")
private ThreadPoolInfo getThreadPoolInfo(@RequestParam String threadPoolName){
if (threadPoolUtil.getThreadPoolExecutorHashMap().containsKey(threadPoolName)){
ThreadPoolMonitor threadPoolExecutor = threadPoolUtil.getThreadPoolExecutorHashMap().get(threadPoolName);
int queueCapacity = 0;
if (RESIZEABLE_BLOCKING_QUEUE.equals(threadPoolExecutor.getQueue().getClass().getSimpleName())){
ResizeableBlockingQueue queue = (ResizeableBlockingQueue) threadPoolExecutor.getQueue();
queueCapacity = queue.getCapacity();
}
return new ThreadPoolInfo(threadPoolName,threadPoolExecutor.getCorePoolSize(),threadPoolExecutor.getMaximumPoolSize(),
threadPoolExecutor.getQueue().getClass().getSimpleName(),queueCapacity);
}
return null;
}
/**
* Modifies the thread pool configuration.
*
* @param threadPoolInfo the new thread pool parameters
* @return true if the parameters were applied, false otherwise
*/
@PostMapping("setThreadPoolFixInfo")
private Boolean setThreadPoolInfo(@RequestBody ThreadPoolInfo threadPoolInfo){
if (threadPoolUtil.getThreadPoolExecutorHashMap().containsKey(threadPoolInfo.getThreadPoolName())){
LOCK.lock();
try {
ThreadPoolMonitor threadPoolExecutor = threadPoolUtil.getThreadPoolExecutorHashMap().get(threadPoolInfo.getThreadPoolName());
threadPoolExecutor.setMaximumPoolSize(threadPoolInfo.getMaximumPoolSize());
threadPoolExecutor.setCorePoolSize(threadPoolInfo.getCorePoolSize());
if (RESIZEABLE_BLOCKING_QUEUE.equals(threadPoolExecutor.getQueue().getClass().getSimpleName())){
ResizeableBlockingQueue queue = (ResizeableBlockingQueue) threadPoolExecutor.getQueue();
queue.setCapacity(threadPoolInfo.getqueueCapacity());
}
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
finally {
LOCK.unlock();
}
}
return false;
}
/**
* Gets monitoring information for all thread pools.
*
* @return the list of thread pool details
*/
@GetMapping("getThreadPoolListInfo")
private List<ThreadPoolDetailInfo> getThreadPoolListInfo() {
List<ThreadPoolDetailInfo> detailInfoList = new ArrayList<>();
if (!threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()) {
for (Map.Entry<String, ThreadPoolMonitor> entry : threadPoolUtil.getThreadPoolExecutorHashMap().entrySet()) {
ThreadPoolDetailInfo threadPoolDetailInfo = threadPoolInfo(entry.getValue(), entry.getKey());
detailInfoList.add(threadPoolDetailInfo);
}
}
return detailInfoList;
}
/**
* Assembles the details of a thread pool.
*
* @param threadPool the thread pool
* @param threadPoolName the thread pool name
* @return the thread pool details
*/
private ThreadPoolDetailInfo threadPoolInfo(ThreadPoolMonitor threadPool,String threadPoolName) {
BigDecimal activeCount = new BigDecimal(threadPool.getActiveCount());
BigDecimal maximumPoolSize = new BigDecimal(threadPool.getMaximumPoolSize());
BigDecimal result =activeCount.divide(maximumPoolSize, 2, BigDecimal.ROUND_HALF_UP);
NumberFormat numberFormat = NumberFormat.getPercentInstance();
numberFormat.setMaximumFractionDigits(2);
int queueCapacity = 0;
if (RESIZEABLE_BLOCKING_QUEUE.equals(threadPool.getQueue().getClass().getSimpleName())){
ResizeableBlockingQueue queue = (ResizeableBlockingQueue) threadPool.getQueue();
queueCapacity = queue.getCapacity();
}
return new ThreadPoolDetailInfo(threadPoolName,threadPool.getPoolSize(), threadPool.getCorePoolSize(),
threadPool.getLargestPoolSize(), threadPool.getMaximumPoolSize(), threadPool.getCompletedTaskCount(),
threadPool.getActiveCount(),threadPool.getTaskCount(),threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS),
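numberFormat.format(result.doubleValue()),queueCapacity,threadPool.getQueue().size(),
// Average task time; guard against division by zero before any task has been submitted
threadPool.getTaskCount() == 0 ? 0 : threadPool.getTotalDiff()/threadPool.getTaskCount());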
}
}
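The load and average-time figures that threadPoolInfo assembles can be tried out in isolation. The following is only a minimal sketch of that arithmetic: PoolStats, loadPercent, and averageTaskTime are hypothetical names that do not appear in the code above, and Locale.US is fixed here (an assumption) so that the formatted percentage does not depend on the machine's default locale.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.NumberFormat;
import java.util.Locale;

// Hypothetical helper, not part of the original project.
class PoolStats {

    // Load = active threads / maximum threads, rounded to two decimals and shown as a percentage.
    static String loadPercent(int activeCount, int maximumPoolSize) {
        BigDecimal ratio = BigDecimal.valueOf(activeCount)
                .divide(BigDecimal.valueOf(maximumPoolSize), 2, RoundingMode.HALF_UP);
        NumberFormat percent = NumberFormat.getPercentInstance(Locale.US);
        percent.setMaximumFractionDigits(2);
        return percent.format(ratio.doubleValue());
    }

    // Average time per task, in the same unit as totalDiff; 0 while no task has been submitted.
    static long averageTaskTime(long totalDiff, long taskCount) {
        return taskCount == 0 ? 0 : totalDiff / taskCount;
    }

    public static void main(String[] args) {
        System.out.println(loadPercent(8, 32));        // prints 25%
        System.out.println(averageTaskTime(1700, 17)); // prints 100
        System.out.println(averageTaskTime(0, 0));     // prints 0
    }
}
```

RoundingMode.HALF_UP is used in the sketch instead of the BigDecimal.ROUND_HALF_UP constant, which has been deprecated since Java 9.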
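One thing to watch in setThreadPoolInfo is the order of the two setters. ThreadPoolExecutor.setMaximumPoolSize throws IllegalArgumentException when the new maximum is below the current core size, and since Java 9 setCorePoolSize throws when the new core size is above the current maximum. So shrinking a pool from 16/32 to 4/8 with "maximum first" would fail. The sketch below picks the order based on the current state; PoolResizer and resize are hypothetical names, and a plain LinkedBlockingQueue stands in for ResizeableBlockingQueue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original project.
class PoolResizer {

    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("require 0 <= core <= max and max > 0");
        }
        if (newMax >= pool.getCorePoolSize()) {
            // The new maximum still covers the current core size: raise or keep the ceiling first.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // The new maximum is below the current core size: lower the core size first.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        resize(pool, 4, 8);   // shrink
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 4/8
        resize(pool, 40, 64); // grow
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 40/64
        pool.shutdown();
    }
}
```

In the endpoint, a helper like this could replace the two direct setter calls inside the locked section, so that an invalid combination is rejected before either value is changed.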
7. Use thread pool monitoring
- Using the annotation
@Async("asyncExecutor")
public void getTrendQuery() {
    // do something
}
- Direct use
public void test() {
    asyncExecutor.execute(() -> {
        // do something
    });
}
1. View thread pool details
http://localhost/actuator/threadpool/getThreadPoolListInfo // GET request
Returns:
[
  {
    "active": 0, // Number of tasks in progress
    "activePercent": "0%", // Thread pool load
    "completedTaskCount": 17, // Number of completed tasks
    "corePoolSize": 16, // Number of core threads
    "keepAliveTime": 60000, // Thread keep-alive time (ms)
    "largestPoolSize": 16, // Largest number of threads ever reached
    "maximumPoolSize": 32, // Maximum number of threads
    "poolSize": 16, // Current number of threads
    "queueCapacity": 500, // Queue capacity; defaults to 0 if the queue is not a ResizeableBlockingQueue
    "task": 0, // Total number of tasks
    "queueSize": 0, // Number of tasks waiting in the queue
    "threadPoolName": "asyncExecutor" // Thread pool name
  }
]
2. View thread pool parameters
http://localhost/actuator/threadpool/getThreadPoolFixInfo?threadPoolName=asyncExecutor // GET request
Parameters:
Name | Type |
---|---|
threadPoolName | String |
Returns:
{
  "corePoolSize": 16, // Number of core threads
  "maximumPoolSize": 32, // Maximum number of threads
  "queueCapacity": 500, // Queue capacity
  "queueType": "ResizeableBlockingQueue", // Queue type
  "threadPoolName": "asyncExecutor" // Thread pool name
}
3. Modify thread pool parameters
https://localhost/actuator/threadpool/setThreadPoolFixInfo // POST request
Parameters:
Name | Type | Note |
---|---|---|
threadPoolName | String | |
corePoolSize | int | modifiable |
maximumPoolSize | int | modifiable |
queueCapacity | int | modifiable |
queueType | String | not modifiable |
Request type: JSON
Returns: Boolean
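As an illustration of the request described above, here is a small sketch that builds the POST with the JDK's java.net.http client (Java 11+). The class and method names (SetThreadPoolRequest, toJson, build) are hypothetical, the path follows the @PostMapping value in the endpoint class, and the JSON is assembled by hand on the assumption that the pool name needs no escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side helper, not part of the original project.
class SetThreadPoolRequest {

    // Builds the JSON body; queueType is left out because it cannot be modified.
    static String toJson(String poolName, int core, int max, int queueCapacity) {
        return "{\"threadPoolName\":\"" + poolName + "\""
                + ",\"corePoolSize\":" + core
                + ",\"maximumPoolSize\":" + max
                + ",\"queueCapacity\":" + queueCapacity + "}";
    }

    // Builds (but does not send) the POST request against the given base URL.
    static HttpRequest build(String baseUrl, String poolName, int core, int max, int queueCapacity) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/threadpool/setThreadPoolFixInfo"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(poolName, core, max, queueCapacity)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost", "asyncExecutor", 16, 32, 500);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(toJson("asyncExecutor", 16, 32, 500));
    }
}
```

To actually send it against a running service, the request can be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the response body should then be true or false.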
The complete code above is available on GitHub.
Personal blog
The stones of Sisyphus
The author's knowledge is limited; please point out any errors or omissions.
References
1. Implementation principle of Java thread pool and its practice in Meituan business
2. Java concurrency (VI): Thread pool monitoring