preface
From this chapter into the HikariCP core source code, this chapter to learn HikariCP to obtain and create a connection process.
A, agents,
Hikari is returned to the userConnection
,ResultSet
Java. SQL object instances are created byProxyFactory
The proxy object created. Such asConnection
The proxy object ofHikariProxyConnection
. The class files for these agents are generated by Javassist through bytecode, as shown in the generated logicJavassistProxyFactory
.
public static void main(String... args) throws Exception { classPool = new ClassPool(); classPool.importPackage("java.sql"); classPool.appendClassPath(new LoaderClassPath(JavassistProxyFactory.class.getClassLoader())); If (args. Length > 0) {// Generate class file location, default./target/classes genDirectory = args[0]; } // Generate Connection, Statement, ResultSet, DatabaseMetaData proxy class String methodBody = "{try {return delegate.method($$); } catch (SQLException e) { throw checkException(e); }} "; generateProxyClass(Connection.class, ProxyConnection.class.getName(), methodBody); generateProxyClass(Statement.class, ProxyStatement.class.getName(), methodBody); generateProxyClass(ResultSet.class, ProxyResultSet.class.getName(), methodBody); generateProxyClass(DatabaseMetaData.class, ProxyDatabaseMetaData.class.getName(), methodBody); MethodBody = "{try {return ((cast) delegate).method($$); } catch (SQLException e) { throw checkException(e); }} "; generateProxyClass(PreparedStatement.class, ProxyPreparedStatement.class.getName(), methodBody); generateProxyClass(CallableStatement.class, ProxyCallableStatement.class.getName(), methodBody); // modifyProxyFactory implementation modifyProxyFactory(); }Copy the code
The source code for ProxyFactory does not generate the proxy object directly; its implementation is also generated by Javassist.
public final class ProxyFactory { static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit) { throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run."); }Copy the code
JavassistProxyFactory#modifyProxyFactory modifies ProxyFactory implementation.
private static void modifyProxyFactory() throws NotFoundException, CannotCompileException, IOException { String packageName = ProxyConnection.class.getPackage().getName(); CtClass proxyCt = classPool.getCtClass("com.zaxxer.hikari.pool.ProxyFactory"); for (CtMethod method : proxyCt.getMethods()) { switch (method.getName()) { case "getProxyConnection": SetBody ("{return new "+ packageName + ".HikariProxyConnection($$); } "); break; case "getProxyStatement": SetBody ("{return new "+ packageName + ".HikariProxyStatement($$); } "); break; case ... default: break; } } proxyCt.writeFile(genDirectory + "target/classes"); }Copy the code
2. Get connections
HikariPool#getConnection is the entry to get the connection, the timeout is the configured connectionTimeout, and the return object is HikariProxyConnection.
Public Connection getConnection (final long hardTimeout) throws SQLException {/ / get a semaphore suspendResumeLock. Acquire (); final long startTime = currentTime(); try { long timeout = hardTimeout; PoolEntry PoolEntry PoolEntry = connectionbag. borrow(timeout, MILLISECONDS); If (poolEntry == null) {// Borrow times out and returns null, breaking the loop; } final long now = currentTime(); / / poolEntry ousted or not survive state if (poolEntry. IsMarkedEvicted () | | (elapsedMillis (poolEntry lastAccessed, now) > aliveBypassWindowMs && ! IsConnectionAlive (poolEntry connection))) {/ / close the connection closeConnection (poolEntry, poolEntry isMarkedEvicted ()? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE); timeout = hardTimeout - elapsedMillis(startTime); } else {/ / create a connection broker return poolEntry. CreateProxyConnection (leakTaskFactory. The schedule (poolEntry), now); } } while (timeout > 0L); // throw createTimeoutException(startTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SQLException(poolName + " - Interrupted during connection acquisition", e); } the finally {/ / release the semaphore suspendResumeLock. Release (); }}Copy the code
suspendResumeLock.acquire()
If isAllowPoolSuspension=true(the default is false), the suspendResumeLock instance is suspendResumeLock. If the connection pool is suspended, the semaphore is not available here and will be blocked until the connection pool is restored to its normal state.
Public class SuspendResumeLock {// acquisitionSemaphore private final Semaphore; Public void acquire () throws SQLException {/ / try to obtain the semaphore the if (acquisitionSemaphore. TryAcquire () {return; } / / attempt failed to get a semaphore, com. Zaxxer. Hikari. ThrowIfSuspended if it is true, An exception is thrown directly else if (Boolean. GetBoolean (" com. Zaxxer. Hikari. ThrowIfSuspended ")) {throw new SQLTransientException (" The pool is currently suspended and configured to throw exceptions upon acquisition"); } / / attempt failed to get a semaphore, blocked here waiting to get a new license acquisitionSemaphore. AcquireUninterruptibly (); }}Copy the code
If isAllowPoolSuspension=false, the suspendResumeLock instance is suspendResumeLock #FAUX_LOCK, and the semaphore is null.
public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {
@Override
public void acquire() {}
@Override
public void release() {}
@Override
public void suspend() {}
@Override
public void resume() {}
};
Copy the code
connectionBag.borrow(timeout, MILLISECONDS)
Fetch an idle connection from connectionBag. If null is returned, timeout occurs and the loop ends with an exception. ConcurrentBag was discussed in Chapter 2.
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException { // 1. Final List<Object> List = threadlist.get (); for (int i = list.size() - 1; i >= 0; i--) { final Object entry = list.remove(i); final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; if (bagEntry ! = null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; }} // Get final int waiting = waiters. IncrementAndGet (); try { for (T bagEntry : sharedList) { if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { if (waiting > 1) { listener.addBagItem(waiting - 1); } return bagEntry; }} // May tell HikariPool to add an Entry listener.addbagItem (waiting); Timeout = timeUnit.toNanos(timeout); // 3. do { final long start = currentTime(); final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { return bagEntry; } timeout -= elapsedNanos(start); } while (timeout > 10_000); return null; } finally { waiters.decrementAndGet(); }}Copy the code
poolEntry.isMarkedEvicted()
Check that the connection entry is not expelled. The expulsion is performed only after the HikariPool#softEvictConnection method is called.
private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, Final Boolean owner) {// Flag is expelled poolentry.markevicted (); / / if the owner = true (user initiated soft expulsion) or retain entry if successful (owner | | connectionBag. Reserve (poolEntry)) {/ / close the connection closeConnection (poolEntry, reason); return true; } return false; }Copy the code
The softEvictConnection method is called in the following situations:
HikariPool#evictConnection
The user actively invokes the expulsion connection.HouseKeeper
Clock backtracking detected. Close all connections.HikariDataSource#close
Disable connection pooling.- The connection is automatically closed when MaxLifetime is exceeded.
elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && ! isConnectionAlive(poolEntry.connection)
First determine whether the last entry is used time distance now more than com. Zaxxer. Hikari. AliveBypassWindowMs default 500 ms.
If a link survival check is required over 500ms, conversely, if the connection is frequently acquired and returned, no survival check is required.
Abstract class PoolBase {// from HikariConfig, default 5000ms long validationTimeout; Boolean isConnectionAlive(final Connection Connection) {try {try {// setNetworkTimeout to 5000ms setNetworkTimeout(Connection, validationTimeout); // 5s final int validationSeconds = (int) math.max (1000L, validationTimeout) / 1000; // If connectionTestQuery is null, Connection used to own isValid method detection / / for com. Mysql. Cj. JDBC. ConnectionImpl# isValid by means of ping an if (isUseJdbc4Validation) {return connection.isValid(validationSeconds); } // If connectionTestQuery is not null, Perform the configuration of SQL try (Statement Statement = connection. The createStatement ()) {if (isNetworkTimeoutSupported! = TRUE) { setQueryTimeout(statement, validationSeconds); } statement.execute(config.getConnectionTestQuery()); NetworkTimeout The default value is validationTimeout setNetworkTimeout(connection, networkTimeout); if (isIsolateInternalQueries && ! isAutoCommit) { connection.rollback(); } } return true; } catch (Exception e) { lastConnectionFailure.set(e); return false; }}}Copy the code
leakTaskFactory.schedule(poolEntry)
For the Schedule method, if leakDetectionThreshold is configured, ProxyLeakTask is periodically executed according to leakDetectionThreshold. Otherwise returns proxyLeaktask.no_leak, which is an empty implementation.
class ProxyLeakTaskFactory { private ScheduledExecutorService executorService; // Configure leakDetectionThreshold Private Long leakDetectionThreshold; ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService) { this.executorService = executorService; this.leakDetectionThreshold = leakDetectionThreshold; } ProxyLeakTask schedule(final PoolEntry poolEntry) { return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry); } void updateLeakDetectionThreshold(final long leakDetectionThreshold) { this.leakDetectionThreshold = leakDetectionThreshold; } private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) { ProxyLeakTask task = new ProxyLeakTask(poolEntry); task.schedule(executorService, leakDetectionThreshold); return task; }}Copy the code
The connection leak detection timed task, as shown in ProxyLeakTask#run, prints only one log.
class ProxyLeakTask implements Runnable { static final ProxyLeakTask NO_LEAK; private ScheduledFuture<? > scheduledFuture; private String connectionName; private Exception exception; private String threadName; private boolean isLeaked; @Override public void run() { isLeaked = true; final StackTraceElement[] stackTrace = exception.getStackTrace(); final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5]; System.arraycopy(stackTrace, 5, trace, 0, trace.length); exception.setStackTrace(trace); LOGGER. Warn ("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception); }}Copy the code
poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now)
GetConnection finally calls the ProxyFactory generated by JavAssist to get the HikariProxyConnection.
Connection createProxyConnection(final ProxyLeakTask leakTask, final long now) {
return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
}
Copy the code
The ProxyFactory simply calls the constructor of the HikariProxyConnection.
public final class ProxyFactory { static ProxyConnection getProxyConnection(PoolEntry var0, Connection var1, FastList<Statement> var2, ProxyLeakTask var3, long var4, boolean var6, boolean var7) { return new HikariProxyConnection(var0, var1, var2, var3, var4, var6, var7); }}Copy the code
Create a connection
getConnection
Does not see the actual fetch inConnection
, simply encapsulates the Connection in PoolEntry as a ProxyConnection. There are two trigger points for creating a ConnectionHikariPool#addBagItem
, one isHouseKeeper
Scheduled tasks. Look at this chapter firstHikariPool#addBagItem
. 在HikariPool#getConnection
Of course, ifConcurrentBag
Failed to get PoolEntry from ThreadLocalIBagStateListener#addBagItem
. That’s exactly what’s triggered hereHikariPool#addBagItem
, it creates the actual database Connection, encapsulates the actual Connection into PoolEntry, and puts PoolEntry into ConcurrentBag.
HikariPool#addBagItem
private final PoolEntryCreator poolEntryCreator = new PoolEntryCreator(null); private final Collection<Runnable> addConnectionQueueReadOnlyView; private final ThreadPoolExecutor addConnectionExecutor; public HikariPool(final HikariConfig config) { super(config); this.connectionBag = new ConcurrentBag<>(this); final int maxPoolSize = config.getMaximumPoolSize(); LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize); this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue); this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy()); } @Override public void addBagItem(final int waiting) { final boolean shouldAdd = waiting - addConnectionQueueReadOnlyView.size() >= 0; if (shouldAdd) { addConnectionExecutor.submit(poolEntryCreator); }}Copy the code
Waiting is passed in by ConcurrentBag and represents the number of threads currently waiting for PoolEntry (an imprecise number because the number of waiting threads changes from moment to moment).
AddConnectionExecutor is the thread pool responsible for creating connections, as described earlier in the configuration parameters. Number of core threads 1, maximum number of threads 1, 5 seconds idle time (set to allow core thread collection), wait queue length maxPoolSize, reject policy to discard the oldest task.
AddConnectionQueueReadOnlyView initialization, when HikariPool structure represents addConnectionExecutor waiting queue view.
PoolEntryCreator is a Callable task that creates PoolEntry.
AddBagItem determines whether the number of threads waiting for a connection is greater than or equal to the number of queues in the addConnectionExecutor wait queue. If so, the PoolEntryCreator task is submitted to the thread pool to create PoolEntry.
PoolEntryCreator
Create a PoolEntry Callable. Controls whether a connection needs to be added to the connection pool. A maximum of one PoolEntry can be created at a time.
private final class PoolEntryCreator implements Callable<Boolean> { private final String loggingPrefix; PoolEntryCreator(String loggingPrefix) { this.loggingPrefix = loggingPrefix; } @Override public Boolean call() { long sleepBackoff = 250L; / / whether the Connection pool need to add the Connection while (poolState = = POOL_NORMAL && shouldCreateAnotherConnection ()) {/ / create a Connection, create PoolEntry final PoolEntry poolEntry = createPoolEntry(); if (poolEntry ! = null) {// Add Bag connectionbag. add(poolEntry); // PoolEntry return Boolean.TRUE; } // Obtain the quietlySleep(sleepBackoff) for a maximum of 10 seconds; SleepBackoff = math.min (seconds.tomillis (10), math.min (connectionTimeout, (long) (sleepBackoff * 1.5))); } return Boolean.FALSE; } private synchronized boolean shouldCreateAnotherConnection() { return getTotalConnections() < config.getMaximumPoolSize() && (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle()); } @Override public int getTotalConnections() { // sharedList.size() return connectionBag.size(); }}Copy the code
ShouldCreateAnotherConnection determine whether need to add the connection. GetTotalConnections () < config. GetMaximumPoolSize () is the prerequisite, the current total number of connections that load-balanced across shareList PoolEntry in quantity, must be smaller than MaxPoolSize configuration. Any of the following conditions must be met:
- ConnectionBag. GetWaitingThreadCount () > 0, waiters. In the load-balanced across the get () returns the waiting thread number greater than zero. Indicates that a thread really needs to get a connection and is waiting to get PoolEntry from shareList or handoffQueue.
- GetIdleConnections () < config.getMinimumIdle() : ConcurrentBag status is
STATE_NOT_IN_USE
The number of poolentries is smaller than the configured numberminimumIdle
Minimum number of connections.
The HikariPool#createPoolEntry method creates PoolEntry. If the connection exceeds maxLifetime, the connection is closed and HikariPool#addItem is notified to add the connection.
Private PoolEntry createPoolEntry() {try {// createPoolEntry final PoolEntry PoolEntry = newPoolEntry(); final long maxLifetime = config.getMaxLifetime(); If (maxLifetime > 0) {// Calculate lifetime subtract a random number from maxLifetime to prevent a large number of connections from being closed at the same time final Long variance = maxLifetime > 10000? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; final long lifetime = maxLifetime - variance; / / after the MaxLifeTime the connection, to create a connection poolEntry. SetFutureEol (houseKeepingExecutorService. The schedule (() - > {/ / soft expulsion, front said before, If the entry status changes to Reserve successfully, If (softEvictConnection(poolEntry, "(Connection has passed maxLifetime)", False)) {/ / notify HikariPool addBagItem increase elements (connectionBag. GetWaitingThreadCount ()); } }, lifetime, MILLISECONDS)); } return poolEntry; } catch (ConnectionSetupException e) { if (poolState == POOL_NORMAL) { lastConnectionFailure.set(e); }} catch (Exception e) {} // Return null if there is an Exception; }Copy the code
PoolBase#newPoolEntry DriverDataSource gets the real Connection.
PoolEntry newPoolEntry() throws Exception { return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit); } private Connection newConnection() throws Exception { Connection connection = null; // DriverDataSource get Connection String username = config.getUsername(); // DriverDataSource get Connection String username = config.getUsername(); String password = config.getPassword(); connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password); if (connection == null) { throw new SQLTransientConnectionException("DataSource returned null unexpectedly"); } // Initialize some parameters of the connection, such as readOnly, autoCommit setupConnection(connection); lastConnectionFailure.set(null); return connection; } catch (Exception e) { throw e; }}Copy the code
conclusion
GetConnection gets the Connection by wrapping the Connection instance in PoolEntry to the user as a proxy object, HikariProxyConnection.
When getConnection fails to retrieve PoolEntry from ConcurrentBag, execute HikariPool#addBagItem to submit PoolEntryCreator to the thread pool and asynchronously create PoolEntry into ConcurrentBag.
Original is not easy, welcome to comment, like and attention. Welcome to pay attention to the public number: program ape YUE.