This is day 6 of my participation in the November Writing Challenge. Event details: The Last Writing Challenge of 2021

Introduction

While writing a database demo, I wondered how a connection is obtained when the maximum number of connections is smaller than the number of threads currently running SQL queries, and what actually happens at runtime. Below I write some test code and debug the relevant source code to find out.

1. Behavior when the number of threads exceeds the maximum number of connections

First, write some test code:

  • Initialize the relevant database data
  • Set the maximum number of connections to 3, the minimum number of idle connections to 2, and the initial size to 0
  • Start 10 query threads and wait for all of them to finish
  • Start 5 more query threads
package com.alibaba.druid;

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class TestDemo1 {

    public static final String DB_URL = "jdbc:h2:file:./demo-db";
    public static final String USER = "sa";
    public static final String PASS = "sa";
    public static final String QUERY = "SELECT id, name FROM user_example";


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Uncomment on the first run to initialize the database data; comment it out again afterwards
        // initData();

        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setInitialSize(0);
        dataSource.setMaxActive(3);
        dataSource.setMinIdle(2);
        dataSource.setDriverClassName("org.h2.Driver");
        dataSource.setUrl(DB_URL);
        dataSource.setUsername(USER);
        dataSource.setPassword(PASS);

        FutureTask[] fs = new FutureTask[10];
        for (int i=0; i<10; i++) {
            fs[i] = new FutureTask(() -> druidQuery(dataSource));
            new Thread(fs[i]).start();
        }

        // Wait until all ten query threads have finished
        for (int i = 0; i < 10; i++) {
            while (!fs[i].isDone()) {
                Thread.sleep(10);
            }
        }

        long cost = 0;
        for (int i = 0; i < 10; i++) {
            cost += (Long)(fs[i].get());
        }
        System.out.printf("Total cost: %d \n", cost);

        Thread.sleep(3000);
        fs = new FutureTask[5];
        for (int i=0; i<5; i++) {
            fs[i] = new FutureTask(() -> druidQuery(dataSource));
            new Thread(fs[i]).start();
        }

        Thread.sleep(3000);
        System.out.printf("Number of current database connections: %d\n", dataSource.getActiveCount());
    }

    /** Generate data */
    public static void initData() {
        final String drop = "drop table `user_example` if exists;";
        final String createTable = "CREATE TABLE IF NOT EXISTS `user_example` (" +
                "`id` bigint NOT NULL AUTO_INCREMENT, " +
                "`name` varchar(100) NOT NULL" +
                ");";
        final String addUser = "insert into user_example (name) values(%s)";
        try(Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
            Statement stmt = conn.createStatement()) {
            stmt.execute(drop);
            stmt.execute(createTable);
            for (int i=0; i<10; i++) {
                stmt.execute(String.format(addUser, i));
            }
            conn.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    private static long druidQuery(DruidDataSource dataSource) {
        System.out.println("Start executing query");
        final long cur = System.currentTimeMillis();
        try(Connection conn = dataSource.getConnection();
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(QUERY)) {
            // Extract data from result set
            while (rs.next()) {
                // Retrieve by column name
                // System.out.print("ID: " + rs.getInt("id"));
                // System.out.print(", name: " + rs.getString("name"));
                // System.out.print(";");
            }
            // System.out.println();
            Thread.sleep(1000);
            System.out.printf("Number of current database connections: %d\n", dataSource.getActiveCount());
        } catch (SQLException | InterruptedException e) {
            e.printStackTrace();
        }
        final long cost = System.currentTimeMillis() - cur;
        System.out.println(cost);
        return cost;
    }
}

Let’s take a look at the results:

Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Start Query
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Start Query
Start Query
Start Query
3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3
Number of current database connections: 3

In the result above, the number of database connections is always 3. Next, let's add our own logs to the source code and debug it.

2. Adding custom logs to the source code

Printing a log when a physical connection is created

In the earlier analysis article, we saw that the code for creating a physical connection is as follows; a log statement is added at the relevant place:

# DruidAbstractDataSource.java
    public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
        Connection conn;
        if (getProxyFilters().size() == 0) {
            conn = getDriver().connect(url, info);
        } else {
            conn = new FilterChainImpl(this).connection_connect(info);
        }

        createCountUpdater.incrementAndGet(this);
        System.out.println("Make a physical connection");

        return conn;
    }

The code that ultimately creates the physical connection is called from a single thread: the CreateConnectionThread class in DruidDataSource.java.

After adding the log to the code above and running the demo again, we find that the function is called only three times, which matches the maximum number of connections of 3.

So how do the other threads get a connection once the maximum number of connections has been reached? Let's debug and analyze that next.

How to obtain connections when the number of connections reaches the maximum

Going back to the functions behind getConnection, debugging shows that every connection is obtained in the following part of getConnectionInternal:


    private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
        ...
                // Every connection is obtained here.
                // Confusingly, createDirect and its related code perform no connection operation,
                // so look at the fetch logic in this block.
                if (maxWait > 0) {
                    holder = pollLast(nanos);
                } else {
                    holder = takeLast();
                    System.out.println("get connection by takeLast");
                }
        ...
    }

With this log added, we see that the method is called 15 times in total, which matches the 10 + 5 query threads started by the demo.
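To make the maxWait branch above more concrete, here is a minimal sketch of the difference between a timed and an untimed wait, built on java.util.concurrent.locks.Condition. It is not Druid's implementation: the class TimedWaitSketch, the String "connections", and the get helper are hypothetical, and only the method names takeLast, pollLast, and putLast are borrowed from the excerpts in this article.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the pool; not Druid's actual implementation. */
public class TimedWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<String> idle = new ArrayDeque<>();

    /** Untimed variant: blocks until a connection is available. */
    public String takeLast() throws InterruptedException {
        lock.lock();
        try {
            while (idle.isEmpty()) {
                notEmpty.await();
            }
            return idle.pollLast();
        } finally {
            lock.unlock();
        }
    }

    /** Timed variant: returns null if nothing arrives within the given time. */
    public String pollLast(long nanos) throws InterruptedException {
        lock.lock();
        try {
            while (idle.isEmpty()) {
                if (nanos <= 0) {
                    return null; // waiting budget used up
                }
                nanos = notEmpty.awaitNanos(nanos); // returns the remaining time
            }
            return idle.pollLast();
        } finally {
            lock.unlock();
        }
    }

    /** Returns a connection to the pool and wakes up one waiter. */
    public void putLast(String conn) {
        lock.lock();
        try {
            idle.addLast(conn);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Mirrors the branch above: timed wait if maxWait > 0, otherwise block. */
    public String get(long maxWaitMillis) {
        try {
            return maxWaitMillis > 0
                    ? pollLast(TimeUnit.MILLISECONDS.toNanos(maxWaitMillis))
                    : takeLast();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        TimedWaitSketch pool = new TimedWaitSketch();
        System.out.println(pool.get(100)); // empty pool: prints null after about 100 ms
        pool.putLast("conn-1");
        System.out.println(pool.get(100)); // prints conn-1
    }
}
```

In the sketch, a positive wait time bounds how long a caller can be stuck, while the untimed path waits for as long as it takes, which is the path the demo in this article goes through.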

Let’s take a closer look:


    DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
        try {
            // If no connection is available in the pool, loop here and wait until one can be acquired
            while (poolingCount == 0) {
                emptySignal(); // send signal to CreateThread create connection

                if (failFast && isFailContinuous()) {
                    throw new DataSourceNotAvailableException(createError);
                }

                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                    notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                }
                try {
                    // This is the key: the thread waits here; once woken up, it takes a connection from the connections array below
                    System.out.println("Thread wait:" + System.currentTimeMillis());
                    notEmpty.await(); // signal by recycle or creator
                } finally {
                    System.out.println("Thread wake up:" + System.currentTimeMillis());
                    notEmptyWaitThreadCount--;
                }
                notEmptyWaitCount++;

                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {
                        throw disableException;
                    }

                    throw new DataSourceDisableException();
                }
            }
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }

        // Get the connection
        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;

        return last;
    }

The code shows a wait/notify mechanism:

  • 1. If the method is called while connections are available in the connection pool, it returns one immediately
  • 2. If no connection is available, the thread waits; once woken up, it takes a connection and returns it

So there are two ways to get a connection. Recall from the previous articles that getting a connection goes through an init function that creates the configured number of initial connections; if any were created, takeLast skips the loop.

If none is configured, or none is currently available, the thread enters the waiting state.
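The wait/notify flow described above can be tried out in isolation with a small sketch. It assumes a fixed set of pre-created "connections" (plain strings) and, unlike the excerpt, takes the lock inside takeLast and putLast; the class BlockingPoolSketch and the run helper are hypothetical and only imitate the shape of the code in this article.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of the blocking behavior; not Druid's actual code. */
public class BlockingPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final String[] connections;
    private int poolingCount;

    public BlockingPoolSketch(int size) {
        connections = new String[size];
        for (int i = 0; i < size; i++) {
            connections[i] = "conn-" + i;
        }
        poolingCount = size;
    }

    /** Takes the last idle connection, waiting while the pool is empty. */
    String takeLast() throws InterruptedException {
        lock.lock();
        try {
            while (poolingCount == 0) {
                notEmpty.await(); // woken up by putLast
            }
            String last = connections[--poolingCount];
            connections[poolingCount] = null;
            return last;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a connection and wakes up one waiting thread. */
    void putLast(String conn) {
        lock.lock();
        try {
            connections[poolingCount++] = conn;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Runs the given number of borrowers; returns {served, peakInUse}. */
    public static int[] run(int size, int borrowers) {
        BlockingPoolSketch pool = new BlockingPoolSketch(size);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger served = new AtomicInteger();
        Thread[] threads = new Thread[borrowers];
        for (int i = 0; i < borrowers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    String conn = pool.takeLast();
                    peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
                    Thread.sleep(20); // pretend to run a query
                    inUse.decrementAndGet();
                    served.incrementAndGet();
                    pool.putLast(conn);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {served.get(), peak.get()};
    }

    public static void main(String[] args) {
        int[] result = run(3, 10);
        // All 10 borrowers are served, and never more than 3 connections are in use at once
        System.out.println("served=" + result[0] + " peakInUse=" + result[1]);
    }
}
```

With 3 connections and 10 borrowers, every borrower is eventually served even though at most 3 of them hold a connection at any moment, which is the behavior observed in the demo output.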

So where is the thread waiting in takeLast woken up? Let's add the relevant logs and continue.

How takeLast is woken up to get a connection

When looking at where the physical connection is created, we found the following:

    public class CreateConnectionThread extends Thread {
        ...
                        if (failFast) {
                            lock.lock();
                            try {
                                // A wake-up operation
                                notEmpty.signalAll();
                                System.out.println("Generate a physical connection and wake up all waits.");
                            } finally {
                                lock.unlock();
                            }
        ...
                if (physicalConnection == null) {
                    continue;
                }

                System.out.println("After the connection is made, put it in");
                boolean result = put(connection);
                if (!result) {
                    JdbcUtils.close(connection.getPhysicalConnection());
                    LOG.info("put physical connection to pool failed.");
                }
            }

Following the call into put(), we find another wake-up there:

private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
    lock.lock();
    try {
        ...
        notEmpty.signal();
        System.out.println("Connect to fetch, wake up");
        notEmptySignalCount++;
        ...
    } finally {
        lock.unlock();
    }
    return true;
}

Log statements are added in the functions above.
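The interplay between waiting borrowers and the connection-creating thread can be sketched with two conditions on one lock. This is only an illustration under stated assumptions: the class CreatorThreadSketch, the `empty` condition name, the `waiting` counter, and the integer "connections" are hypothetical, and the rule the creator uses to decide when to create is a simplification rather than Druid's exact logic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a creator thread; not Druid's actual code. */
public class CreatorThreadSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // borrowers wait here
    private final Condition empty = lock.newCondition();    // creator waits here
    private final Deque<Integer> idle = new ArrayDeque<>();
    private final int maxActive;
    private int created; // physical connections created so far
    private int waiting; // borrowers currently blocked

    public CreatorThreadSketch(int maxActive) {
        this.maxActive = maxActive;
        Thread creator = new Thread(this::createLoop, "creator");
        creator.setDaemon(true); // do not keep the JVM alive
        creator.start();
    }

    private void createLoop() {
        lock.lock();
        try {
            while (true) {
                // Sleep while nobody needs a connection or the cap is reached
                while (idle.size() >= waiting || created >= maxActive) {
                    empty.await();
                }
                idle.addLast(++created); // stands in for creating a physical connection
                notEmpty.signal();       // wake up one waiting borrower
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until a connection is available; returns -1 if interrupted. */
    public int borrow() {
        lock.lock();
        try {
            while (idle.isEmpty()) {
                waiting++;
                empty.signal(); // ask the creator for a connection
                try {
                    notEmpty.await(); // woken by the creator or by giveBack
                } finally {
                    waiting--;
                }
            }
            return idle.pollLast();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            lock.unlock();
        }
    }

    /** Returns a connection to the pool and wakes up one waiter. */
    public void giveBack(int conn) {
        lock.lock();
        try {
            idle.addLast(conn);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public int createdCount() {
        lock.lock();
        try {
            return created;
        } finally {
            lock.unlock();
        }
    }

    /** Runs the given number of borrowers; returns {served, created}. */
    public static int[] run(int maxActive, int borrowers) {
        CreatorThreadSketch pool = new CreatorThreadSketch(maxActive);
        AtomicInteger served = new AtomicInteger();
        Thread[] threads = new Thread[borrowers];
        for (int i = 0; i < borrowers; i++) {
            threads[i] = new Thread(() -> {
                int conn = pool.borrow();
                if (conn < 0) {
                    return;
                }
                try {
                    Thread.sleep(20); // pretend to run a query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                served.incrementAndGet();
                pool.giveBack(conn);
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {served.get(), pool.createdCount()};
    }

    public static void main(String[] args) {
        int[] result = run(3, 10);
        // All 10 borrowers are served while at most 3 connections are ever created
        System.out.println("served=" + result[0] + " created=" + result[1]);
    }
}
```

The creator re-checks its condition under the lock every time it wakes up, so a signal sent before it starts waiting is not lost, and the cap on created connections holds no matter how many borrowers are queued.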

Thinking further: when we close a connection, it is returned to the connection pool, so let's check whether the recycle function does something similar.

Sure enough, it does. As covered in the previous article, when a connection is closed it is returned to the connection pool as follows:


    boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
        if (poolingCount >= maxActive || e.discard || this.closed) {
            return false;
        }

        e.lastActiveTimeMillis = lastActiveTimeMillis;
        connections[poolingCount] = e;
        incrementPoolingCount();

        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = lastActiveTimeMillis;
        }

        notEmpty.signal();
        System.out.println("PutLast, wake up");
        notEmptySignalCount++;

        return true;
    }

Add the relevant log here as well.

That looks like everything. Let's run the demo.

Results

The logs and analysis are as follows:

// Ten threads start and wait to query
// The physical connection generation thread starts
createAndStartCreatorThread
// The ten query threads have reached the waiting state
get connection by takeLast start
Thread wait: 1637126886214
get connection by takeLast start
Thread wait: 1637126886214
get connection by takeLast start
Thread wait: 1637126886214
get connection by takeLast start
Thread wait: 1637126886214
get connection by takeLast start
Thread wait: 1637126886214
get connection by takeLast start
Thread wait: 1637126886215
1637126886450
get connection by takeLast end
get connection by takeLast end
1637126886450
get connection by takeLast end
get connection by takeLast end
1637126886450
get connection by takeLast end
1637126886450
get connection by takeLast end
1637126887486
get connection by takeLast end
1637126887486
get connection by takeLast end
putLast, wake up the thread: 1637126887488
get connection by takeLast end
3
Number of current database connections: 3
putLast, wake up the thread: 1637126888488
get connection by takeLast end
putLast, wake up the thread: 1637126888488
get connection by takeLast end
Number of current database connections: 3
......

Conclusion

This time we prepared an example and printed logs from within the source code to verify our reading of it. The basic flow of using connections from the database connection pool is as follows:

  1. If an initial number of connections is configured, those physical connections are created in advance
  2. During initialization, the physical connection creation thread is started (it never creates more connections than the configured maximum)
  3. If a connection is available, the thread gets it directly; if not, the thread blocks and waits for one
    1. It is woken up when a physical connection is created and put into the connection pool
    2. It is woken up when a connection is returned to the pool