Java geek

1. Scheduling worker threads in HikariPool

HikariPool runs its background work on three ThreadPoolExecutor instances (one of them is a ScheduledThreadPoolExecutor, a subclass of ThreadPoolExecutor).

| ThreadPoolExecutor | Duties and responsibilities | Handling strategy when overloaded |
| --- | --- | --- |
| houseKeepingExecutorService | 1. Reduces database connections during dynamic scaling of the connection pool; 2. Monitors database connection leaks; 3. Monitors database connections that exceed the maximum lifetime | Discard the task (DiscardPolicy) |
| addConnectionExecutor | Creates database connections, including the connections added when the pool scales dynamically | Discard the task (DiscardPolicy) |
| closeConnectionExecutor | Closes database connections | Run the task in the submitting thread (CallerRunsPolicy) |
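
To make the two overload strategies concrete, here is a minimal sketch that uses only the JDK. It saturates an executor with one thread and a one-slot queue, then submits one more task under each policy. The class and method names (RejectionPolicyDemo, runOverflowTask) are made up for this illustration and are not part of HikariCP.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionPolicyDemo {

    /**
     * Saturates a one-thread executor that has a one-slot queue, then submits
     * one more task. Returns the name of the thread that ran the extra task,
     * or null if the policy dropped it.
     */
    static String runOverflowTask(RejectedExecutionHandler policy) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                r -> new Thread(r, "demo-worker"),
                policy);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until the latch opens.
            executor.execute(() -> awaitQuietly(release));
            // Task 2 fills the only queue slot.
            executor.execute(() -> { });
            // Task 3 overflows: the rejection policy decides what happens to it.
            executor.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return ranOn.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // DiscardPolicy silently drops the overflow task, so this prints null.
        System.out.println("DiscardPolicy:    " + runOverflowTask(new ThreadPoolExecutor.DiscardPolicy()));
        // CallerRunsPolicy runs it in the submitting thread, so this prints main.
        System.out.println("CallerRunsPolicy: " + runOverflowTask(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

Dropping a task is acceptable for housekeeping and connection creation because the work is requested again later, while a close request has to run somewhere, which is presumably why the closer falls back to the caller's thread.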

1.1. houseKeepingExecutorService

Instantiation:

//HikariPool.java
   private ScheduledExecutorService initializeHouseKeepingExecutorService()
   {
      if (config.getScheduledExecutor() == null) {
         final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElseGet(() -> new DefaultThreadFactory(poolName + " housekeeper", true));
         final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
         executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
         executor.setRemoveOnCancelPolicy(true);
         return executor;
      }
      else {
         return config.getScheduledExecutor();
      }
   }
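
The call to setRemoveOnCancelPolicy(true) matters because delayed tasks such as the leak and lifetime monitors are often cancelled before they fire. The following sketch, which uses only the JDK and a made-up class name (RemoveOnCancelDemo), shows the difference the flag makes to the executor's queue.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RemoveOnCancelDemo {

    /** Schedules a far-future task, cancels it, and reports how many tasks remain queued. */
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            // Without the policy, the cancelled task stays queued until its delay elapses.
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("removeOnCancel=false -> " + queuedAfterCancel(false)); // prints 1
        System.out.println("removeOnCancel=true  -> " + queuedAfterCancel(true));  // prints 0
    }
}
```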

1.1.1 Monitor connection leaks

//HikariPool.java
      this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);

//ProxyLeakTaskFactory.java
   private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
      ProxyLeakTask task = new ProxyLeakTask(poolEntry);
      // The executorService here is houseKeepingExecutorService
      task.schedule(executorService, leakDetectionThreshold);

      return task;
   }

//ProxyLeakTask.java
   ProxyLeakTask(final PoolEntry poolEntry)
   {
      this.exception = new Exception("Apparent connection leak detected");
      this.threadName = Thread.currentThread().getName();
      this.connectionName = poolEntry.connection.toString();
   }

   public void run()
   {
      isLeaked = true;

      final StackTraceElement[] stackTrace = exception.getStackTrace();
      final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
      System.arraycopy(stackTrace, 5, trace, 0, trace.length);

      exception.setStackTrace(trace);
      // This is where a detected leak is handled. Here it is only written to the log.
      // Handling it through an interface that users could implement would be more flexible.
      LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
   }
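
The pattern behind the leak monitor is "schedule a delayed task when the connection is borrowed, cancel it when the connection comes back". Below is a simplified sketch of that idea using only the JDK. LeakWatch and borrowAndHold are hypothetical names, and the real ProxyLeakTask does more (stack-trace trimming, logging through SLF4J).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LeakWatchDemo {

    /** Hypothetical stand-in for ProxyLeakTask: fires unless cancelled before the threshold. */
    static final class LeakWatch implements Runnable {
        // Created at borrow time so the stack trace points at the borrower.
        private final Exception borrowedAt = new Exception("Apparent connection leak detected");
        private final AtomicBoolean leaked = new AtomicBoolean();
        private ScheduledFuture<?> future;

        void schedule(ScheduledExecutorService executor, long thresholdMs) {
            future = executor.schedule(this, thresholdMs, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            leaked.set(true);
            System.out.println("Leak suspected, borrowed at: " + borrowedAt.getStackTrace()[0]);
        }

        /** Called when the connection is returned to the pool. */
        void cancel() {
            future.cancel(false);
        }

        boolean isLeaked() {
            return leaked.get();
        }
    }

    /** Holds a simulated connection for holdMs and reports whether the watch fired. */
    static boolean borrowAndHold(long thresholdMs, long holdMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            LeakWatch watch = new LeakWatch();
            watch.schedule(executor, thresholdMs);
            Thread.sleep(holdMs);   // simulated work while holding the connection
            watch.cancel();         // connection returned
            return watch.isLeaked();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("returned in time: leaked=" + borrowAndHold(2000, 10));
        System.out.println("held too long:    leaked=" + borrowAndHold(50, 500));
    }
}
```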

1.1.2 Connection Pool Dynamic Scaling

//HikariPool.java
      // HouseKeeper is the task responsible for dynamically scaling the connection pool
      this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
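
As a rough illustration of how a periodic task can shrink a pool, the sketch below retires one surplus idle "connection" per pass with scheduleWithFixedDelay. It is an assumption-laden simplification: the connections are just a counter, the names (HouseKeeperDemo, shrinkToMinimumIdle) are invented here, and the real HouseKeeper works on idle timeouts and also refills the pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HouseKeeperDemo {

    /**
     * Runs a periodic task that retires one surplus idle connection per pass
     * until only minimumIdle remain. Returns the idle count at the end.
     */
    static int shrinkToMinimumIdle(int idle, int minimumIdle) {
        AtomicInteger idleConnections = new AtomicInteger(idle);
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // The next pass starts 20 ms after the previous pass has finished.
            ScheduledFuture<?> task = executor.scheduleWithFixedDelay(() -> {
                if (idleConnections.get() > minimumIdle) {
                    idleConnections.decrementAndGet(); // retire one surplus connection
                } else {
                    done.countDown();                  // nothing left to retire
                }
            }, 10L, 20L, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
            task.cancel(false);
            return idleConnections.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return idleConnections.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("idle after housekeeping: " + shrinkToMinimumIdle(5, 2)); // prints 2
    }
}
```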

1.1.3 Monitoring the maximum lifetime of database connections

//HikariPool.java
         final long maxLifetime = config.getMaxLifetime();
         if (maxLifetime > 0) {
            // variance up to 2.5% of the maxlifetime
            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
            final long lifetime = maxLifetime - variance;
            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
               () -> {
                  if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
                     addBagItem(connectionBag.getWaitingThreadCount());
                  }
               },
               lifetime, MILLISECONDS));
         }
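
The variance term spreads out expirations so that connections created together do not all retire at the same moment. The arithmetic can be isolated in a small sketch (MaxLifetimeJitter and effectiveLifetime are names invented for this example):

```java
import java.util.concurrent.ThreadLocalRandom;

class MaxLifetimeJitter {

    /** Same formula as the excerpt: subtract a random amount below 2.5% of maxLifetime. */
    static long effectiveLifetime(long maxLifetimeMs) {
        long variance = maxLifetimeMs > 10_000
                ? ThreadLocalRandom.current().nextLong(maxLifetimeMs / 40)
                : 0;
        return maxLifetimeMs - variance;
    }

    public static void main(String[] args) {
        // For 30 minutes (1_800_000 ms) the bound is 45_000 ms, so the result
        // falls between 1_755_001 and 1_800_000 inclusive.
        System.out.println(effectiveLifetime(1_800_000));
        // At or below 10 seconds no variance is applied.
        System.out.println(effectiveLifetime(5_000)); // prints 5000
    }
}
```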

1.2. addConnectionExecutor

Instantiation:

//HikariPool.java
      this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());

//UtilityElf.java
   public static ThreadPoolExecutor createThreadPoolExecutor(final BlockingQueue<Runnable> queue, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
   {
      if (threadFactory == null) {
         threadFactory = new DefaultThreadFactory(threadName, true);
      }

      ThreadPoolExecutor executor = new ThreadPoolExecutor(1 /*core*/, 1 /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy);
      executor.allowCoreThreadTimeOut(true);
      return executor;
   }
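
Note the allowCoreThreadTimeOut(true) call: with it, even the single core thread is released after the keep-alive time, so an idle executor holds no thread at all. A small JDK-only sketch of the effect follows. The class name is made up, and the keep-alive is shortened to 50 ms here so the demo finishes quickly (the excerpt above uses 5 seconds).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {

    /** Runs one task, stays idle past the keep-alive time, then reports the pool size. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        executor.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            executor.submit(() -> { }).get(); // start the single worker and wait for the task
            Thread.sleep(500);                // stay idle well past the 50 ms keep-alive
            return executor.getPoolSize();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout off: " + poolSizeAfterIdle(false)); // prints 1
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true));  // prints 0
    }
}
```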

Adding a connection:

//HikariPool.java
   public void addBagItem(final int waiting)
   {
      final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
      if (shouldAdd) {
         addConnectionExecutor.submit(poolEntryCreator);
      }
   }

   // Called during dynamic scaling of the connection pool to add connections
   private synchronized void fillPool()
   {
      final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
                                   - addConnectionQueue.size();
      for (int i = 0; i < connectionsToAdd; i++) {
         addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
      }
   }
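
The two decisions above are plain integer arithmetic, so they are easy to check with a few numbers. The sketch below restates them as pure functions. FillPoolMath and its parameter names are made up for this illustration, and the inputs stand in for the values that HikariPool reads from its config and connection bag.

```java
class FillPoolMath {

    /** Mirrors the fillPool() formula: room left in the pool vs. idle shortfall, minus pending adds. */
    static int connectionsToAdd(int maximumPoolSize, int totalConnections,
                                int minimumIdle, int idleConnections, int queuedAdds) {
        return Math.min(maximumPoolSize - totalConnections, minimumIdle - idleConnections) - queuedAdds;
    }

    /** Mirrors the addBagItem() check: add unless more creations are queued than threads are waiting. */
    static boolean shouldAdd(int waitingThreads, int queuedAdds) {
        return waitingThreads - queuedAdds >= 0;
    }

    public static void main(String[] args) {
        // 4 slots free, 4 idle connections missing, 1 creation already queued.
        System.out.println(connectionsToAdd(10, 6, 5, 1, 1)); // prints 3
        // Only 1 slot free, so the pool size cap wins over the idle shortfall.
        System.out.println(connectionsToAdd(10, 9, 5, 0, 0)); // prints 1
        // Pool is full: the result is not positive, so the loop adds nothing.
        System.out.println(connectionsToAdd(10, 10, 5, 0, 0)); // prints 0
        System.out.println(shouldAdd(2, 2)); // prints true
        System.out.println(shouldAdd(1, 2)); // prints false
    }
}
```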

1.3. closeConnectionExecutor

Instantiation:

//HikariPool.java
      this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

//UtilityElf.java
   public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
   {
      if (threadFactory == null) {
         threadFactory = new DefaultThreadFactory(threadName, true);
      }

      LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueSize);
      ThreadPoolExecutor executor = new ThreadPoolExecutor(1 /*core*/, 1 /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy);
      executor.allowCoreThreadTimeOut(true);
      return executor;
   }

Close the connection:

//HikariPool.java
   void closeConnection(final PoolEntry poolEntry, final String closureReason)
   {
      if (connectionBag.remove(poolEntry)) {
         final Connection connection = poolEntry.close();
         closeConnectionExecutor.execute(() -> {
            quietlyCloseConnection(connection, closureReason);
            if (poolState == POOL_NORMAL) {
               fillPool();
            }
         });
      }
   }

2. Related tools

| Class | Duties and responsibilities |
| --- | --- |
| ThreadPoolExecutor | Thread pool executor |
| BlockingQueue | The buffer queue used by the thread pool. Its capacity determines the maximum number of tasks that can be buffered |
| ThreadFactory | Factory that creates the worker threads |
| ScheduledThreadPoolExecutor | Thread pool executor that supports delayed and periodic execution. For example, setting the delay to the maximum lifetime lets a task check whether a database connection has exceeded its maximum lifetime |
| DefaultThreadFactory | The default thread factory implemented in HikariCP, which sets the thread name and marks the thread as a daemon thread |
| RejectedExecutionHandler | Policy interface that decides what happens to a newly submitted task when the executor's queue is full and no more threads can be created |
| DiscardOldestPolicy | Discards the oldest unexecuted task in the queue and then submits the new task. Not used in HikariPool. |
| CallerRunsPolicy | Runs the rejected task in the thread that submitted it. Used in closeConnectionExecutor. |
| AbortPolicy | Rejects the task and throws a RejectedExecutionException. Not used in HikariPool. |
| DiscardPolicy | Silently drops tasks that exceed the queue capacity. Used in houseKeepingExecutorService and addConnectionExecutor. |
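
For readers who have not written a ThreadFactory before, the following is a minimal sketch of one that names its threads and marks them as daemon threads, which is the behavior described for DefaultThreadFactory above. NamedDaemonThreadFactory is a hypothetical class written for this article and should not be read as HikariCP's actual source.

```java
import java.util.concurrent.ThreadFactory;

class NamedDaemonThreadFactoryDemo {

    /** Hypothetical factory: every thread gets the same name and the given daemon flag. */
    static final class NamedDaemonThreadFactory implements ThreadFactory {
        private final String threadName;
        private final boolean daemon;

        NamedDaemonThreadFactory(String threadName, boolean daemon) {
            this.threadName = threadName;
            this.daemon = daemon;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, threadName);
            // Daemon threads do not keep the JVM alive when the application exits.
            thread.setDaemon(daemon);
            return thread;
        }
    }

    public static void main(String[] args) {
        Thread thread = new NamedDaemonThreadFactory("demo-pool housekeeper", true).newThread(() -> { });
        System.out.println(thread.getName() + " daemon=" + thread.isDaemon()); // prints demo-pool housekeeper daemon=true
    }
}
```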

3. Core classes

3.1 ThreadPoolExecutor

The constructor:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

Parameter description:

| Parameter | Description |
| --- | --- |
| int corePoolSize | The number of threads kept in the thread pool executor, even when they are idle |
| int maximumPoolSize | The maximum number of threads allowed in the thread pool executor |
| long keepAliveTime | How long an idle thread is retained. When this time is exceeded and the number of threads is greater than corePoolSize, the thread is released |
| TimeUnit unit | Unit of the retention time for idle threads |
| BlockingQueue<Runnable> workQueue | The task buffer queue. How it is used depends on corePoolSize and maximumPoolSize: a task is queued only once corePoolSize threads are already running |
| ThreadFactory threadFactory | Thread factory interface for creating worker threads |
| RejectedExecutionHandler handler | Policy applied when a task cannot be accepted because the queue is full and maximumPoolSize threads are already running |

A few things to note here:

  1. The task buffer queue should be a bounded queue, to avoid running out of memory because the queue grows without limit.
  2. The maximum number of threads should also be limited appropriately. Avoid Integer.MAX_VALUE, for the same reason as above.
  3. The processing logic of the task buffer queue depends on corePoolSize and maximumPoolSize. In short: while fewer than corePoolSize threads are running, a new task gets a new thread instead of being queued; after that, tasks are queued; only when the queue is full are extra threads created, up to maximumPoolSize; beyond that, the task is rejected.

Example:

import java.util.concurrent.*;
import static java.util.concurrent.ThreadPoolExecutor.*;

public class ThreadPoolExecutorTest {

    private static int runableNum = 1;

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(3);
        // Change corePoolSize and maximumPoolSize to see the effect on the queue processing logic
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 300, TimeUnit.SECONDS,
                queue, new DefaultThreadFactory(), new DefaultDiscardPolicy());
        while (true) {
            System.out.println("runableNum: " + runableNum);
            executor.execute(new DefaultRunnable("id-" + runableNum));
            runableNum++;
            quietlySleep(500);
        }
    }

    private static void quietlySleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    private static class DefaultRunnable implements Runnable {
        private String name;

        public DefaultRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("Runnable-" + name + " run.");
            quietlySleep(3000);
        }

        public String getName() {
            return this.name;
        }
    }

    private static class DefaultDiscardPolicy extends DiscardPolicy {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            super.rejectedExecution(r, e);
            if (r instanceof DefaultRunnable) {
                DefaultRunnable defaultRunnable = (DefaultRunnable) r;
                System.out.println("Runnable-" + defaultRunnable.getName() + " be discard.");
            }
        }
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            return thread;
        }
    }
}

Output:

runableNum: 1
Runnable-id-1 run.
runableNum: 2
runableNum: 3
runableNum: 4
runableNum: 5
Runnable-id-5 run.
runableNum: 6
Runnable-id-6 run.
Runnable-id-2 run.
runableNum: 7
runableNum: 8
Runnable-id-8 be discard.
runableNum: 9
Runnable-id-9 be discard.
runableNum: 10
Runnable-id-10 be discard.
Runnable-id-3 run.
runableNum: 11
Runnable-id-4 run.
runableNum: 12
Runnable-id-7 run.
runableNum: 13
runableNum: 14
Runnable-id-14 be discard.

3.2 ScheduledThreadPoolExecutor

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {

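Its ability to run a task after a delay is what HikariPool uses to monitor for connection leaks and for connections that exceed the maximum lifetime: a task is scheduled with the threshold as its delay and is cancelled if the connection is returned or closed first.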

4. Summary

  1. ThreadPoolExecutor, ScheduledThreadPoolExecutor and their related classes are core thread tools for improving performance, and they are worth mastering.
  2. Make full use of these thread tools to manage resource pools and to schedule worker tasks sensibly.

end.




Java geek site: javageektour.com/