
The project uses Flink + MyBatis + Druid for persistence, but the current usage is crude: every operator instance builds its own SqlSessionFactory through SqlSessionFactory initSqlSessionFactory(Set mapperXmls, DataSource DataSource), which wastes a lot of resources. In practice, each Flink operator instance (one per unit of parallelism) owns a separate connection pool. When processing data, an instance borrows only from its own pool and never shares those connections with other instances, so connection utilization is very low.

I decided to take the opportunity to take a closer look at druid and think about how to optimize the current use of Flink.
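
One direction for the optimization is to create the pool once per JVM and let every operator instance in that JVM borrow from it. The following is only a minimal sketch of that idea using the JDK alone: FakePool, SharedPoolHolder and FakeOperator are hypothetical names, FakePool stands in for an expensive object such as a DruidDataSource or SqlSessionFactory, and nothing here is Flink or Druid API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive resource such as a connection pool.
final class FakePool {
    static final AtomicInteger CREATED = new AtomicInteger();

    FakePool() {
        CREATED.incrementAndGet(); // count how many pools were really built
    }
}

// Hypothetical holder: one pool per JVM (and class loader), created on first use.
final class SharedPoolHolder {
    private SharedPoolHolder() {
    }

    // The JVM initializes this nested class once, on first access, in a thread-safe way.
    private static final class Lazy {
        static final FakePool INSTANCE = new FakePool();
    }

    static FakePool get() {
        return Lazy.INSTANCE;
    }
}

// Hypothetical operator instance: open() would run once per parallel instance.
final class FakeOperator {
    private FakePool pool;

    void open() {
        pool = SharedPoolHolder.get(); // shared, not created per instance
    }

    FakePool pool() {
        return pool;
    }
}
```

With this shape, two FakeOperator instances opened in the same JVM end up holding the same FakePool. In a real job the sharing would only extend as far as one process and one class loader, so this is a starting point rather than a complete design.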

Druid + H2

This section shows how to use a Druid connection pool to perform a simple operation on an H2 database. test1() inserts a row through a Druid connection pool, and test2() does the same with plain JDBC. The general usage differs very little: 1) initialize the connection settings (URL, user, password, and so on); 2) get a connection; 3) create a PreparedStatement from the connection; 4) execute the statement; 5) close the statement; 6) close the connection.

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class TestDruidDataSourceH2DB {
    public static String insertSql = "INSERT INTO TEST VALUES('%s', 'World')";

    // Insert through a Druid connection pool
    public static void test1() throws SQLException {
        DruidDataSource dataSource = new DruidDataSource();
        String pwd = "";
        String user = "sa";
        String url = "jdbc:h2:tcp://localhost/~/test";
        String driverName = "org.h2.Driver";

        // Configure the four basic database settings (driver class, URL, user name, password)
        dataSource.setDriverClassName(driverName);
        dataSource.setUrl(url);
        dataSource.setUsername(user);
        dataSource.setPassword(pwd);

        // Specify the number of initializations
        // Specify the maximum number
        // Specify the number of peak periods
        // Specify the wait time

        // Get the connection object from the connection pool
        Connection connection = dataSource.getConnection();

        insert(connection);

        // Return to connection pool.
        // The returned object is Druid's own Connection implementation whose close() is enhanced:
        // instead of destroying the connection, it returns it to the connection pool.
        connection.close();
    }

    public static void insert(Connection connection) throws SQLException {
        String id = "id";
        String id_numeric = String.valueOf(System.currentTimeMillis());
        System.out.println("description :" + id_numeric);

        String provinceId = "provinceId";
        String cityName = "cityName";

        String insert = String.format(insertSql, id_numeric);
        // Create the statement, execute it, then close it
        PreparedStatement preparedStatement = connection.prepareStatement(insert);
        preparedStatement.execute();
        preparedStatement.close();
    }

    // Insert with plain JDBC
    public static void test2() {
        String pwd = "";
        String user = "sa";
        String url = "jdbc:h2:tcp://localhost/~/test";
        String driverName = "org.h2.Driver";
        try {
            // Load the driver class explicitly
            Class.forName(driverName);
            // try-with-resources closes the connection even if the insert fails
            try (Connection conn = DriverManager.getConnection(url, user, pwd)) {
                insert(conn);
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    public static void main(String[] args) throws SQLException {
        test1();
    }
}

DruidDataSource’s main methods and properties

The core logic of the Druid data source lives in the DruidDataSource class. As the two examples above show, most of the code is identical; the difference starts with how the connection is obtained. With Druid, the connection is borrowed from the pool:

 Connection connection = dataSource.getConnection();

The DruidDataSource main class inheritance structure with its member variables and methods is shown below.

| Class or attribute name | Description |
| --- | --- |
| ExceptionSorter | Determines whether an SQLException is a fatal exception |
| ValidConnectionChecker | Verifies that a given connection object is valid |
| CreateConnectionThread | Inner class of DruidDataSource that creates connection objects asynchronously |
| notEmpty | When notEmpty.await() is called, the current thread waits; when a connection is created or returned, notEmpty.signal() is called and a waiting thread is woken up |
| empty | When empty.await() is called, CreateConnectionThread waits; when empty.signal() is called, CreateConnectionThread is woken up and creates a connection |
| DestroyConnectionThread | Inner class of DruidDataSource that checks connection objects asynchronously: idle connections against phyTimeoutMillis and minEvictableIdleTimeMillis, and borrowed connections against removeAbandonedTimeoutMillis |
| LogStatsThread | Inner class of DruidDataSource that logs statistics asynchronously |
| connections | Holds all connection objects |
| evictConnections | Holds connection objects that are to be discarded |
| keepAliveConnections | Holds connection objects that need keepAlive |
| activeConnections | Holds borrowed connection objects that are subject to removeAbandoned |
| poolingCount | The number of idle connection objects |
| activeCount | The number of connection objects lent out |

The DruidDatasource class works as follows:

Druid: connect to SQL server

Here’s a rundown of how the DruidDataSource works for the above test examples

public static void test1() throws SQLException {
    DruidDataSource dataSource = new DruidDataSource();
    String pwd = "";
    String user = "sa";
    String url = "jdbc:h2:tcp://localhost/~/test";
    String driverName = "org.h2.Driver";

    // Configure the four basic database settings (driver class, URL, user name, password)
    dataSource.setDriverClassName(driverName);
    dataSource.setUrl(url);
    dataSource.setUsername(user);
    dataSource.setPassword(pwd);

    // Specify the number of initializations
    // Specify the maximum number
    // Specify the number of peak periods
    // Specify the wait time
    // Set the number of initial connections

    // Get the connection object from the connection pool
    Connection connection = dataSource.getConnection();

    insert(connection);

    // Return to connection pool.
    // The returned object is Druid's own Connection implementation whose close() is enhanced:
    // instead of destroying the connection, it returns it to the connection pool.
    connection.close();
}

Compared with plain JDBC, the pool needs only a little more initialization. The role of the important parameters is covered later.
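
The idea that close() hands the connection back instead of destroying it can be illustrated without Druid. Druid does this with its own wrapper class (getConnection returns a DruidPooledConnection); the sketch below deliberately swaps in a JDK dynamic proxy to show the same behavior in a few lines. RecyclingPool and fakePhysicalConnection are hypothetical names, and the "physical" connection is a stub so that no database is needed.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical mini pool: close() on a borrowed connection returns it to the pool.
final class RecyclingPool {
    private final Deque<Connection> idle = new ArrayDeque<>();

    void add(Connection physical) {
        idle.push(physical);
    }

    int idleCount() {
        return idle.size();
    }

    Connection borrow() {
        final Connection physical = idle.pop();
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName())) {
                idle.push(physical); // recycle instead of closing the physical connection
                return null;
            }
            try {
                return method.invoke(physical, args); // delegate everything else
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the original exception
            }
        };
        return (Connection) Proxy.newProxyInstance(
                RecyclingPool.class.getClassLoader(), new Class<?>[] {Connection.class}, handler);
    }

    // Stub for a physical connection: every method returns a harmless default.
    static Connection fakePhysicalConnection() {
        InvocationHandler handler = (proxy, method, args) -> {
            Class<?> type = method.getReturnType();
            if (type == boolean.class) {
                return false;
            }
            if (type == int.class) {
                return 0;
            }
            return null; // void and object-returning methods
        };
        return (Connection) Proxy.newProxyInstance(
                RecyclingPool.class.getClassLoader(), new Class<?>[] {Connection.class}, handler);
    }
}
```

After borrow() the idle count drops by one, and after close() on the borrowed object it goes back up, while the stub underneath is never closed. The sketch does not stop a caller from using the wrapper after close(); a real pool has to guard against that.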

3.1 DruidDataSource constructor

// Create a DruidDataSource instance
DruidDataSource dataSource = new DruidDataSource();

    // DruidDataSource constructors
    public DruidDataSource() {
        this(false);
    }

    public DruidDataSource(boolean fairLock) {
        super(fairLock);

        // Initialize some variables from the System configuration
        // (the method name is spelled this way in the Druid source)
        configFromPropety(System.getProperties());
    }

    // super(fairLock) calls the constructor of the parent class
    public DruidAbstractDataSource(boolean lockFair) {
        // Initialize the lock
        lock = new ReentrantLock(lockFair);

        // Initialize the two Conditions: notEmpty wakes threads waiting for a connection,
        // empty wakes CreateConnectionThread
        notEmpty = lock.newCondition();
        empty = lock.newCondition();
    }
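
How one lock with two Conditions coordinates borrowers and a creator thread can be shown with a toy pool. This is a hedged sketch and not Druid's implementation: TwoConditionPool, borrow, giveBack and the counter waitingBorrowers are hypothetical, and "connections" are plain strings. Only the notEmpty/empty roles follow the description above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical mini pool showing the notEmpty/empty handshake.
final class TwoConditionPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // borrowers wait here
    private final Condition empty = lock.newCondition();    // the creator thread waits here
    private final Deque<String> idle = new ArrayDeque<>();
    private int created;
    private int waitingBorrowers;

    TwoConditionPool() {
        Thread creator = new Thread(this::createLoop, "creator");
        creator.setDaemon(true); // do not keep the JVM alive
        creator.start();
    }

    private void createLoop() {
        lock.lock();
        try {
            while (true) {
                // Sleep until somebody is waiting and the pool has nothing to offer.
                while (waitingBorrowers == 0 || !idle.isEmpty()) {
                    empty.await();
                }
                idle.push("conn-" + (++created));
                notEmpty.signal(); // wake one waiting borrower
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    // Returns a connection, or null if none became available within the timeout.
    String borrow(long timeoutMillis) throws InterruptedException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            waitingBorrowers++;
            try {
                while (idle.isEmpty()) {
                    if (nanos <= 0) {
                        return null;
                    }
                    empty.signal();                     // ask the creator for a connection
                    nanos = notEmpty.awaitNanos(nanos); // releases the lock while waiting
                }
                return idle.pop();
            } finally {
                waitingBorrowers--;
                empty.signal(); // let the creator re-check whether others still wait
            }
        } finally {
            lock.unlock();
        }
    }

    void giveBack(String conn) {
        lock.lock();
        try {
            idle.push(conn);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
}
```

Both conditions belong to the same lock, so the state checks and the wake-ups cannot race with each other. Each side re-checks its condition in a loop after waking, which also covers a signal sent before the other side started waiting.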

3.2 Obtaining connection objects from the connection pool

// Get the connection object from the connection pool
Connection connection = dataSource.getConnection();

public DruidPooledConnection getConnection() throws SQLException {
    return getConnection(maxWait);
}

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    // Make sure the pool is initialized (see 3.2.1)
    init();

    if (filters.size() > 0) {
        // Filters are configured: go through the filter chain
        FilterChainImpl filterChain = new FilterChainImpl(this);
        return filterChain.dataSource_connect(this, maxWaitMillis);
    } else {
        return getConnectionDirect(maxWaitMillis);
    }
}
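
The branch above either walks a filter chain or goes straight to the pool. As a rough illustration of what "going through a chain" means, here is a minimal chain-of-responsibility sketch. ConnectFilter, ConnectChain and PoolFacade are hypothetical simplifications and do not reproduce Druid's Filter or FilterChainImpl interfaces; the "connection" is just a string.

```java
import java.util.List;

// Hypothetical, simplified filter: it may act before and after passing the call down the chain.
interface ConnectFilter {
    String connect(ConnectChain chain, long maxWaitMillis);
}

// Hypothetical chain: runs the filters in order, then falls through to the direct call.
final class ConnectChain {
    private final List<ConnectFilter> filters;
    private int pos; // index of the next filter to run

    ConnectChain(List<ConnectFilter> filters) {
        this.filters = filters;
    }

    String connect(long maxWaitMillis) {
        if (pos < filters.size()) {
            return filters.get(pos++).connect(this, maxWaitMillis);
        }
        return PoolFacade.connectDirect(maxWaitMillis);
    }
}

final class PoolFacade {
    // Stand-in for the direct path; a real pool would borrow a connection here.
    static String connectDirect(long maxWaitMillis) {
        return "connection";
    }

    // Mirrors the branch in the listing: use the chain only when filters are configured.
    static String getConnection(List<ConnectFilter> filters, long maxWaitMillis) {
        if (filters.size() > 0) {
            return new ConnectChain(filters).connect(maxWaitMillis);
        }
        return connectDirect(maxWaitMillis);
    }
}
```

Each filter decides when to call chain.connect(...), so it can run code before the connection is obtained, after it, or both. That is what makes this shape convenient for statistics and logging.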

3.2.1 The init() method

public void init() throws SQLException {
    if (inited) {
        return;
    }

    // Register the driver
    DruidDriver.getInstance();

    // Acquire the lock
    final ReentrantLock lock = this.lock;
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        throw new SQLException("interrupt", e);
    }

    boolean init = false;
    try {
        // Check again under the lock: another thread may have finished initializing in the meantime
        if (inited) {
            return;
        }

        initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

        // The id seeds are updated through AtomicLongFieldUpdater, which gives atomic, thread-safe writes and cheap reads
        this.id = DruidDriver.createDataSourceId();
        if (this.id > 1) {
            long delta = (this.id - 1) * 100000;
            this.connectionIdSeedUpdater.addAndGet(this, delta);
            this.statementIdSeedUpdater.addAndGet(this, delta);
            this.resultSetIdSeedUpdater.addAndGet(this, delta);
            this.transactionIdSeedUpdater.addAndGet(this, delta);
        }

        // Determine the dbType based on the URL prefix
        if (this.jdbcUrl != null) {
            this.jdbcUrl = this.jdbcUrl.trim();
            // Druid has its own URL format:
            // a URL starting with jdbc:wrap-jdbc: carries the driver, name, and JMX settings
            initFromWrapDriverUrl();
        }

        // Initialize the filters. (Open question: how are the filters used?)
        for (Filter filter : filters) {
            filter.init(this);
        }

        if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
            this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
        }

        DbType dbType = DbType.of(this.dbTypeName);
        if (dbType == DbType.mysql
                || dbType == DbType.mariadb
                || dbType == DbType.oceanbase
                || dbType == /* ... */) {
            boolean cacheServerConfigurationSet = false;
            if (this.connectProperties.containsKey("cacheServerConfiguration")) {
                cacheServerConfigurationSet = true;
            } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
                cacheServerConfigurationSet = true;
            }
            if (cacheServerConfigurationSet) {
                this.connectProperties.put("cacheServerConfiguration", "true");
            }
        }

        // Check parameters
        if (maxActive <= 0) {
            throw new IllegalArgumentException("illegal maxActive " + maxActive);
        }

        if (maxActive < minIdle) {
            throw new IllegalArgumentException("illegal maxActive " + maxActive);
        }

        if (getInitialSize() > maxActive) {
            throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
        }

        if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
            throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
        }

        if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
            throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
        }

        if (keepAlive && keepAliveBetweenTimeMillis <= timeBetweenEvictionRunsMillis) {
            throw new SQLException("keepAliveBetweenTimeMillis must be grater than timeBetweenEvictionRunsMillis");
        }

        if (this.driverClass != null) {
            this.driverClass = driverClass.trim();
        }

        // Load filters through the SPI mechanism; they are added to both autoFilters and filters
        initFromSPIServiceLoader();

        // Resolve the driver, then run the remaining checks and initializers described below
        resolveDriver();

        initCheck();

        initExceptionSorter();
        initValidConnectionChecker();
        validationQueryCheck();

        if (isUseGlobalDataSourceStat()) {
            dataSourceStat = JdbcDataSourceStat.getGlobal();
            if (dataSourceStat == null) {
                dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
            }
            if (dataSourceStat.getDbType() == null) {
                dataSourceStat.setDbType(this.dbTypeName);
            }
        } else {
            // To monitor SQL. But there are risks of memory leak reference:
            dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
        }

        connections = new DruidConnectionHolder[maxActive];
        evictConnections = new DruidConnectionHolder[maxActive];
        keepAliveConnections = new DruidConnectionHolder[maxActive];

        SQLException connectError = null;

        if (createScheduler != null && asyncInit) {
            for (int i = 0; i < initialSize; ++i) {
                submitCreateTask(true);
            }
        } else if (!asyncInit) {
            // init connections
            while (poolingCount < initialSize) {
                try {
                    PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                    DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                    connections[poolingCount++] = holder;
                } catch (SQLException ex) {
                    LOG.error("init datasource error, url: " + this.getUrl(), ex);
                    if (initExceptionThrow) {
                        connectError = ex;
                        break;
                    } else {
                        Thread.sleep(3000);
                    }
                }
            }

            if (poolingCount > 0) {
                poolingPeak = poolingCount;
                poolingPeakTime = System.currentTimeMillis();
            }
        }

        // Start the background threads (logging, connection creation, connection destruction)
        createAndLogThread();
        createAndStartCreatorThread();
        createAndStartDestroyThread();

        initedLatch.await();
        init = true;

        initedTime = new Date();
        registerMbean();

        if (connectError != null && poolingCount == 0) {
            throw connectError;
        }

        if (keepAlive) {
            // async fill to minIdle
            if (createScheduler != null) {
                for (int i = 0; i < minIdle; ++i) {
                    submitCreateTask(true);
                }
            } else {
                this.emptySignal();
            }
        }
    } catch (SQLException e) {
        LOG.error("{dataSource-" + this.getID() + "} init error", e);
        throw e;
    } catch (InterruptedException e) {
        throw new SQLException(e.getMessage(), e);
    } catch (RuntimeException e) {
        LOG.error("{dataSource-" + this.getID() + "} init error", e);
        throw e;
    } catch (Error e) {
        LOG.error("{dataSource-" + this.getID() + "} init error", e);
        throw e;
    } finally {
        inited = true;
        lock.unlock();

        if (init && LOG.isInfoEnabled()) {
            String msg = "{dataSource-" + this.getID();

            if (this.name != null && !this.name.isEmpty()) {
                msg += ",";
                msg += this.name;
            }

            msg += "} inited";
            LOG.info(msg);
        }
    }
}

Initialize the Filter

Filters can also be loaded through the SPI mechanism. Note that the filter class must also carry the @AutoLoad annotation, and that druid.load.spifilter.skip must be false.

private void initFromSPIServiceLoader() {
    if (loadSpifilterSkip) {
        return;
    }

    if (autoFilters == null) {
        List<Filter> filters = new ArrayList<Filter>();
        ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);

        // Keep only the discovered filters that are annotated with @AutoLoad(true)
        for (Filter filter : autoFilterLoader) {
            AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
            if (autoLoad != null && autoLoad.value()) {
                filters.add(filter);
            }
        }
        autoFilters = filters;
    }

    for (Filter filter : autoFilters) {
        if (LOG.isInfoEnabled()) {
            LOG.info("load filter from spi :" + filter.getClass().getName());
        }
        addFilter(filter);
    }
}

The filter is loaded by reflection (this is the nextService method inside java.util.ServiceLoader)

private S nextService() {
    if (!hasNextService())
        throw new NoSuchElementException();
    String cn = nextName;
    nextName = null;
    Class<?> c = null;
    try {
        // Load the class
        c = Class.forName(cn, false, loader);
    } catch (ClassNotFoundException x) {
        fail(service,
             "Provider " + cn + " not found");
    }
    if (!service.isAssignableFrom(c)) {
        fail(service,
             "Provider " + cn  + " not a subtype");
    }
    try {
        // Instantiate
        S p = service.cast(c.newInstance());
        providers.put(cn, p);
        return p;
    } catch (Throwable x) {
        fail(service,
             "Provider " + cn + " could not be instantiated",
             x);
    }
    throw new Error();          // This cannot happen
}
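
The annotation check in initFromSPIServiceLoader can be tried in isolation. The sketch below is a simplified illustration, not Druid code: AutoLoadSketch, SketchFilter and AutoLoadSelector are hypothetical names, and the candidates are passed in as an explicit list instead of being discovered by ServiceLoader (which would additionally need a META-INF/services entry).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @AutoLoad; RUNTIME retention makes it visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface AutoLoadSketch {
    boolean value() default true;
}

// Hypothetical marker interface standing in for a filter type.
interface SketchFilter {
}

@AutoLoadSketch
class EnabledFilter implements SketchFilter {
}

@AutoLoadSketch(false)
class DisabledFilter implements SketchFilter {
}

// No annotation at all: discovered, but never selected.
class PlainFilter implements SketchFilter {
}

final class AutoLoadSelector {
    // Keeps only the candidates whose class is annotated with @AutoLoadSketch(true).
    static List<SketchFilter> select(Iterable<SketchFilter> discovered) {
        List<SketchFilter> selected = new ArrayList<>();
        for (SketchFilter filter : discovered) {
            AutoLoadSketch autoLoad = filter.getClass().getAnnotation(AutoLoadSketch.class);
            if (autoLoad != null && autoLoad.value()) {
                selected.add(filter);
            }
        }
        return selected;
    }
}
```

Of the three candidate classes only EnabledFilter passes the check. A provider that is registered for SPI discovery but lacks the annotation, or sets it to false, is skipped in the same way.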

Initializes the Driver

The configured driverClass is loaded and instantiated by reflection. Initializing the class runs the driver's static initializer, which for most JDBC drivers (org.h2.Driver included) calls DriverManager.registerDriver(driver).

protected void resolveDriver() throws SQLException {
    if (this.driver == null) {
        if (this.driverClass == null || this.driverClass.isEmpty()) {
            this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
        }

        if (MockDriver.class.getName().equals(driverClass)) {
            driver = MockDriver.instance;
        } else if ("".equals(driverClass)) {
            Properties info = new Properties();
            info.put("user", username);
            info.put("password", password);
            driver = new BalancedClickhouseDriver(jdbcUrl, info);
        } else {
            if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                throw new SQLException("url not set");
            }
            driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
        }
    } else {
        if (this.driverClass == null) {
            this.driverClass = driver.getClass().getName();
        }
    }
}

public static Driver createDriver(ClassLoader classLoader, String driverClassName) throws SQLException {
    Class<?> clazz = null;
    // 1) Try the class loader that was passed in
    if (classLoader != null) {
        try {
            clazz = classLoader.loadClass(driverClassName);
        } catch (ClassNotFoundException e) {
            // skip
        }
    }

    // 2) Fall back to the thread context class loader
    if (clazz == null) {
        try {
            ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
            if (contextLoader != null) {
                clazz = contextLoader.loadClass(driverClassName);
            }
        } catch (ClassNotFoundException e) {
            // skip
        }
    }

    // 3) Finally, load the configured driver class with Class.forName
    if (clazz == null) {
        try {
            clazz = Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            throw new SQLException(e.getMessage(), e);
        }
    }

    try {
        // Instantiate
        return (Driver) clazz.newInstance();
    } catch (IllegalAccessException e) {
        throw new SQLException(e.getMessage(), e);
    } catch (InstantiationException e) {
        throw new SQLException(e.getMessage(), e);
    }
}
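
The three-step lookup in createDriver (explicit loader, then context loader, then Class.forName) is a general pattern. Here is a small JDK-only sketch of it; ReflectiveLoader and its methods are hypothetical names, and it instantiates through getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9.

```java
import java.util.ArrayList;
import java.util.List;

final class ReflectiveLoader {
    // Tries the preferred loader, then the thread context loader, then Class.forName.
    static Class<?> loadClass(ClassLoader preferred, String className) throws ClassNotFoundException {
        List<ClassLoader> candidates = new ArrayList<>();
        if (preferred != null) {
            candidates.add(preferred);
        }
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        if (context != null) {
            candidates.add(context);
        }
        for (ClassLoader loader : candidates) {
            try {
                return loader.loadClass(className);
            } catch (ClassNotFoundException e) {
                // fall through to the next candidate
            }
        }
        return Class.forName(className);
    }

    // Loads the class and creates an instance through its no-argument constructor.
    static <T> T instantiate(ClassLoader preferred, String className, Class<T> expectedType) {
        try {
            Class<?> clazz = loadClass(preferred, className);
            return expectedType.cast(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + className, e);
        }
    }
}
```

For example, ReflectiveLoader.instantiate(null, "java.util.ArrayList", Object.class) yields a new ArrayList, while an unknown class name ends in an IllegalStateException that wraps the ClassNotFoundException.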

Validate parameters

        // Check whether maxActive, minIdle, initialSize, timeBetweenLogStatsMillis, useGlobalDataSourceStat, maxEvictableIdleTimeMillis, minEvictableIdleTimeMillis, and similar settings are valid
        // ......

        // For Oracle and DB2, the validationQuery needs to be checked
        // When testOnBorrow/testOnReturn/testWhileIdle is enabled, check whether validationQuery is set; if it is not, an error message is logged

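
The size checks quoted in init() are easy to reproduce on a plain settings object. The sketch below mirrors four of them; PoolSettings is a hypothetical class, and for simplicity every violation throws IllegalArgumentException, whereas the listing above shows Druid throwing SQLException for the eviction-time check.

```java
// Hypothetical settings holder mirroring a few of the checks shown in init().
final class PoolSettings {
    final int initialSize;
    final int minIdle;
    final int maxActive;
    final long minEvictableIdleTimeMillis;
    final long maxEvictableIdleTimeMillis;

    PoolSettings(int initialSize, int minIdle, int maxActive,
                 long minEvictableIdleTimeMillis, long maxEvictableIdleTimeMillis) {
        this.initialSize = initialSize;
        this.minIdle = minIdle;
        this.maxActive = maxActive;
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
        this.maxEvictableIdleTimeMillis = maxEvictableIdleTimeMillis;
    }

    // Fails fast on the first inconsistent setting.
    void validate() {
        if (maxActive <= 0) {
            throw new IllegalArgumentException("illegal maxActive " + maxActive);
        }
        if (maxActive < minIdle) {
            throw new IllegalArgumentException("maxActive " + maxActive + " is smaller than minIdle " + minIdle);
        }
        if (initialSize > maxActive) {
            throw new IllegalArgumentException("illegal initialSize " + initialSize + ", maxActive " + maxActive);
        }
        if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
            throw new IllegalArgumentException(
                    "maxEvictableIdleTimeMillis must be greater than minEvictableIdleTimeMillis");
        }
    }
}
```

A configuration such as new PoolSettings(20, 1, 10, 1000L, 2000L) is rejected because initialSize exceeds maxActive, which is the kind of mistake that otherwise only shows up when the pool is first used.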

Initialize ExceptionSorter, ValidConnectionChecker, and JdbcDataSourceStat

Different ExceptionSorter implementations are instantiated for different database drivers. The ExceptionSorter interface is shown below; as its signature shows, it decides whether an exception is "fatal".

public interface ExceptionSorter {
    /**
     * Returns true or false whether or not the exception is fatal.
     *
     * @param e the exception
     * @return true or false if the exception is fatal.
     */
    boolean isExceptionFatal(SQLException e);

    void configFromProperties(Properties properties);
}

private void initExceptionSorter() {
    if (exceptionSorter instanceof NullExceptionSorter) {
        if (driver instanceof MockDriver) {
            return;
        }
    } else if (this.exceptionSorter != null) {
        return;
    }

    // Start from the concrete driver class and walk up its superclasses until a known driver matches
    for (Class<?> driverClass = driver.getClass();;) {
        String realDriverClassName = driverClass.getName();
        if (realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER) //
                || realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER_6)) {
            this.exceptionSorter = new MySqlExceptionSorter();
            this.isMySql = true;
        } else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
                || realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {
            this.exceptionSorter = new OracleExceptionSorter();
        } else if (realDriverClassName.equals(JdbcConstants.OCEANBASE_DRIVER)) { // Write a real TestCase
            if (/* ... */) {
                this.exceptionSorter = new OceanBaseOracleExceptionSorter();
            } else {
                this.exceptionSorter = new MySqlExceptionSorter();
            }
        } else if (realDriverClassName.equals("com.informix.jdbc.IfxDriver")) {
            this.exceptionSorter = new InformixExceptionSorter();

        } else if (realDriverClassName.equals("com.sybase.jdbc2.jdbc.SybDriver")) {
            this.exceptionSorter = new SybaseExceptionSorter();

        } else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
                || realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)
                || realDriverClassName.equals(JdbcConstants.POLARDB_DRIVER)) {
            this.exceptionSorter = new PGExceptionSorter();

        } else if (realDriverClassName.equals("")) {
            this.exceptionSorter = new MockExceptionSorter();
        } else if (realDriverClassName.contains("DB2")) {
            this.exceptionSorter = new DB2ExceptionSorter();

        } else {
            Class<?> superClass = driverClass.getSuperclass();
            if (superClass != null && superClass != Object.class) {
                driverClass = superClass;
                continue;
            }
        }

        break;
    }
}
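
The loop in initExceptionSorter retries with the superclass when the concrete driver class is not recognized, so a driver that merely extends a known one still gets the right sorter. A reduced sketch of that lookup follows; SorterRegistry is a hypothetical class, sorters are represented by plain strings, and JDK collection classes play the role of driver classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: maps a driver class name to the name of its sorter.
final class SorterRegistry {
    private final Map<String, String> sorterByDriverClass = new HashMap<>();

    void register(String driverClassName, String sorterName) {
        sorterByDriverClass.put(driverClassName, sorterName);
    }

    // Walks from the concrete class up through its superclasses until a registered name matches.
    String resolve(Class<?> driverClass) {
        for (Class<?> c = driverClass; c != null && c != Object.class; c = c.getSuperclass()) {
            String sorter = sorterByDriverClass.get(c.getName());
            if (sorter != null) {
                return sorter;
            }
        }
        return null; // nothing in the hierarchy is known
    }
}
```

If "java.util.HashMap" is registered, resolving java.util.LinkedHashMap.class finds the same entry through the superclass, while an unrelated class such as String resolves to null.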
// Use different connection checkers for different driver classes. For example, when testOnBorrow is set to true, ValidConnectionChecker is called to check that the connection is valid
private void initValidConnectionChecker() {
    if (this.validConnectionChecker != null) {
        return;
    }

    String realDriverClassName = driver.getClass().getName();
    if (JdbcUtils.isMySqlDriver(realDriverClassName)) {
        this.validConnectionChecker = new MySqlValidConnectionChecker();

    } else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
            || realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {
        this.validConnectionChecker = new OracleValidConnectionChecker();

    } else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER)
               || realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
               || realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {
        this.validConnectionChecker = new MSSQLValidConnectionChecker();

    } else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
            || realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)
            || realDriverClassName.equals(JdbcConstants.POLARDB_DRIVER)) {
        this.validConnectionChecker = new PGValidConnectionChecker();
    }
}

private void validationQueryCheck() {
    // Nothing to check unless at least one of the test switches is on
    if (!(testOnBorrow || testOnReturn || testWhileIdle)) {
        return;
    }

    // A driver-specific checker makes the validationQuery unnecessary
    if (this.validConnectionChecker != null) {
        return;
    }

    if (this.validationQuery != null && this.validationQuery.length() > 0) {
        return;
    }

    String errorMessage = "";

    if (testOnBorrow) {
        errorMessage += "testOnBorrow is true, ";
    }

    if (testOnReturn) {
        errorMessage += "testOnReturn is true, ";
    }

    if (testWhileIdle) {
        errorMessage += "testWhileIdle is true, ";
    }

    LOG.error(errorMessage + "validationQuery not set");
}

Initialize three connection object encapsulation classes

connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];

The DruidConnectionHolder class has the following properties and methods:

Create initialSize connections during initialization

Connections can be created in two ways, synchronously or asynchronously. The synchronous path looks like this:

// init connections
while (poolingCount < initialSize) {
    try {
        // Create a connection. Encapsulate it with DruidConnectionHolder
        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
        connections[poolingCount++] = holder;
    } catch (SQLException ex) {
        LOG.error("init datasource error, url: " + this.getUrl(), ex);
        if (initExceptionThrow) {
            // Remember the error and stop creating connections
            connectError = ex;
            break;
        } else {
            // Wait a while, then retry
            Thread.sleep(3000);
        }
    }
}
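
The statement connections[poolingCount++] = holder treats the array as a stack whose top is tracked by poolingCount. The sketch below shows that data structure on its own. HolderStack is a hypothetical class, the holders are plain strings, the method names putLast and takeLast are borrowed for readability only, and the locking a real pool needs is left out.

```java
// Hypothetical array-backed stack of idle connections.
final class HolderStack {
    private final String[] connections; // stand-in for DruidConnectionHolder[]
    private int poolingCount;           // number of idle entries, also the next free slot

    HolderStack(int maxActive) {
        connections = new String[maxActive];
    }

    // Appends to the end; returns false when the array is already full.
    boolean putLast(String holder) {
        if (poolingCount >= connections.length) {
            return false;
        }
        connections[poolingCount++] = holder;
        return true;
    }

    // Removes and returns the most recently added entry, or null when empty.
    String takeLast() {
        if (poolingCount == 0) {
            return null;
        }
        String last = connections[--poolingCount];
        connections[poolingCount] = null; // drop the reference held by the array
        return last;
    }

    int poolingCount() {
        return poolingCount;
    }
}
```

Taking from the end means the most recently returned connection is reused first, so the entries near the start of the array are the ones that stay idle longest.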

The process of creating a connection

Most of the code below can be skimmed for now; the parts that matter are the calls to createPhysicalConnection(url, physicalConnectProperties) and initPhysicalConnection.

public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
    String url = this.getUrl();
    Properties connectProperties = getConnectProperties();

    String user;
    if (getUserCallback() != null) {
        user = getUserCallback().getName();
    } else {
        user = getUsername();
    }

    String password = getPassword();
    PasswordCallback passwordCallback = getPasswordCallback();

    if (passwordCallback != null) {
        if (passwordCallback instanceof DruidPasswordCallback) {
            DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

            druidPasswordCallback.setUrl(url);
            druidPasswordCallback.setProperties(connectProperties);
        }

        char[] chars = passwordCallback.getPassword();
        if (chars != null) {
            password = new String(chars);
        }
    }

    // Copy the configured properties, then add the user name and password
    Properties physicalConnectProperties = new Properties();
    if (connectProperties != null) {
        physicalConnectProperties.putAll(connectProperties);
    }

    if (user != null && user.length() != 0) {
        physicalConnectProperties.put("user", user);
    }

    if (password != null && password.length() != 0) {
        physicalConnectProperties.put("password", password);
    }

    Connection conn = null;

    long connectStartNanos = System.nanoTime();
    long connectedNanos, initedNanos, validatedNanos;

    Map<String, Object> variables = initVariants
            ? new HashMap<String, Object>()
            : null;
    Map<String, Object> globalVariables = initGlobalVariants
            ? new HashMap<String, Object>()
            : null;

    createStartNanosUpdater.set(this, connectStartNanos);
    try {
        // The driver is called from here; see the next section
        conn = createPhysicalConnection(url, physicalConnectProperties);
        connectedNanos = System.nanoTime();

        if (conn == null) {
            throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
        }

        initPhysicalConnection(conn, variables, globalVariables);
        initedNanos = System.nanoTime();

        // Verify that the new connection is usable
        validateConnection(conn);
        validatedNanos = System.nanoTime();

    } catch (SQLException ex) {
        throw ex;
    } catch (RuntimeException ex) {
        throw ex;
    } catch (Error ex) {
        throw ex;
    } finally {
        long nano = System.nanoTime() - connectStartNanos;
        createTimespan += nano;
    }

    return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
}

The connection-creating method of DruidAbstractDataSource

The connection is created by the connect method of the corresponding Driver, which returns a Connection object that encapsulates the Socket.

public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
    Connection conn;
    if (getProxyFilters().size() == 0) {
        // getDriver returns the driver instance. Here it is the org.h2.Driver we registered.
        conn = getDriver().connect(url, info);
    } else {
        conn = new FilterChainImpl(this).connection_connect(info);
    }

    return conn;
}

We use the H2 driver, org.h2.Driver, to explain how a connection is created. The driver source is simple, but as the saying goes, the sparrow may be small yet it has all its vital organs: reading it is enough to understand the whole connection process.

How org.h2.Driver creates a connection
public Connection connect(String url, Properties info) throws SQLException {
    try {
        if (info == null) {
            info = new Properties();
        }
        if (!acceptsURL(url)) {
            return null;
        }
        if (url.equals(DEFAULT_URL)) {
            return DEFAULT_CONNECTION.get();
        }
        // This looks like upgrade handling; it returns straight away here, so it can be ignored.
        Connection c = DbUpgrade.connectOrUpgrade(url, info);
        if (c != null) {
            return c;
        }
        // Create a connection here
        return new JdbcConnection(url, info);
    } catch (Exception e) {
        throw DbException.toSQLException(e);
    }
}

    // JdbcConnection constructor
    public JdbcConnection(ConnectionInfo ci, boolean useBaseDir)
            throws SQLException {
        try {
            if (useBaseDir) {
                String baseDir = SysProperties.getBaseDir();
                if (baseDir != null) {
                    ci.setBaseDir(baseDir);
                }
            }
            // This returns an embedded or a server connection. It encapsulates the Socket information
            session = new SessionRemote(ci).connectEmbeddedOrServer(false);
            trace = session.getTrace();
            int id = getNextId(TraceObject.CONNECTION);
            setTrace(trace, TraceObject.CONNECTION, id);
            this.user = ci.getUserName();
            if (isInfoEnabled()) {
                trace.infoCode("Connection " + getTraceObjectName()
                        + " = DriverManager.getConnection("
                        + quote(ci.getOriginalURL()) + "," + quote(user)
                        + ", \"\");");
            }
            this.url = ci.getURL();
            scopeGeneratedKeys = ci.getProperty("SCOPE_GENERATED_KEYS", false);
            watcher = CloseWatcher.register(this, session, keepOpenStackTrace);
        } catch (Exception e) {
            throw logAndConvert(e);
        }
    }

Main structure of the JdbcConnection class:

As you can see, JdbcConnection holds a SessionInterface field whose implementation is SessionRemote. SessionRemote keeps a List of Transfer objects (a List because there may be several database hosts). Each Transfer holds a Socket, plus the DataInputStream and DataOutputStream used for communication.

// Initialize the Transfer class. Here you can see that the user name, password, and so on are sent to the database host
  private Transfer initTransfer(ConnectionInfo ci, String db, String server)
            throws IOException {
        Socket socket = NetUtils.createSocket(server,
                Constants.DEFAULT_TCP_PORT, ci.isSSL());
        Transfer trans = new Transfer(this, socket);
        // ...
        // User name
        trans.writeString(ci.getUserName());
        // Password
        trans.writeBytes(ci.getUserPasswordHash());
        // ...
        String[] keys = ci.getKeys();
        for (String key : keys) {
            trans.writeString(key).writeString(ci.getProperty(key));
        }
        try {
            // ...
            clientVersion = trans.readInt();
            if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_14) {
                if (ci.getFileEncryptionKey() != null) {
                    trans.writeBytes(ci.getFileEncryptionKey());
                }
            }
            // ...
            if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_15) {
                autoCommit = trans.readBoolean();
            } else {
                autoCommit = true;
            }
            return trans;
        } catch (DbException e) {
            throw e;
        }
    }

The detailed creation process is not covered here, since it is not the focus of this article; interested readers can download the H2 source code and study it. The author has also read, at work, the Java client driver of a database developed in-house, and the process is similar: the Connection class encapsulates the Socket, and messages are sent and received by writing to and reading from that socket (much like ordinary Socket and ServerSocket communication).
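
To make "writing to and reading from the socket" concrete, here is a small sketch of a handshake frame written with DataOutputStream and read back with DataInputStream. It is an illustration only: HandshakeSketch and the field layout (version, database, user) are hypothetical and are not H2's wire protocol, and an in-memory byte array replaces the socket streams so the example runs without a server.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class HandshakeSketch {
    // Client side: writes the fields in a fixed order, as a driver would write them
    // to socket.getOutputStream().
    static byte[] encode(int protocolVersion, String db, String user) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(protocolVersion);
            out.writeUTF(db);
            out.writeUTF(user);
        }
        return bytes.toByteArray();
    }

    // Server side: reads the fields back in exactly the same order.
    static String decode(byte[] frame) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame))) {
            int version = in.readInt();
            String db = in.readUTF();
            String user = in.readUTF();
            return version + "|" + db + "|" + user;
        }
    }

    // Convenience wrapper that hides the checked exception.
    static String roundTrip(int protocolVersion, String db, String user) {
        try {
            return decode(encode(protocolVersion, db, user));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both sides must agree on the order and type of every field, which is why a driver and its server negotiate a protocol version first, as the clientVersion checks in initTransfer show.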

The validateConnection method of DruidAbstractDataSource

After createPhysicalConnection has created the connection, Druid verifies that it is valid. The validation statement can be set with dataSource.setValidationQuery(); a common choice is validationQuery = SELECT 1 or something similar.

public void validateConnection(Connection conn) throws SQLException {
    // Returns the validationQuery attribute
    String query = getValidationQuery();
    if (conn.isClosed()) {
        throw new SQLException("validateConnection: connection closed");
    }

    // If validConnectionChecker is not null, it is used to check whether the connection is valid.
    // There are specific checkers for MySQL, PG, Oracle, and so on
    if (validConnectionChecker != null) {
        boolean result;
        Exception error = null;
        try {
            result = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);

            // The connection is valid again: clear the fatal-error flag under the lock
            if (result && onFatalError) {
                lock.lock();
                try {
                    if (onFatalError) {
                        onFatalError = false;
                    }
                } finally {
                    lock.unlock();
                }
            }
        } catch (SQLException ex) {
            throw ex;
        } catch (Exception ex) {
            result = false;
            error = ex;
        }

        if (!result) {
            SQLException sqlError = error != null ? //
                new SQLException("validateConnection false", error) //
                : new SQLException("validateConnection false");
            throw sqlError;
        }
        return;
    }

    // If validationQuery is set, the query is executed. If it returns no row, an exception is thrown
    if (null != query) {
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = conn.createStatement();
            if (getValidationQueryTimeout() > 0) {
                stmt.setQueryTimeout(getValidationQueryTimeout());
            }
            rs = stmt.executeQuery(query);
            if (!rs.next()) {
                throw new SQLException("validationQuery didn't return a row");
            }

            if (onFatalError) {
                lock.lock();
                try {
                    if (onFatalError) {
                        onFatalError = false;
                    }
                } finally {
                    lock.unlock();
                }
            }
        } finally {
            JdbcUtils.close(rs);
            JdbcUtils.close(stmt);
        }
    }
}
Druid returns the Connection encapsulated as PhysicalConnectionInfo

The main attributes of PhysicalConnectionInfo are as follows:

private Connection connection;
private long connectStartNanos;
private long connectedNanos;
private long initedNanos;
private long validatedNanos;
private Map<String, Object> vairiables;
private Map<String, Object> globalVairiables;
DruidConnectionHolder encapsulates DataSource and PhysicalConnectionInfo together

Each new holder is appended to the end of the DruidConnectionHolder[] connections array:

// init connections
while (poolingCount < initialSize) {
    try {
        // Create a physical connection
        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
        // DruidConnectionHolder encapsulates the DataSource and the PhysicalConnectionInfo together
        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
        connections[poolingCount++] = holder;
    } catch (SQLException ex) {
        LOG.error("init datasource error, url: " + this.getUrl(), ex);
        if (initExceptionThrow) {
            connectError = ex;
            break;
        } else {
            Thread.sleep(3000);
        }
    }
}

// DruidConnectionHolder constructor.
public DruidConnectionHolder(DruidAbstractDataSource dataSource, PhysicalConnectionInfo pyConnectInfo)
        throws SQLException {
    // ...
}

init() then calls createAndLogThread(), createAndStartCreatorThread(), and createAndStartDestroyThread() to create the background threads:

// Start the monitoring data recording thread
createAndLogThread();
// Start the connection creation thread
createAndStartCreatorThread();
// Start the connection eviction thread
createAndStartDestroyThread();

// The main thread uses a CountDownLatch to wait until the CreateConnectionThread and DestroyConnectionThread above have started
initedLatch.await();

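The startup handshake described above can be sketched with a plain CountDownLatch. This is a minimal illustration rather than Druid's own code: the class name LatchStartupSketch, the method startAndAwait, and the worker names are all hypothetical, and the workers do nothing except report that they have started.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchStartupSketch {
    /** Starts two daemon workers and blocks until both have begun running. */
    static boolean startAndAwait(long timeoutMillis) throws InterruptedException {
        CountDownLatch started = new CountDownLatch(2);
        for (String name : new String[] {"creator", "destroyer"}) {
            Thread t = new Thread(() -> {
                // First action of each worker: report that it is running.
                started.countDown();
                // The worker's real loop would follow here.
            }, name);
            t.setDaemon(true);
            t.start();
        }
        // True if both workers counted down before the timeout elapsed.
        return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("both workers started: " + startAndAwait(1000));
    }
}
```

The latch only guarantees that both threads have begun executing; it says nothing about whether any connection has been created yet.
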
The run method of CreateConnectionThread (the thread started by createAndStartCreatorThread)
public void run() {
    initedLatch.countDown();

    long lastDiscardCount = 0;
    int errorCount = 0;
    for (;;) {
        // addLast
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e2) {
            break;
        }

        long discardCount = DruidDataSource.this.discardCount;
        boolean discardChanged = discardCount - lastDiscardCount > 0;
        lastDiscardCount = discardCount;

        try {
            boolean emptyWait = true;

            if (createError != null
                    && poolingCount == 0
                    && !discardChanged) {
                emptyWait = false;
            }

            if (emptyWait
                    && asyncInit && createCount < initialSize) {
                emptyWait = false;
            }

            if (emptyWait) {
                // A connection is created only when some thread is waiting for one.
                // poolingCount is the number of idle connections in the pool;
                // notEmptyWaitThreadCount is the number of threads waiting because no connection is available
                if (poolingCount >= notEmptyWaitThreadCount //
                        && (!(keepAlive && activeCount + poolingCount < minIdle))
                        && !isFailContinuous()
                ) {
                    empty.await();
                }

                // Prevent the creation of more connections than maxActive
                if (activeCount + poolingCount >= maxActive) {
                    empty.await();
                    continue;
                }
            }
        } catch (InterruptedException e) {
            lastCreateError = e;
            lastErrorTimeMillis = System.currentTimeMillis();

            if ((!closing) && (!closed)) {
                LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
            }
            break;
        } finally {
            lock.unlock();
        }

        // The conditions are met, so a connection can be created
        PhysicalConnectionInfo connection = null;

        try {
            connection = createPhysicalConnection();
        } catch (SQLException e) {
            LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                      + ", state " + e.getSQLState(), e);

            errorCount++;
            if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                // fail over retry attempts
                setFailContinuous(true);
                if (failFast) {
                    lock.lock();
                    try {
                        notEmpty.signalAll();
                    } finally {
                        lock.unlock();
                    }
                }

                if (breakAfterAcquireFailure) {
                    break;
                }

                try {
                    Thread.sleep(timeBetweenConnectErrorMillis);
                } catch (InterruptedException interruptEx) {
                    break;
                }
            }
        } catch (RuntimeException e) {
            LOG.error("create connection RuntimeException", e);
            setFailContinuous(true);
            continue;
        } catch (Error e) {
            LOG.error("create connection Error", e);
            setFailContinuous(true);
            break;
        }

        if (connection == null) {
            continue;
        }

        boolean result = put(connection);
        if (!result) {
            JdbcUtils.close(connection.getPhysicalConnection());
            LOG.info("put physical connection to pool failed.");
        }

        errorCount = 0; // reset errorCount

        if (closing || closed) {
            break;
        }
    }
}

DestroyConnectionThread’s run method
public void run() {
    initedLatch.countDown();

    for (;;) {
        // Delete from the front
        try {
            if (closed || closing) {
                break;
            }

            if (timeBetweenEvictionRunsMillis > 0) {
                Thread.sleep(timeBetweenEvictionRunsMillis);
            } else {
                Thread.sleep(1000); //
            }

            if (Thread.interrupted()) {
                break;
            }

            destroyTask.run();
        } catch (InterruptedException e) {
            break;
        }
    }
}

3.2.2 getConnectionDirect()

Once init() has completed, getConnectionDirect() fetches a connection from the pool:

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
    int notFullTimeoutRetryCnt = 0;
    for (;;) {
        // handle notFullTimeoutRetry
        DruidPooledConnection poolableConnection;
        try {
            // Get the connection
            poolableConnection = getConnectionInternal(maxWaitMillis);
        } catch (GetConnectionTimeoutException ex) {
            if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                notFullTimeoutRetryCnt++;
                if (LOG.isWarnEnabled()) {
                    LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                }
                continue;
            }
            throw ex;
        }

        // If testOnBorrow is true, the connection is checked with validConnectionChecker
        // or validationQuery every time it is fetched.
        if (testOnBorrow) {
            boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
            if (!validate) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip not validate connection.");
                }

                discardConnection(poolableConnection.holder);
                continue;
            }
        } else {
            if (poolableConnection.conn.isClosed()) {
                discardConnection(poolableConnection.holder); // Pass null to avoid repeated closing
                continue;
            }

            if (testWhileIdle) {
                final DruidConnectionHolder holder = poolableConnection.holder;
                long currentTimeMillis             = System.currentTimeMillis();
                long lastActiveTimeMillis          = holder.lastActiveTimeMillis;
                long lastExecTimeMillis            = holder.lastExecTimeMillis;
                long lastKeepTimeMillis            = holder.lastKeepTimeMillis;

                if (checkExecuteTime
                        && lastExecTimeMillis != lastActiveTimeMillis) {
                    lastActiveTimeMillis = lastExecTimeMillis;
                }

                if (lastKeepTimeMillis > lastActiveTimeMillis) {
                    lastActiveTimeMillis = lastKeepTimeMillis;
                }

                long idleMillis                    = currentTimeMillis - lastActiveTimeMillis;

                long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;

                if (timeBetweenEvictionRunsMillis <= 0) {
                    timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
                }

                if (idleMillis >= timeBetweenEvictionRunsMillis
                        || idleMillis < 0 // unexpected branch
                        ) {
                    boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
                    if (!validate) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("skip not validate connection.");
                        }

                        discardConnection(poolableConnection.holder);
                        continue;
                    }
                }
            }
        }

        if (removeAbandoned) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            poolableConnection.connectStackTrace = stackTrace;
            poolableConnection.setConnectedTimeNano();
            poolableConnection.traceEnable = true;

            activeConnectionLock.lock();
            try {
                activeConnections.put(poolableConnection, PRESENT);
            } finally {
                activeConnectionLock.unlock();
            }
        }

        if (!this.defaultAutoCommit) {
            poolableConnection.setAutoCommit(false);
        }

        return poolableConnection;
    }
}

The maxWait argument is passed in when **getConnectionInternal(long maxWait)** is called to get the connection. It is the maximum time, in milliseconds, to wait for a connection from the pool. The default is -1, which means waiting indefinitely.

if (maxWait > 0) {
    holder = pollLast(nanos);
} else {
    holder = takeLast();
}

Here is the pollLast method:

pollLast blocks the calling thread with the Condition's awaitNanos(long nanosTimeout) method. If there are no connections in the pool, it calls emptySignal() to notify the CreateThread to create a connection, then waits for at most the specified time; when it wakes up, it checks whether a connection has become available.

In addition, when the wait time is up, null is returned instead of waiting forever

if (poolingCount == 0) {
    if (estimate > 0) {
        continue;
    }

    waitNanosLocal.set(nanos - estimate);
    return null;
}

private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
    long estimate = nanos;

    for (;;) {
        if (poolingCount == 0) {
            emptySignal(); // send signal to CreateThread create connection

            if (failFast && isFailContinuous()) {
                throw new DataSourceNotAvailableException(createError);
            }

            if (estimate <= 0) {
                waitNanosLocal.set(nanos - estimate);
                return null;
            }

            notEmptyWaitThreadCount++;
            if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
            }

            try {
                long startEstimate = estimate;
                estimate = notEmpty.awaitNanos(estimate); // signal by recycle or creator
                notEmptyWaitCount++;
                notEmptyWaitNanos += (startEstimate - estimate);

                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);

                    if (disableException != null) {
                        throw disableException;
                    }

                    throw new DataSourceDisableException();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                notEmptySignalCount++;
                throw ie;
            } finally {
                notEmptyWaitThreadCount--;
            }

            if (poolingCount == 0) {
                if (estimate > 0) {
                    continue;
                }

                waitNanosLocal.set(nanos - estimate);
                return null;
            }
        }

        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        long waitNanos = nanos - estimate;
        last.setLastNotEmptyWaitNanos(waitNanos);

        return last;
    }
}
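
The core of pollLast, stripped of statistics and error handling, is a timed wait on a Condition that hands back the remaining time budget after each wake-up. The following is a minimal sketch of that pattern, not Druid's implementation: TimedLifoPool and its String items are hypothetical stand-ins for the pool and its connection holders.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedLifoPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final String[] items;
    private int count; // number of idle items, also the next free slot

    TimedLifoPool(int capacity) {
        items = new String[capacity];
    }

    /** Appends an item at the end and wakes one waiting taker. */
    boolean putLast(String item) {
        lock.lock();
        try {
            if (count >= items.length) {
                return false; // pool is full
            }
            items[count++] = item;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Removes the last item, waiting at most nanos; returns null on timeout. */
    String pollLast(long nanos) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long estimate = nanos;
            while (count == 0) {
                if (estimate <= 0) {
                    return null; // time budget used up
                }
                // awaitNanos returns an estimate of the time still left to wait.
                estimate = notEmpty.awaitNanos(estimate);
            }
            String last = items[--count];
            items[count] = null;
            return last;
        } finally {
            lock.unlock();
        }
    }
}
```

Because items are added and removed at the same end of the array, the most recently returned item is handed out first.
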

Here is the takeLast() method:

As you can see, takeLast calls notEmpty.await() in a loop, with no timeout.

By default maxWait imposes no timeout, so if there are no idle connections in the pool, the caller waits forever.

DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
    try {
        while (poolingCount == 0) {
            emptySignal(); // send signal to CreateThread create connection

            if (failFast && isFailContinuous()) {
                throw new DataSourceNotAvailableException(createError);
            }

            notEmptyWaitThreadCount++;
            if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
            }
            try {
                notEmpty.await(); // signal by recycle or creator
            } finally {
                notEmptyWaitThreadCount--;
            }
            notEmptyWaitCount++;

            if (!enable) {
                connectErrorCountUpdater.incrementAndGet(this);
                if (disableException != null) {
                    throw disableException;
                }

                throw new DataSourceDisableException();
            }
        }
    } catch (InterruptedException ie) {
        notEmpty.signal(); // propagate to non-interrupted thread
        notEmptySignalCount++;
        throw ie;
    }

    decrementPoolingCount();
    DruidConnectionHolder last = connections[poolingCount];
    connections[poolingCount] = null;

    return last;
}

The testOnBorrow, testWhileIdle, and removeAbandoned parameters

  • If testOnBorrow is true, the connection is validated every time it is fetched. If validation fails, the connection is discarded and the loop starts over; otherwise processing continues with the next step
  • If testWhileIdle is true, the connection is validated only when the time since it was last active is at least timeBetweenEvictionRunsMillis; a connection that fails validation is discarded
  • If removeAbandoned is true, the connection is stored in activeConnections and periodically processed by the cleanup thread
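
The testWhileIdle decision can be isolated as a small pure function. The sketch below mirrors the idle-time arithmetic from getConnectionDirect; the class IdleCheckSketch, the method needsValidation, and the 60-second fallback value are assumptions made for illustration (the fallback stands in for DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS, whose actual value should be checked in the Druid source).

```java
class IdleCheckSketch {
    // Assumed fallback interval, used when no positive interval is configured.
    static final long FALLBACK_INTERVAL_MILLIS = 60_000L;

    /** Returns true if a borrowed connection has been idle long enough to re-validate. */
    static boolean needsValidation(long now, long lastActive, long lastExec, long lastKeep,
                                   boolean checkExecuteTime, long evictionIntervalMillis) {
        // Optionally treat the last statement execution as the last activity.
        if (checkExecuteTime && lastExec != lastActive) {
            lastActive = lastExec;
        }
        // A more recent keep-alive check also counts as activity.
        if (lastKeep > lastActive) {
            lastActive = lastKeep;
        }
        long idleMillis = now - lastActive;
        long interval = evictionIntervalMillis > 0 ? evictionIntervalMillis : FALLBACK_INTERVAL_MILLIS;
        // A negative idle time (clock moved backwards) also forces validation.
        return idleMillis >= interval || idleMillis < 0;
    }
}
```

The cost of testWhileIdle is therefore paid only by connections that have sat unused for a while, whereas testOnBorrow validates on every fetch.
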

The CreateThread and the consumer threads that fetch connections from the connection pool form a producer-consumer model.
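
This producer-consumer relationship can be sketched with one lock and two conditions: consumers wait on notEmpty and wake the creator through empty, while the creator waits on empty and wakes consumers through notEmpty. The code below is a simplified illustration under stated assumptions, not Druid's code: MiniPool, its String "connections", and the untimed take() are hypothetical, and there is no validation, eviction, or error handling.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MiniPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition empty = lock.newCondition();    // the creator waits here
    private final ArrayDeque<String> idle = new ArrayDeque<>();
    private final int maxActive;
    private int created; // total resources created so far
    private int waiting; // consumers currently blocked in take()

    MiniPool(int maxActive) {
        this.maxActive = maxActive;
        Thread creator = new Thread(this::createLoop, "mini-pool-creator");
        creator.setDaemon(true);
        creator.start();
    }

    private void createLoop() {
        try {
            for (;;) {
                lock.lockInterruptibly();
                try {
                    // Create only when a consumer is waiting and the cap is not reached.
                    while (idle.size() >= waiting || created >= maxActive) {
                        empty.await();
                    }
                    created++;
                    idle.addLast("conn-" + created);
                    notEmpty.signal();
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Blocks until a resource is available (no timeout, like takeLast). */
    String take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (idle.isEmpty()) {
                waiting++;
                empty.signal(); // ask the creator for a new resource
                try {
                    notEmpty.await();
                } finally {
                    waiting--;
                }
            }
            return idle.pollLast();
        } finally {
            lock.unlock();
        }
    }

    /** Returns a resource to the pool and wakes one waiting consumer. */
    void recycle(String conn) {
        lock.lock();
        try {
            idle.addLast(conn);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
}
```

Note that once maxActive resources have been handed out, take() in this sketch blocks until some other thread calls recycle(), which is the same behaviour the article describes for the default maxWait.
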

3.2.3 Connection.close(): Reclaiming the Connection

Closing a connection calls the close() method of the DruidPooledConnection class, which in turn calls the recycle method. The code is as follows:

public void close() throws SQLException {
    if (this.disable) {
        return;
    }

    DruidConnectionHolder holder = this.holder;
    if (holder == null) {
        if (dupCloseLogEnable) {
            LOG.error("dup close");
        }
        return;
    }

    DruidAbstractDataSource dataSource = holder.getDataSource();
    boolean isSameThread = this.getOwnerThread() == Thread.currentThread();

    if (!isSameThread) {
        dataSource.setAsyncCloseConnectionEnable(true);
    }

    if (dataSource.isAsyncCloseConnectionEnable()) {
        syncClose();
        return;
    }

    if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) {
        return;
    }

    try {
        for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
            listener.connectionClosed(new ConnectionEvent(this));
        }

        List<Filter> filters = dataSource.getProxyFilters();
        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(dataSource);
            filterChain.dataSource_recycle(this);
        } else {
            // The recycle method is called
            recycle();
        }
    } finally {
        CLOSING_UPDATER.set(this, 0);
    }

    this.disable = true;
}
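
The key idea of this close() method, returning the underlying resource to the pool instead of destroying it and guarding against duplicate closes with a compare-and-set, can be shown in a few lines. The sketch below is a hypothetical, single-threaded simplification: ReturnOnCloseSketch, Pool, and Handle are made-up names, and an AtomicBoolean is used here in place of Druid's CLOSING_UPDATER field updater.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

class ReturnOnCloseSketch {
    /** A tiny pool; idle resources are kept in a deque. */
    static final class Pool {
        final Deque<String> idle = new ArrayDeque<>();

        Handle borrow() {
            String raw = idle.isEmpty() ? "raw-connection" : idle.pollLast();
            return new Handle(this, raw);
        }
    }

    /** Wrapper handed to callers; close() recycles the raw resource. */
    static final class Handle implements AutoCloseable {
        private final Pool pool;
        private final String raw;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        Handle(Pool pool, String raw) {
            this.pool = pool;
            this.raw = raw;
        }

        @Override
        public void close() {
            // Only the first close() wins; repeated calls are ignored.
            if (!closed.compareAndSet(false, true)) {
                return;
            }
            pool.idle.addLast(raw); // recycle instead of destroying
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        try (Handle h = pool.borrow()) {
            // use the handle here
        }
        System.out.println("idle after close: " + pool.idle.size());
    }
}
```

Because the wrapper implements AutoCloseable, callers can keep using try-with-resources exactly as they would with a plain JDBC connection.
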

The recycle() method, with some code omitted:

protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
    ...
    // If the connection is already closed, increment closeCount by 1 and return
    if (physicalConnection.isClosed()) {
        lock.lock();
        try {
            if (holder.active) {
                activeCount--;
                holder.active = false;
            }
            closeCount++;
        } finally {
            lock.unlock();
        }
        return;
    }
    ...
    // If testOnReturn is set, testConnectionInternal(DruidConnectionHolder holder, Connection conn) is called
    // to check whether the connection is valid. See the validateConnection(Connection conn) method
    if (testOnReturn) {
        boolean validate = testConnectionInternal(holder, physicalConnection);
        if (!validate) {
            JdbcUtils.close(physicalConnection);
            destroyCountUpdater.incrementAndGet(this);

            lock.lock();
            try {
                if (holder.active) {
                    activeCount--;
                    holder.active = false;
                }
                closeCount++;
            } finally {
                lock.unlock();
            }
            return;
        }
    }
    ...
    // Finally, take the lock and put the connection back into the connections array
    lock.lock();
    try {
        if (holder.active) {
            activeCount--;
            holder.active = false;
        }
        closeCount++;

        result = putLast(holder, currentTimeMillis);
    } finally {
        lock.unlock();
    }
}

boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
    if (poolingCount >= maxActive || e.discard || this.closed) {
        return false;
    }

    e.lastActiveTimeMillis = lastActiveTimeMillis;
    connections[poolingCount] = e;
    incrementPoolingCount();

    if (poolingCount > poolingPeak) {
        poolingPeak = poolingCount;
        poolingPeakTime = lastActiveTimeMillis;
    }

    notEmpty.signal();
    notEmptySignalCount++;

    return true;
}


