Introduction:
Flink + Mybatis +druid is used in the project for persistence, but the current use method is very crude: each operator instance has an sqlSessionFactory, which actually causes a large waste of resources. SqlSessionFactory initSqlSessionFactory(Set mapperXmls, DataSource DataSource). In fact, each instance of flink operator (each instance of parallelism operator) has a connection pool. When processing data, they take connections from their own connection pool and do not share the connections created by this connection pool, so the utilization rate is very low.
I decided to take the opportunity to take a closer look at druid and think about how to optimize the current use of Flink.
Druid + H2
In this article, we will introduce how to use Druid connection pool to implement simple operation of the H2 database. Test1 () uses druid connection pools to insert into the database, and test2() uses native JDBC for inserts. You can see there’s not a lot of difference in general usage. Initialize the password connection, etc. 2) get the connection, and 3) get the prepareStatement using the connection. 4) Execute statement. 5) Close statement. 6) Close connection
public class TestDruidDataSourceH2DB {
public static String insertSql = "INSERT INTO TEST VALUES(\'%s\', 'World')";
public static void test1(a) throws SQLException {
DruidDataSource dataSource = new DruidDataSource();
String pwd = "";
String user = "sa";
String url = "jdbc:h2:tcp://localhost/~/test";
String driverName = "org.h2.Driver";
// Configure the basic four database items, create connections, initialize the connection pool container
dataSource.setDriverClassName(driverName);
dataSource.setUrl(url);
dataSource.setUsername(user);
dataSource.setPassword(pwd);
// Specify the number of initializations
dataSource.setInitialSize(5);
// Specify the maximum number
dataSource.setMaxActive(100);
// Specify the number of peak periods
dataSource.setMinIdle(20);
// Specify the wait time
dataSource.setMaxWait(3000);
// Get the connection object from the connection pool
Connection connection = dataSource.getConnection();
System.out.println(connection);
insert(connection);
// Return to connection pool
connection.close(); // This object is a subclass provided by Druid, which implements an enhancement to the close method. Instead of destroying the object, it is returned to the connection pool
}
public static void insert(Connection connection) throws SQLException {
String id = "id";
String id_numeric = String.valueOf(System.currentTimeMillis());
System.out.println("description :" + id_numeric);
String provinceId = "provinceId";
String cityName = "cityName";
String insert = String.format(insertSql, id_numeric);
/ / for the statement
PreparedStatement preparedStatement = connection.prepareStatement(insert);
preparedStatement.execute();
preparedStatement.close();
}
public static void test2(a) {
Connection conn = null;
Statement stmt = null;
String pwd = "";
String user = "sa";
String url = "jdbc:h2:tcp://localhost/~/test";
String driverName = "org.h2.Driver";
try {
Class.forName(driverName);
conn = DriverManager.getConnection(url, user, pwd);
insert(conn);
conn.close();
} catch(Exception exception) { exception.printStackTrace(); }}public static void main(String[] args) throws SQLException { test1(); }}Copy the code
DruidDataSource’s main methods and properties
Druid datasource the Druid datasource core logic is in the DruidDataSource class. As you can see from the above two examples, most of the code is the same, starting with the way the connection is retrieved. Druid: connect to Druid
Connection connection = dataSource.getConnection();
Copy the code
The DruidDataSource main class inheritance structure with its member variables and methods is shown below.
Class or attribute name | describe |
---|---|
ExceptionSorter | Used to determine whether an SQLException object is a fatal exception |
ValidConnectionChecker | Verifies that the specified connection object is valid |
CreateConnectionThread | DruidDataSource inner class for asynchronously creating connection objects |
notEmpty | When notempty.await () is called, the current thread enters wait; When a connection is created or reclaimed, notempty.signal () is called and the waiting thread is woken up. |
empty | When empty.await() is called, the CreateConnectionThread enters the wait; When empty.Signal () is called, the CreateConnectionThread is woken up and the connection is created; |
DestroyConnectionThread | DruidDataSource inner classes for asynchronous test connection objects, including phyTimeoutMillis, minEvictableIdleTimeMillis check idle connections, And check out the connection removeAbandonedTimeoutMillis |
LogStatsThread | DruidDataSource’s inner class for logging statistics asynchronously |
connections | Used to hold all connection objects |
evictConnections | Used to store connection objects to be discarded |
keepAliveConnections | Used to store connection objects that need keepAlive |
activeConnections | Used to hold connection objects that need to be removeAbandoned |
poolingCount | The number of free connection objects |
activeCount | The number of connection objects lent |
The DruidDatasource class works as follows:
Druid: connect to SQL server
Here’s a rundown of how the DruidDataSource works for the above test examples
public static void test1(a) throws SQLException {
DruidDataSource dataSource = new DruidDataSource();
String pwd = "";
String user = "sa";
String url = "jdbc:h2:tcp://localhost/~/test";
String driverName = "org.h2.Driver";
// Configure the basic four database items, create connections, initialize the connection pool container
dataSource.setDriverClassName(driverName);
dataSource.setUrl(url);
dataSource.setUsername(user);
dataSource.setPassword(pwd);
// Specify the number of initializations
//dataSource.setInitialSize(0);
// Specify the maximum number
dataSource.setMaxActive(100);
// Specify the number of peak periods
dataSource.setMinIdle(20);
// Specify the wait time
dataSource.setMaxWait(3000);
// Set the number of initial connections
dataSource.setInitialSize(2);
// Get the connection object from the connection pool
Connection connection = dataSource.getConnection();
System.out.println(connection);
insert(connection);
// Return to connection pool
connection.close(); // This object is a subclass provided by Druid, which implements an enhancement to the close method. Instead of destroying the object, it is returned to the connection pool
}
Copy the code
It’s just a little bit more initialization. The role of some important parameters will be added later.
3.1 DruidDataSource constructor
// Create a DruidDataSource instance
DruidDataSource dataSource = new DruidDataSource();
Copy the code
//DruidDataSource constructor
public DruidDataSource(a){
this(false);
}
public DruidDataSource(boolean fairLock){
super(fairLock);
// Initialize some variables from the System configuration
configFromPropety(System.getProperties());
}
Copy the code
super(fairLock) calls the constructor of the parent classpublic DruidAbstractDataSource(boolean lockFair){
/ / initialization
lock = new ReentrantLock(lockFair);
// Initialize two Condition classes. Use to synchronize CreateConnectionThread and DestroyConnectionThread
notEmpty = lock.newCondition();
empty = lock.newCondition();
}
Copy the code
3.2 Obtaining connection objects from the connection pool
// Get the connection object from the connection pool
Connection connection = dataSource.getConnection();
Copy the code
@Override
public DruidPooledConnection getConnection(a) throws SQLException {
return getConnection(maxWait);
}
@Override
public DruidPooledConnection getConnection(a) throws SQLException {
return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
returngetConnectionDirect(maxWaitMillis); }}Copy the code
3.2.1 the init () method
public void init(a) throws SQLException {
if (inited) {
return;
}
// Register the driver
DruidDriver.getInstance();
/ / acquiring a lock
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
boolean init = false;
try {
if (inited) {
return;
}
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
// AtomicLongfield dupdater is used to update atoms, ensuring safe writing and efficient reading
this.id = DruidDriver.createDataSourceId();
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
}
// Determine the dbType based on the URL prefix
if (this.jdbcUrl ! =null) {
this.jdbcUrl = this.jdbcUrl.trim();
// Druid is a custom url format
// JDBC :wrap-jdbc: starts with driver, name, and JMX
initFromWrapDriverUrl();
}
// Initialize the filter. How to use filter for ????
for (Filter filter : filters) {
filter.init(this);
}
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
}
DbType dbType = DbType.of(this.dbTypeName);
if (dbType == DbType.mysql
|| dbType == DbType.mariadb
|| dbType == DbType.oceanbase
|| dbType == DbType.ads) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration"."true"); }}// Check parameters
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
}
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
if (keepAlive && keepAliveBetweenTimeMillis <= timeBetweenEvictionRunsMillis) {
throw new SQLException("keepAliveBetweenTimeMillis must be grater than timeBetweenEvictionRunsMillis");
}
// Check parameters
if (this.driverClass ! =null) {
this.driverClass = driverClass.trim();
}
// Use the SPI mechanism to load the filter. This part of the filter will put autoFilters as well as filters
initFromSPIServiceLoader();
resolveDriver();
initCheck();
initExceptionSorter();
initValidConnectionChecker();
validationQueryCheck();
if (isUseGlobalDataSourceStat()) {
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global"."Global".this.dbTypeName);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbTypeName); }}else {
// To monitor SQL. But there are risks of memory leak reference: https://segmentfault.com/a/1190000021636834
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
}
dataSourceStat.setResetStatEnable(this.resetStatEnable);
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
if(createScheduler ! =null && asyncInit) {
for (int i = 0; i < initialSize; ++i) {
submitCreateTask(true); }}else if(! asyncInit) {// init connections
while (poolingCount < initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000); }}}if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
}
createAndLogThread();
createAndStartCreatorThread();
createAndStartDestroyThread();
initedLatch.await();
init = true;
initedTime = new Date();
registerMbean();
if(connectError ! =null && poolingCount == 0) {
throw connectError;
}
if (keepAlive) {
// async fill to minIdle
if(createScheduler ! =null) {
for (int i = 0; i < minIdle; ++i) {
submitCreateTask(true); }}else {
this.emptySignal(); }}}catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (Error e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} finally {
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();
if (this.name ! =null&&!this.name.isEmpty()) {
msg += ",";
msg += this.name;
}
msg += "} inited"; LOG.info(msg); }}}Copy the code
Initialize the Filter
You can also use the SPI mechanism to load filters. Note that the AutoLoad annotation is also required and that load. Spifilter.skip =false is configured
private void initFromSPIServiceLoader(a) {
if (loadSpifilterSkip) {
return;
}
if (autoFilters == null) {
List<Filter> filters = new ArrayList<Filter>();
ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
for (Filter filter : autoFilterLoader) {
AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
if(autoLoad ! =null && autoLoad.value()) {
filters.add(filter);
}
}
autoFilters = filters;
}
for (Filter filter : autoFilters) {
if (LOG.isInfoEnabled()) {
LOG.info("load filter from spi :"+ filter.getClass().getName()); } addFilter(filter); }}Copy the code
The Filter is loaded by reflection
private S nextService(a) {
if(! hasNextService())throw new NoSuchElementException();
String cn = nextName;
nextName = null; Class<? > c =null;
try {
/ / load the classes
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if(! service.isAssignableFrom(c)) { fail(service,"Provider " + cn + " not a subtype");
}
try {
/ / instantiate
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}
Copy the code
Initializes the Driver
DriverClass load of reflection configuration, and invoke the DriverManager. RegisterDriver register (driver).
protected void resolveDriver(a) throws SQLException {
if (this.driver == null) {
if (this.driverClass == null || this.driverClass.isEmpty()) {
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
if (MockDriver.class.getName().equals(driverClass)) {
driver = MockDriver.instance;
} else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
Properties info = new Properties();
info.put("user", username);
info.put("password", password);
info.putAll(connectProperties);
driver = new BalancedClickhouseDriver(jdbcUrl, info);
} else {
if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
throw new SQLException("url not set"); } driver = JdbcUtils.createDriver(driverClassLoader, driverClass); }}else {
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName(); }}}public static Driver createDriver(ClassLoader classLoader, String driverClassName) throws SQLException { Class<? > clazz =null;
if(classLoader ! =null) {
try {
clazz = classLoader.loadClass(driverClassName);
} catch (ClassNotFoundException e) {
// skip}}if (clazz == null) {
try {
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
if(contextLoader ! =null) { clazz = contextLoader.loadClass(driverClassName); }}catch (ClassNotFoundException e) {
// skip}}if (clazz == null) {
try {
// Load the configured Driver Class
clazz = Class.forName(driverClassName);
} catch (ClassNotFoundException e) {
throw newSQLException(e.getMessage(), e); }}try {
/ / instantiate
return (Driver) clazz.newInstance();
} catch (IllegalAccessException e) {
throw new SQLException(e.getMessage(), e);
} catch (InstantiationException e) {
throw newSQLException(e.getMessage(), e); }}Copy the code
Calibration parameters
// Check maxActive, minIdle, initialSize, timeBetweenLogStatsMillis, useGlobalDataSourceStat, maxEvictableIdleTimeMillis, minEvictabl Check whether configurations such as eIdleTimeMillis are valid
/ /......
// For Oracle and DB2, validationQuery needs to be verified
initCheck();
/ / when opened testOnBorrow/testOnReturn/testWhileIdle, determine whether to set the validationQuery, no words will print error messages
validationQueryCheck();
Copy the code
Initialize ExceptionSorter, ValidConnectionChecker, and JdbcDataSourceStat
Instantiate different Exceptionsorters based on different data drivers. The ExceptionSorter interface is shown below, from which you can see that this class is used to determine whether an exception is fatal"Fatal"abnormalpublic interface ExceptionSorter {
/**
* Returns true or false whether or not the exception is fatal.
*
* @param e the exception
* @return true or false if the exception is fatal.
*/
boolean isExceptionFatal(SQLException e);
void configFromProperties(Properties properties);
}
private void initExceptionSorter(a) {
if (exceptionSorter instanceof NullExceptionSorter) {
if (driver instanceof MockDriver) {
return; }}else if (this.exceptionSorter ! =null) {
return;
}
for(Class<? > driverClass = driver.getClass();;) { String realDriverClassName = driverClass.getName();if (realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER) //
|| realDriverClassName.equals(JdbcConstants.MYSQL_DRIVER_6)) {
this.exceptionSorter = new MySqlExceptionSorter();
this.isMySql = true;
} else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {
this.exceptionSorter = new OracleExceptionSorter();
} else if (realDriverClassName.equals(JdbcConstants.OCEANBASE_DRIVER)) { // Write a real TestCase
if (JdbcUtils.OCEANBASE_ORACLE.name().equalsIgnoreCase(dbTypeName)) {
this.exceptionSorter = new OceanBaseOracleExceptionSorter();
} else {
this.exceptionSorter = newMySqlExceptionSorter(); }}else if (realDriverClassName.equals("com.informix.jdbc.IfxDriver")) {
this.exceptionSorter = new InformixExceptionSorter();
} else if (realDriverClassName.equals("com.sybase.jdbc2.jdbc.SybDriver")) {
this.exceptionSorter = new SybaseExceptionSorter();
} else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)
|| realDriverClassName.equals(JdbcConstants.POLARDB_DRIVER)) {
this.exceptionSorter = new PGExceptionSorter();
} else if (realDriverClassName.equals("com.alibaba.druid.mock.MockDriver")) {
this.exceptionSorter = new MockExceptionSorter();
} else if (realDriverClassName.contains("DB2")) {
this.exceptionSorter = new DB2ExceptionSorter();
} else{ Class<? > superClass = driverClass.getSuperclass();if(superClass ! =null&& superClass ! = Object.class) { driverClass = superClass;continue; }}break; }}Copy the code
// Use different connection checkers for different driver classes. For example, when testOnBorrow is set to true, ValidConnectionChecker is called to check that the connection is valid
private void initValidConnectionChecker(a) {
if (this.validConnectionChecker ! =null) {
return;
}
String realDriverClassName = driver.getClass().getName();
if (JdbcUtils.isMySqlDriver(realDriverClassName)) {
this.validConnectionChecker = new MySqlValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {
this.validConnectionChecker = new OracleValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {
this.validConnectionChecker = new MSSQLValidConnectionChecker();
} else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)
|| realDriverClassName.equals(JdbcConstants.POLARDB_DRIVER)) {
this.validConnectionChecker = newPGValidConnectionChecker(); }}Copy the code
private void validationQueryCheck(a) {
if(! (testOnBorrow || testOnReturn || testWhileIdle)) {return;
}
if (this.validConnectionChecker ! =null) {
return;
}
if (this.validationQuery ! =null && this.validationQuery.length() > 0) {
return;
}
String errorMessage = "";
if (testOnBorrow) {
errorMessage += "testOnBorrow is true, ";
}
if (testOnReturn) {
errorMessage += "testOnReturn is true, ";
}
if (testWhileIdle) {
errorMessage += "testWhileIdle is true, ";
}
LOG.error(errorMessage + "validationQuery not set");
}
Copy the code
Initialize three connection object encapsulation classes
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];
Copy the code
The DruidConnectionHolder class has the following properties and methods:
Initialize the connection with the number of parameters initialSize
There are two ways to create a vm, synchronous or asynchronous
// init connections
while (poolingCount < initialSize) {
try {
// Create a connection. Encapsulate it with DruidConnectionHolder
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000); }}}Copy the code
The process of creating a connection
Most of the previous code can be ignored for now, mainly the createPhysicalConnection and initPhysicalConnection methods
public PhysicalConnectionInfo createPhysicalConnection(a) throws SQLException {
String url = this.getUrl();
Properties connectProperties = getConnectProperties();
String user;
if(getUserCallback() ! =null) {
user = getUserCallback().getName();
} else {
user = getUsername();
}
String password = getPassword();
PasswordCallback passwordCallback = getPasswordCallback();
if(passwordCallback ! =null) {
if (passwordCallback instanceof DruidPasswordCallback) {
DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
druidPasswordCallback.setUrl(url);
druidPasswordCallback.setProperties(connectProperties);
}
char[] chars = passwordCallback.getPassword();
if(chars ! =null) {
password = new String(chars);
}
}
Properties physicalConnectProperties = new Properties();
if(connectProperties ! =null) {
physicalConnectProperties.putAll(connectProperties);
}
if(user ! =null&& user.length() ! =0) {
physicalConnectProperties.put("user", user);
}
if(password ! =null&& password.length() ! =0) {
physicalConnectProperties.put("password", password);
}
Connection conn = null;
long connectStartNanos = System.nanoTime();
long connectedNanos, initedNanos, validatedNanos;
Map<String, Object> variables = initVariants
? new HashMap<String, Object>()
: null;
Map<String, Object> globalVariables = initGlobalVariants
? new HashMap<String, Object>()
: null;
createStartNanosUpdater.set(this, connectStartNanos);
creatingCountUpdater.incrementAndGet(this);
try {
// Start calling the driver here. Refer to the introduction below
conn = createPhysicalConnection(url, physicalConnectProperties);
connectedNanos = System.nanoTime();
if (conn == null) {
throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
}
initPhysicalConnection(conn, variables, globalVariables);
initedNanos = System.nanoTime();
validateConnection(conn);
validatedNanos = System.nanoTime();
setFailContinuous(false);
setCreateError(null);
} catch (SQLException ex) {
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (RuntimeException ex) {
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} catch (Error ex) {
createErrorCountUpdater.incrementAndGet(this);
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
} finally {
long nano = System.nanoTime() - connectStartNanos;
createTimespan += nano;
creatingCountUpdater.decrementAndGet(this);
}
return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
}
Copy the code
DruidAbstractDataSource class creates a connection method
The connection creation process is performed by the corresponding Driver’s CONNECT method. Returns a Connect class that encapsulates the Socket
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
Connection conn;
if (getProxyFilters().size() == 0) {
// getDriver returns the driver class. Here is the org.h2.Driver we registered.
conn = getDriver().connect(url, info);
} else {
conn = new FilterChainImpl(this).connection_connect(info);
}
createCountUpdater.incrementAndGet(this);
return conn;
}
Copy the code
Here is a h2 database Driver org.h2.Driver to explain how to “create a connection”. The driver source code is also very simple, but the so-called sparrow is small, the five organs, by looking at the source can fully understand the connection process.
Org.h2. Driver Creates a connection
public Connection connect(String url, Properties info) throws SQLException {
try {
if (info == null) {
info = new Properties();
}
if(! acceptsURL(url)) {return null;
}
if (url.equals(DEFAULT_URL)) {
return DEFAULT_CONNECTION.get();
}
// This looks updated. You go in and you go back. Never mind.
Connection c = DbUpgrade.connectOrUpgrade(url, info);
if(c ! =null) {
return c;
}
// Create a connection here
return new JdbcConnection(url, info);
} catch (Exception e) {
throwDbException.toSQLException(e); }}Copy the code
//JdbcConnection constructor
public JdbcConnection(ConnectionInfo ci, boolean useBaseDir)
throws SQLException {
try {
if (useBaseDir) {
String baseDir = SysProperties.getBaseDir();
if(baseDir ! =null) { ci.setBaseDir(baseDir); }}// This will return an embedded or server connection. It encapsulates Socket information
session = new SessionRemote(ci).connectEmbeddedOrServer(false);
trace = session.getTrace();
int id = getNextId(TraceObject.CONNECTION);
setTrace(trace, TraceObject.CONNECTION, id);
this.user = ci.getUserName();
if (isInfoEnabled()) {
trace.infoCode("Connection " + getTraceObjectName()
+ " = DriverManager.getConnection("
+ quote(ci.getOriginalURL()) + "," + quote(user)
+ ", \ "\");");
}
this.url = ci.getURL();
scopeGeneratedKeys = ci.getProperty("SCOPE_GENERATED_KEYS".false);
closeOld();
watcher = CloseWatcher.register(this, session, keepOpenStackTrace);
} catch (Exception e) {
throwlogAndConvert(e); }}Copy the code
JdbcConnection class main constructs:
As you can see, the JdbcConnection class has a property called SessionInterface and its implementation class is SessionRemote. The SessionRemote class has a List of Transfer classes (by the way, the List is used for multiple database hosts). The Transfer class has a Socket property, and classes such as DataInputInputStream and DataOutputStream for communication
// Initialize the Transfer class. Here you can see that the username, password, and so on are set up to connect to the database host
private Transfer initTransfer(ConnectionInfo ci, String db, String server)
throws IOException {
Socket socket = NetUtils.createSocket(server,
Constants.DEFAULT_TCP_PORT, ci.isSSL());
Transfer trans = new Transfer(this, socket);
trans.setSSL(ci.isSSL());
trans.init();
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_MIN_SUPPORTED);
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_MAX_SUPPORTED);
trans.writeString(db);
trans.writeString(ci.getOriginalURL());
/ / user name
trans.writeString(ci.getUserName());
/ / password
trans.writeBytes(ci.getUserPasswordHash());
trans.writeBytes(ci.getFilePasswordHash());
String[] keys = ci.getKeys();
trans.writeInt(keys.length);
for (String key : keys) {
trans.writeString(key).writeString(ci.getProperty(key));
}
try {
done(trans);
clientVersion = trans.readInt();
trans.setVersion(clientVersion);
if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_14) {
if(ci.getFileEncryptionKey() ! =null) {
trans.writeBytes(ci.getFileEncryptionKey());
}
}
trans.writeInt(SessionRemote.SESSION_SET_ID);
trans.writeString(sessionId);
done(trans);
if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_15) {
autoCommit = trans.readBoolean();
} else {
autoCommit = true;
}
return trans;
} catch (DbException e) {
trans.close();
throwe; }}Copy the code
The detailed creation process will not be introduced here, after all, this is not the focus of this article, interested readers can download THE SOURCE code of H2 to learn, in addition, the author has seen the company’s own research database Java client driver in work. The process is similar. The Connection class encapsulates Socket information. The sending and receiving messages are then written to and read from the socket (similar to the flow of socket and ServerSocket communication).
DruidAbstractDataSource validateConnection method
After the createPhysicalConnection method has created the connection, Druid verifies that the connection is valid. The dataSource method can use the dataSource. SetValidationQuery () set is used to verify statements. Usually we can set validationQuery = SELECT 1 or something like that.
public void validateConnection(Connection conn) throws SQLException {
// Returns the attribute validationQuery
String query = getValidationQuery();
if (conn.isClosed()) {
throw new SQLException("validateConnection: connection closed");
}
// If validConnectionChecker is not null, validConnectionChecker is used to check whether the connection is valid. Specific for Mysql, PG, Oracle, etc
// validConnectionChecker
if(validConnectionChecker ! =null) {
boolean result;
Exception error = null;
try {
result = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
if (result && onFatalError) {
lock.lock();
try {
if (onFatalError) {
onFatalError = false; }}finally{ lock.unlock(); }}}catch (SQLException ex) {
throw ex;
} catch (Exception ex) {
result = false;
error = ex;
}
if(! result) { SQLException sqlError = error ! =null ? //
new SQLException("validateConnection false", error) //
: new SQLException("validateConnection false");
throw sqlError;
}
return;
}
// If validationQuery is set, the query will be executed. If no result is returned, an exception is thrown
if (null! = query) { Statement stmt =null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {
stmt.setQueryTimeout(getValidationQueryTimeout());
}
rs = stmt.executeQuery(query);
if(! rs.next()) {throw new SQLException("validationQuery didn't return a row");
}
if (onFatalError) {
lock.lock();
try {
if (onFatalError) {
onFatalError = false; }}finally{ lock.unlock(); }}}finally{ JdbcUtils.close(rs); JdbcUtils.close(stmt); }}}Copy the code
Druid returns the Connection encapsulated as PhysicalConnectionInfo
The main attributes of PhysicalConnectionInfo are as follows:
private Connection connection;
private long connectStartNanos;
private long connectedNanos;
private long initedNanos;
private long validatedNanos;
private Map<String, Object> vairiables;
private Map<String, Object> globalVairiables;
Copy the code
DruidConnectionHolder encapsulates DataSource and PhysicalConnectionInfo together
DruidConnectionHolder[] connections Adds the connection to the end
// init connections
while (poolingCount < initialSize) {
try {
// Create a connection. Encapsulate it with DruidConnectionHolder
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder encapsulates DataSource and PhysicalConnectionInfo together
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000); }}}Copy the code
// DruidConnectionHolder constructor.
public DruidConnectionHolder(DruidAbstractDataSource dataSource, PhysicalConnectionInfo pyConnectInfo)
throws SQLException{
this(dataSource,
pyConnectInfo.getPhysicalConnection(),
pyConnectInfo.getConnectNanoSpan(),
pyConnectInfo.getVairiables(),
pyConnectInfo.getGlobalVairiables());
}
Copy the code
Call createAndLogThread (), createAndStartCreatorThread (), createAndStartDestroyThread () method to create threads
// Start the monitoring data recording thread
createAndLogThread();
// Start the connection creation thread
createAndStartCreatorThread();
// Start the connection detection thread
createAndStartDestroyThread();
// The main thread uses CountDownLatch to wait until the CreateConnectionThread and DestroyConnectionThread above are created
initedLatch.await();
Copy the code
CreateAndStartCreatorThread run method of a class
public void run(a) {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (;;) {
// addLast
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
boolean emptyWait = true;
if(createError ! =null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// There must be a thread wait before a connection is created
// poolingCount is an existing connection, notEmptyWaitThreadCount is because the connection is insufficient for the number of threads waiting to create the connection
if (poolingCount >= notEmptyWaitThreadCount //&& (! (keepAlive && activeCount + poolingCount < minIdle)) && ! isFailContinuous() ) { empty.await(); }// Prevent the creation of more connections than maxActive
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue; }}}catch (InterruptedException e) {
lastCreateError = e;
lastErrorTimeMillis = System.currentTimeMillis();
if((! closing) && (! closed)) { LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
}
break;
} finally {
lock.unlock();
}
// The connection can be created when the conditions are met
PhysicalConnectionInfo connection = null;
try {
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally{ lock.unlock(); }}if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break; }}}catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {
continue;
}
boolean result = put(connection);
if(! result) { JdbcUtils.close(connection.getPhysicalConnection()); LOG.info("put physical connection to pool failed.");
}
errorCount = 0; // reset errorCount
if (closing || closed) {
break; }}}Copy the code
DestroyConnectionThread’s run method
public void run(a) {
initedLatch.countDown();
for (;;) {
// Delete from the front
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
destroyTask.run();
} catch (InterruptedException e) {
break; }}}Copy the code
3.2.2 getConnectionDirect ()
After the init() method, you get the connection directly
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
// Get the connection
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && ! isFull()) { notFullTimeoutRetryCnt++;if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
// If testOnBorrow is set to true. ValidConnectionChecker or validationQuery is checked to see if the connection is available when it is fetched.
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if(! validate) {if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue; }}else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder); // Pass null to avoid repeated closing
continue;
}
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if(checkExecuteTime && lastExecTimeMillis ! = lastActiveTimeMillis) { lastActiveTimeMillis = lastExecTimeMillis; }if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if(! validate) {if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue; }}}}if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally{ activeConnectionLock.unlock(); }}if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
returnpoolableConnection; }}Copy the code
The maxWait argument is passed when the **getConnectionInternal(Long maxWait)** method is called to get the connection. Gets the maximum waiting time for a connection from the connection pool, in ms, default -1, which means it will wait forever
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
Copy the code
Here is the pollLast method:
The Condition’s awaitNanos(Long nanosTimeout) method is used to block the thread. If there are no connections in the pool, call empty.Signal (), notify CreateThread to create the connection, and wait a specified amount of time before waking up to see if any are available.
In addition, when the wait time is up, null is returned instead of waiting forever
if (poolingCount == 0) {
if (estimate > 0) {
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
Copy the code
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { long estimate = nanos; for (;;) { if (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } if (estimate <= 0) { waitNanosLocal.set(nanos - estimate); return null; } notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { long startEstimate = estimate; estimate = notEmpty.awaitNanos(estimate); // signal by // recycle or // creator notEmptyWaitCount++; notEmptyWaitNanos += (startEstimate - estimate); if (! enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException ! = null) { throw disableException; } throw new DataSourceDisableException(); } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } finally { notEmptyWaitThreadCount--; } if (poolingCount == 0) { if (estimate > 0) { continue; } waitNanosLocal.set(nanos - estimate); return null; } } decrementPoolingCount(); DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; long waitNanos = nanos - estimate; last.setLastNotEmptyWaitNanos(waitNanos); return last; }}Copy the code
Here is the takeLast() method:
You can see that takeLast calls the notempty.await () method, and it keeps calling.
MaxWait by default does not timeout, meaning that if there are no idle connections in the pool, it will wait forever
DruidConnectionHolder takeLast(a) throws InterruptedException, SQLException {
try {
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
notEmpty.await(); // signal by recycle or creator
} finally {
notEmptyWaitThreadCount--;
}
notEmptyWaitCount++;
if(! enable) { connectErrorCountUpdater.incrementAndGet(this);
if(disableException ! =null) {
throw disableException;
}
throw newDataSourceDisableException(); }}}catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
}
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
Copy the code
TestOnBorrow, testWhileIdle, and removeAbandoned parameters
- If testOnBorrow is true, the connection is verified, and if it fails, the loop is cleaned up and re-entered, otherwise the next step is skipped
- If testWhileIdle to true, distance than timeBetweenEvictionRunsMillis activation time, last time to clean up
- If removeAbandoned is true, the connection is stored in activeConnections and periodically processed by the cleanup thread
CreateThread and the consumer thread that fetches connections to the thread pool form a consumer-producer model
3.2.3Connection.close() Reclaiming the Connection
The close() method of the DruidPooledConnection class is called. The code is as follows:
The recycle method is actually called
public void close(a) throws SQLException {
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
DruidAbstractDataSource dataSource = holder.getDataSource();
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
if(! isSameThread) { dataSource.setAsyncCloseConnectionEnable(true);
}
if (dataSource.isAsyncCloseConnectionEnable()) {
syncClose();
return;
}
if(! CLOSING_UPDATER.compareAndSet(this.0.1)) {
return;
}
try {
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
}
List<Filter> filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
// The recycle method is calledrecycle(); }}finally {
CLOSING_UPDATER.set(this.0);
}
this.disable = true;
}
Copy the code
The recycle() method has omitted some code:
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {...// If the connection is already closed, increment closeCount by 1 and return
if (physicalConnection.isClosed()) {
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return; }...// If testOnReturn is set, testConnectionInternal(DruidConnectionHolder holder, Connection conn) is called to check whether the Connection is valid. Refer to the validateConnection(Connection Conn) method
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
if(! validate) { JdbcUtils.close(physicalConnection); destroyCountUpdater.incrementAndGet(this);
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return; }}...// Finally get the lock and put the connection back into the connections array
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally{ lock.unlock(); }}boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
if (poolingCount >= maxActive || e.discard || this.closed) {
return false;
}
e.lastActiveTimeMillis = lastActiveTimeMillis;
connections[poolingCount] = e;
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = lastActiveTimeMillis;
}
notEmpty.signal();
notEmptySignalCount++;
return true;
}
Copy the code
Ref:
www.cnblogs.com/ZhangZiShen…
Blog.csdn.net/lishoubin_1…
Tech.youzan.com/shu-ju-ku-l…
Zhengjianglong. Cn / 2019/07/14 /…