Java geek
1. Scheduling worker threads in HikariPool
HikariPool runs its background work on three ThreadPoolExecutor instances, each with its own duties and its own policy for tasks it cannot accept.
ThreadPoolExecutor | Responsibilities | Behavior when overloaded |
---|---|---|
houseKeepingExecutorService | 1. Shrinks the pool during dynamic scaling. 2. Detects connection leaks. 3. Retires connections that exceed their maximum lifetime. | Discard the task (DiscardPolicy) |
addConnectionExecutor | Creates database connections, including those added when the pool scales up. | Discard the task (DiscardPolicy) |
closeConnectionExecutor | Closes database connections. | Run the task in the submitting thread (CallerRunsPolicy) |
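The difference between the two overload policies in the table can be sketched with a deliberately tiny executor. This is only an illustration, not HikariPool code: the class name `RejectionPolicyDemo` and the method `runWith` are made up for the example, and the executor is sized at one thread with a one-slot queue so that rejections are easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of HikariCP.
class RejectionPolicyDemo {

    /** Returns how many of three counter tasks ran under the given policy. */
    static int runWith(RejectedExecutionHandler policy) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        try {
            // Occupy the single worker thread until the latch is released.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The first counter fills the one-slot queue; the next two are rejected
            // and handed to the policy.
            for (int i = 0; i < 3; i++) {
                executor.execute(executed::incrementAndGet);
            }
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return executed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // DiscardPolicy silently drops the two rejected tasks.
        System.out.println("DiscardPolicy ran:    " + runWith(new ThreadPoolExecutor.DiscardPolicy()));
        // CallerRunsPolicy runs the two rejected tasks in the submitting thread.
        System.out.println("CallerRunsPolicy ran: " + runWith(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

Dropping a task is acceptable for adding connections because a later request will ask again, whereas a close that is dropped would leave a connection open, which is presumably why the closer falls back to running in the caller.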
1.1. houseKeepingExecutorService
Instantiation:
//HikariPool.java
private ScheduledExecutorService initializeHouseKeepingExecutorService()
{
if (config.getScheduledExecutor() == null) {
final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElseGet(() -> new DefaultThreadFactory(poolName + " housekeeper", true));
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
else {
return config.getScheduledExecutor();
}
}
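The `setRemoveOnCancelPolicy(true)` call matters because the housekeeping executor holds many long-delay tasks that are usually cancelled before they fire. The sketch below shows the effect of that flag on the executor's queue. The class `RemoveOnCancelSketch` and its method are hypothetical names for this example only.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of HikariCP.
class RemoveOnCancelSketch {

    /** Schedules a far-future task, cancels it, and returns the remaining queue size. */
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            // With the policy off, the cancelled task stays queued until its delay elapses.
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("removeOnCancel=true  -> queued: " + queuedAfterCancel(true));
        System.out.println("removeOnCancel=false -> queued: " + queuedAfterCancel(false));
    }
}
```

With the policy enabled, cancelled tasks do not pile up in the delay queue while waiting for a delay that no longer matters.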
1.1.1 Monitor connection leaks
//HikariPool.java
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
//ProxyLeakTaskFactory.java
private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
ProxyLeakTask task = new ProxyLeakTask(poolEntry);
// executorService here is houseKeepingExecutorService
task.schedule(executorService, leakDetectionThreshold);
return task;
}
//ProxyLeakTask.java
ProxyLeakTask(final PoolEntry poolEntry)
{
this.exception = new Exception("Apparent connection leak detected");
this.threadName = Thread.currentThread().getName();
this.connectionName = poolEntry.connection.toString();
}
public void run()
{
isLeaked = true;
final StackTraceElement[] stackTrace = exception.getStackTrace();
final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
System.arraycopy(stackTrace, 5, trace, 0, trace.length);
exception.setStackTrace(trace);
// Leak handling: the leak is only written to the log here. Exposing this step through an interface that users can implement would be more flexible.
LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
}
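The leak check above boils down to a one-shot delayed task that is armed when a connection is borrowed and cancelled when it is returned. The following is a minimal sketch of that idea under simplifying assumptions: `LeakWatchSketch` and `leakReported` are hypothetical names, and a `Thread.sleep` stands in for the time the caller holds the connection.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of HikariCP.
class LeakWatchSketch {

    /** Returns true if a connection held for holdMs trips a leak threshold of thresholdMs. */
    static boolean leakReported(long thresholdMs, long holdMs) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        scheduler.setRemoveOnCancelPolicy(true);
        AtomicBoolean leaked = new AtomicBoolean(false);
        try {
            // "Borrow": arm a one-shot task that fires if the connection is held too long.
            ScheduledFuture<?> watch =
                    scheduler.schedule(() -> leaked.set(true), thresholdMs, TimeUnit.MILLISECONDS);
            Thread.sleep(holdMs); // simulate the caller using the connection
            // "Return": cancel the watch; this has no effect if it already fired.
            watch.cancel(false);
            return leaked.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("held past threshold:  " + leakReported(50, 300));
        System.out.println("returned in time:     " + leakReported(2_000, 10));
    }
}
```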
1.1.2 Connection Pool Dynamic Scaling
//HikariPool.java
// HouseKeeper is the worker task responsible for dynamically scaling the connection pool
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
1.1.3 Monitoring the maximum lifetime of database connections
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong(maxLifetime / 40) : 0;
final long lifetime = maxLifetime - variance;
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
() -> {
if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
addBagItem(connectionBag.getWaitingThreadCount());
}
},
lifetime, MILLISECONDS));
}
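The variance term spreads expirations out so that connections created together do not all retire at the same instant. As a worked example, the arithmetic can be isolated in a small helper. `LifetimeJitter` and `effectiveLifetime` are hypothetical names; the formula itself is copied from the snippet above.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of HikariCP; it only isolates the arithmetic shown above.
class LifetimeJitter {

    /** Returns maxLifetimeMs reduced by a random amount of up to 2.5% (1/40). */
    static long effectiveLifetime(long maxLifetimeMs) {
        // No jitter is applied for lifetimes of 10 seconds or less.
        long variance = maxLifetimeMs > 10_000
                ? ThreadLocalRandom.current().nextLong(maxLifetimeMs / 40)
                : 0;
        return maxLifetimeMs - variance;
    }

    public static void main(String[] args) {
        long thirtyMinutes = 30 * 60 * 1000L;
        // For 1,800,000 ms the variance is below 45,000 ms,
        // so the result lies in (1,755,000, 1,800,000].
        System.out.println(effectiveLifetime(thirtyMinutes));
        // At or below the 10-second cutoff the lifetime is returned unchanged.
        System.out.println(effectiveLifetime(10_000L));
    }
}
```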
1.2. addConnectionExecutor
Instantiation:
//HikariPool.java
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
//UtilityElf.java
public static ThreadPoolExecutor createThreadPoolExecutor(final BlockingQueue<Runnable> queue, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
{
if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true);
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(1 /*core*/, 1 /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy);
executor.allowCoreThreadTimeOut(true);
return executor;
}
Adding a connection:
//HikariPool.java
public void addBagItem(final int waiting)
{
final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
if (shouldAdd) {
addConnectionExecutor.submit(poolEntryCreator);
}
}
// Adds connections when the pool scales up
private synchronized void fillPool()
{
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueue.size();
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
}
}
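The two expressions that decide whether to submit a creation task are easier to follow with concrete numbers. The sketch below restates them as pure functions. `FillPoolMath` and its method names are hypothetical; the formulas mirror `addBagItem` and `fillPool` above.

```java
// Hypothetical helper, not part of HikariCP; it restates the two formulas shown above.
class FillPoolMath {

    /** fillPool: how many creation tasks to submit. */
    static int connectionsToAdd(int maxPoolSize, int total, int minIdle, int idle, int alreadyQueued) {
        // Limited both by remaining pool capacity and by the idle shortfall,
        // minus the creation tasks that are already waiting in the queue.
        return Math.min(maxPoolSize - total, minIdle - idle) - alreadyQueued;
    }

    /** addBagItem: submit only if waiting threads are not already covered by queued tasks. */
    static boolean shouldAdd(int waiting, int alreadyQueued) {
        return waiting - alreadyQueued >= 0;
    }

    public static void main(String[] args) {
        // Capacity allows 2 more, idle shortfall is 4, 1 already queued: min(2, 4) - 1 = 1.
        System.out.println(connectionsToAdd(10, 8, 5, 1, 1)); // prints 1
        // Pool is full: min(0, 3) - 0 = 0, so the loop submits nothing.
        System.out.println(connectionsToAdd(10, 10, 5, 2, 0)); // prints 0
        // Two waiters and three queued creations: no extra task is needed.
        System.out.println(shouldAdd(2, 3)); // prints false
    }
}
```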
1.3. closeConnectionExecutor
Instantiation:
//HikariPool.java
this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
//UtilityElf.java
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
{
if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true);
}
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueSize);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1 /*core*/, 1 /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy);
executor.allowCoreThreadTimeOut(true);
return executor;
}
Close the connection:
void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
if (connectionBag.remove(poolEntry)) {
final Connection connection = poolEntry.close();
closeConnectionExecutor.execute(() -> {
quietlyCloseConnection(connection, closureReason);
if (poolState == POOL_NORMAL) {
fillPool();
}
});
}
}
2. Related tools
Class | Responsibilities |
---|---|
ThreadPoolExecutor | Thread pool executor |
BlockingQueue | The buffer queue used by the thread pool. Its capacity determines the maximum number of tasks that can wait for a worker thread |
ThreadFactory | Factory interface for creating worker threads |
ScheduledThreadPoolExecutor | Thread pool executor that supports delayed and periodic execution. Setting the delay to the maximum lifetime lets a task fire exactly when a database connection exceeds it |
DefaultThreadFactory | The default thread factory implemented in HikariPool, which sets the thread name and marks the thread as a daemon thread |
RejectedExecutionHandler | Policy interface that decides what happens to a newly submitted task when the executor's queue is full and no more threads can be created |
DiscardOldestPolicy | Discards the oldest unexecuted task in the queue and retries the new task. Not used in HikariPool. |
CallerRunsPolicy | Runs the rejected task directly in the thread that submitted it. Used in closeConnectionExecutor. |
AbortPolicy | Rejects the task that exceeds the queue capacity and throws an exception. Not used in HikariPool. |
DiscardPolicy | Silently drops the task that exceeds the queue capacity. Used in houseKeepingExecutorService and addConnectionExecutor. |
3. Core classes
3.1 ThreadPoolExecutor
The constructor:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
Parameter Description:
Parameter | Description |
---|---|
int corePoolSize | The number of threads kept in the pool even when they are idle (unless allowCoreThreadTimeOut is enabled) |
int maximumPoolSize | The maximum number of threads allowed in the pool |
long keepAliveTime | How long an idle thread is kept. Once this time is exceeded and the number of threads is greater than corePoolSize, the thread is released |
TimeUnit unit | Time unit of keepAliveTime |
BlockingQueue workQueue | The queue that holds tasks waiting to run. Tasks are queued once corePoolSize threads are busy; extra threads up to maximumPoolSize are created only when the queue is full |
ThreadFactory threadFactory | Factory interface used to create worker threads |
RejectedExecutionHandler handler | Policy applied to a task that cannot be accepted because the queue is full and maximumPoolSize threads are already running |
A few things to note here:
- The work queue should be bounded, to avoid running out of memory when tasks pile up without limit.
- The maximum number of threads should also be kept under control, avoiding Integer.MAX_VALUE for the same reason.
- How the work queue is used depends on corePoolSize and maximumPoolSize. In short, a task goes straight to a new thread while fewer than corePoolSize threads exist, is queued after that, and triggers an extra thread (up to maximumPoolSize) only when the queue is full.
Example:
import java.util.concurrent.*;
import static java.util.concurrent.ThreadPoolExecutor.*;
public class ThreadPoolExecutorTest {
private static int runableNum = 1;
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(3);
// Change corePoolSize and maximumPoolSize to see the effect on queue processing logic
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 300, TimeUnit.SECONDS,
queue, new DefaultThreadFactory(), new DefaultDiscardPolicy());
while(true) {
System.out.println("runableNum: " + runableNum);
executor.execute(new DefaultRunnable("id-" + runableNum));
runableNum++;
quietlySleep(500);
}
}
private static void quietlySleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
private static class DefaultRunnable implements Runnable {
private String name;
public DefaultRunnable(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("Runnable-" + name + " run.");
quietlySleep(3000);
}
public String getName() {
return this.name;
}
}
private static class DefaultDiscardPolicy extends DiscardPolicy {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
super.rejectedExecution(r, e);
if (r instanceof DefaultRunnable) {
DefaultRunnable defaultRunnable = (DefaultRunnable)r;
System.out.println("Runnable-" + defaultRunnable.getName() + " be discard.");
}
}
}
private static class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
return thread;
}
}
}
Output:
runableNum: 1
Runnable-id-1 run.
runableNum: 2
runableNum: 3
runableNum: 4
runableNum: 5
Runnable-id-5 run.
runableNum: 6
Runnable-id-6 run.
Runnable-id-2 run.
runableNum: 7
runableNum: 8
Runnable-id-8 be discard.
runableNum: 9
Runnable-id-9 be discard.
runableNum: 10
Runnable-id-10 be discard.
Runnable-id-3 run.
runableNum: 11
Runnable-id-4 run.
runableNum: 12
Runnable-id-7 run.
runableNum: 13
runableNum: 14
Runnable-id-14 be discard.
3.2 ScheduledThreadPoolExecutor
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
Its ability to run a task after a delay is what HikariPool relies on to detect connection leaks and connections that have exceeded their maximum lifetime.
4. Summary
- ThreadPoolExecutor, ScheduledThreadPoolExecutor and their related classes are the core thread tools for improving performance, and are well worth mastering.
- Make full use of these thread tools to manage resource pools and to schedule worker threads sensibly.
end.
Java geek site: javageektour.com/