This is the sixth day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021
Introduction to the
In the process of writing database Demo, how to obtain the connection if the maximum number of current connections is less than the current SQL query thread, how to run the specific situation, the following write related test code and debug related source code to explore
1. The related performance when the number of threads exceeds the maximum number of connections
Write a test related code first:
- Initializes voice related data
- Set the maximum number of threads to 3 and the lowest number of idle threads to 2, initialized to 0
- Start 10 query threads and wait for all to end
- 5 more query threads
package com.alibaba.druid;
import com.alibaba.druid.pool.DruidDataSource;
import java.sql.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class TestDemo1 {
public static final String DB_URL = "jdbc:h2:file:./demo-db";
public static final String USER = "sa";
public static final String PASS = "sa";
public static final String QUERY = "SELECT id, name FROM user_example";
public static void main(String[] args) throws InterruptedException, ExecutionException {
// The first run can initialize the database data, later can cancel
// initData();
DruidDataSource dataSource = new DruidDataSource();
dataSource.setInitialSize(0);
dataSource.setMaxActive(3);
dataSource.setMinIdle(2);
dataSource.setDriverClassName("org.h2.Driver");
dataSource.setUrl(DB_URL);
dataSource.setUsername(USER);
dataSource.setPassword(PASS);
FutureTask[] fs = new FutureTask[10];
for (int i=0; i<10; i++) {
fs[i] = new FutureTask(() -> druidQuery(dataSource));
new Thread(fs[i]).start();
}
while (true) {
for (int i = 0; i < 10; i++) {
if(! fs[i].isDone()) {continue; }}break;
}
long cost = 0;
for (int i = 0; i < 10; i++) {
cost += (Long)(fs[i].get());
}
System.out.printf("Total cost: %d \n", cost);
Thread.sleep(3000);
fs = new FutureTask[5];
for (int i=0; i<5; i++) {
fs[i] = new FutureTask(() -> druidQuery(dataSource));
new Thread(fs[i]).start();
}
Thread.sleep(3000);
System.out.printf("Number of current database connections: %d\n", dataSource.getActiveCount());
}
/** * Generate data */
public static void initData(a) {
final String drop = "drop table `user_example` if exists;";
final String createTable = "CREATE TABLE IF NOT EXISTS `user_example` (" +
"`id` bigint NOT NULL AUTO_INCREMENT, " +
"`name` varchar(100) NOT NULL" +
");";
final String addUser = "insert into user_example (name) values(%s)";
try(Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
Statement stmt = conn.createStatement()) {
stmt.execute(drop);
stmt.execute(createTable);
for (int i=0; i<10; i++) {
stmt.execute(String.format(addUser, i));
}
conn.commit();
} catch(SQLException e) { e.printStackTrace(); }}private static long druidQuery(DruidDataSource dataSource) {
System.out.println("Start executing query");
final long cur = System.currentTimeMillis();
try(Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(QUERY)) {
// Extract data from result set
while (rs.next()) {
// Retrieve by column name
// System.out.print("ID: " + rs.getInt("id"));
// System.out.print(", name: " + rs.getString("name"));
// System.out.print(";" );
}
// System.out.println();
Thread.sleep(1000);
System.out.printf("Number of current database connections: %d\n", dataSource.getActiveCount());
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
final long cost = System.currentTimeMillis() - cur;
System.out.println(cost);
returncost; }}Copy the code
Let’s take a look at the results:
Start Query Start Query Start Query Start Query Start Query Start Query Start Query Start Query Start Query Start Query Start Query Start Query Start Query Current database connections: 3 Current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Start Query. Start Query. Start Query. 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3 Number of current database connections: 3Copy the code
In the above result, we see that the number of database connections is always 3, now add their own log source, debugging
Two, add custom log in the source code
Generates physical connection information for printing
In the accompanying analysis article, we saw the code for generating the physical connection as follows, with log printing in the relevant place:
# DruidAbstractDataSource.java
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
Connection conn;
if (getProxyFilters().size() == 0) {
conn = getDriver().connect(url, info);
} else {
conn = new FilterChainImpl(this).connection_connect(info);
}
createCountUpdater.incrementAndGet(this);
System.out.println("Make a physical connection");
return conn;
}
Copy the code
The final code to generate the physical connection is called in one thread: the CreateConnectionThread class in DruidDataSource. Java
After adding the log to the code above, we run the code again and find that the function is only called three times, and indeed the maximum number of connections is 3
So how do the rest of the 10 threads get connections when the maximum number of connections is reached? Debug analysis is next
How to obtain connections when the number of connections reaches the maximum
Going back to the related functions of getConnection, we debug that the connections are all from
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
// All the following are from connections
// What is confusing is that there is no connection operation in createDirect and its related code
// Check the fetch logic of this block
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
System.out.println("get connection by takeLast"); }}Copy the code
If the associated print log is added, the method is called a total of 15 times
Let’s take a closer look:
DruidConnectionHolder takeLast(a) throws InterruptedException, SQLException {
try {
// If there is no connection available in the current thread pool, the following loop is performed, waiting for the connection to be acquired
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
// This is the key, thread login, wake up, get the connection from the connection array later
System.out.println("Thread wait:" + System.currentTimeMillis());
notEmpty.await(); // signal by recycle or creator
} finally {
System.out.println(Thread wake up: + System.currentTimeMillis());
notEmptyWaitThreadCount--;
}
notEmptyWaitCount++;
if(! enable) { connectErrorCountUpdater.incrementAndGet(this);
if(disableException ! =null) {
throw disableException;
}
throw newDataSourceDisableException(); }}}catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
}
// Get the connection
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
Copy the code
In the code, see the wait notification mechanism:
- 1. Returns if the change method is called and there are connections available in the thread pool
- 2. If there is no thread, the thread will wait and wake up and return to the connection.
You see that there are two conditions for getting a thread. Remember from a couple of previous articles, when we got a connection, there was an init function that would initialize the configured number of initialized connections, and if it did, it would skip the loop
If none is configured or available, a wait state occurs
So where does it wake up the function that gets the connection? Let’s type in the relevant log and move on
How do I wake up takeLast to get connections
When viewing the generated physical connection, we found the following:
public class CreateConnectionThread extends Thread {...if (failFast) {
lock.lock();
try {
// A wake up operation
notEmpty.signalAll();
System.out.println("Generate a physical connection and wake up all waits.");
} finally{ lock.unlock(); }...if (physicalConnection == null) {
continue;
}
System.out.println("After the connection is made, put it in");
boolean result = put(connection);
if(! result) { JdbcUtils.close(connection.getPhysicalConnection()); LOG.info("put physical connection to pool failed."); }}Copy the code
Put (); put (); put ();
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
lock.lock();
try{... notEmpty.signal(); System.out.println("Connect to fetch, wake up"); notEmptySignalCount++; . }finally {
lock.unlock();
}
return true;
}
Copy the code
Log in the relevant function above
Let’s go ahead and think about what happens when we connect to a close and return a connection to the connection pool, so let’s see if the Recycle function has anything to do with it
Sure enough, we found it: in the previous article, when a connection is closed, it is returned to the connection pool as follows
boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
if (poolingCount >= maxActive || e.discard || this.closed) {
return false;
}
e.lastActiveTimeMillis = lastActiveTimeMillis;
connections[poolingCount] = e;
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = lastActiveTimeMillis;
}
notEmpty.signal();
System.out.println("PutLast, wake up");
notEmptySignalCount++;
return true;
}
Copy the code
Type in the relevant journal
That feels like it. Let’s run the mode
The results
The logs and analysis are as follows:
// Ten threads wait for the query to start. // The physical connection generation thread starts Ten query createAndStartCreatorThread / / thread has reached the awaited state get connection by takeLast start threads waiting for: 1637126886214 GET connection by takeLast start Thread waiting: 1637126886214 GET connection by takeLast start Thread waiting: 1637126886214 GET connection by takeLast start Thread waiting: 1637126886214 GET connection by takeLast start Thread waiting: 1637126886214 get connection by takeLast start Thread wait: 1637126886215 1637126886450 Get connection by takeLast end get connection by takeLast end 1637126886450 Get connection by takeLast end get connection by takeLast end 1637126886450 get connection by takeLast end 1637126886450 get connection by takeLast end 1637126887486 Get Connection by takeLast end 1637126887486 Get connection by takeLast end putLast, wake up the thread: 1637126887488 Get Connection by takeLast end 3 Number of current database connections: 3 putLast: wake up the thread: 1637126888488 Get Connection by takeLast end putLast: Wake up the thread: 1637126888488 Get Connection by takeLast End Number of current database connections: 3......Copy the code
conclusion
This time prepared an example, and printed the relevant log in the source code to verify the reading of the source code, the database connection pool connection basic use process is as follows:
- If the initial connection number is configured, the physical connection is generated in advance
- When initializing, start the physical connection generation thread (whose maximum number of spawns is not greater than the configured maximum number)
- If there is a connection, the thread will get it directly. If there is no connection, the thread will block and wait to get it
- A physical connection is generated and wakes up when it is put into the connection pool
- Wakes up when the connection is returned to the thread