preface
Recently when I looked at SpringBoot source code, I suddenly remembered the previous transaction mechanism and its transmission, so I seriously looked at the relevant source code, especially to share with you, and manual implementation of a simple version, the main sightseeing process is as follows.
The basic idea
The mechanism for SpringBoot transaction propagation is the binding of connections to transaction managers and the need to suspend, or unbind, previous transactions as new transactions occur. A new ConnectionHolder is created for REQUIRES_NEW. The ConnectionHolder is an owner of the current Connection and is used to retrieve the current Connection.
The source code parsing
First let’s take a look at the interceptor entry
public Object invoke(MethodInvocation invocation) throws Throwable {
// Load the target classClass<? > targetClass = invocation.getThis() ! =null ? AopUtils.getTargetClass(invocation.getThis()) : null;
// Load the method of the target class
Method var10001 = invocation.getMethod();
invocation.getClass();
// Do some AOP processing of transactions (core)
return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
}
Copy the code
Then we go to the invokeWithinTransaction method, where we don’t go if (txAttr! = null && tm instanceof CallbackPreferringPlatformTransactionManager) {this condition, Because our transaction management is not CallbackPreferringPlatformTransactionManager, so we entered the else
If a transaction is created, return the new transaction, otherwise the old transaction
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
try {
// Intercepts the target method
result = invocation.proceedWithInvocation();
} catch (Throwable var17) {
// Handle exceptions after they occur, such as rollback
this.completeTransactionAfterThrowing(txInfo, var17);
throw var17;
} finally {
// Clear the information about the current transaction and add the previous transaction
this.cleanupTransactionInfo(txInfo);
}
// Commit the current transaction
this.commitTransactionAfterReturning(txInfo);
return result;
Copy the code
Here we see createTransactionIfNecessary mainly
if(txAttr ! =null && ((TransactionAttribute)txAttr).getName() == null) {
// If the current transaction information is empty, the default is given
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName(a) {
returnjoinpointIdentification; }}; } TransactionStatus status =null;
if(txAttr ! =null) {
if(tm ! =null) {
// Get the transaction object
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); }}return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
Copy the code
Let’s take a look at the getTransaction methods, since they are usually REQUIRED and REQUIRED_NEW, and the others are similar.
// Determine if there is a transaction before, and if there is some processing
if (this.isExistingTransaction(transaction)) {
return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);
Copy the code
REQUIRES_NEW = REQUIRES_NEW = REQUIRES_NEW
if (definition.getPropagationBehavior() == 3) {
if (debugEnabled) {
this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
}
// Suspend the current transaction
suspendedResources = this.suspend(transaction);
try {
newSynchronization = this.getTransactionSynchronization() ! =2;
// Create a new transaction state
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// Start transaction (core)
this.doBegin(transaction, definition);
// Do some property Settings and synchronization Settings for transaction management
this.prepareSynchronization(status, definition);
return status;
} catch (Error | RuntimeException var7) {
// Resume pending transactions
this.resumeAfterBeginException(transaction, suspendedResources, var7);
throw var7;
}
Copy the code
Let’s look at how the transaction pending, in to suspend method, click enter into doSuspend again, you can see DataSourceTransactionManager
protected Object doSuspend(Object transaction) {
// Get the transaction object of the current thread
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
// Clear the connection of the current object because the transaction is suspended and the next transaction is dropped
txObject.setConnectionHolder((ConnectionHolder)null);
// Unbind the previous transaction
return TransactionSynchronizationManager.unbindResource(this.obtainDataSource());
}
Copy the code
The previous transaction is suspended, and the new transaction, connection and binding process proceed.
// Check whether the current thread already has a connection, because this is empty, that is, unbind there, here again verify
if(! txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {// Get a new connection
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// Bind the new connection to the transaction object of the current thread
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// Get the connection Holder
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// Sets whether the current connection is readable
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Turn off auto commit
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// Set the transaction to read-only
this.prepareTransactionalConnection(con, definition);
// Set transaction to take effect
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if(timeout ! = -1) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// If the connection is new, it will be bound, because if it is not new, it is already bound, and an error will be reported if it is bound again
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
Copy the code
The main reason why the REQUIRE_NEW rollback does not affect the parent method is because they are two different connections. In general, the whole idea is that if a new transaction needs to be created, the REQUIRE_NEW state, the new connection should bind the current transaction management. In this case, the child method is in the REQUIRE_NEW state. If an exception occurs and we roll back the parent method, it will not affect the parent method because we are binding the connection of the current child method.
Write one by hand
Connect related operation tool classes
/ * * *@author chujun
* @version 1.0
* @dateThe 2021-03-06 "* /
@Component
public class ConnectionHolderUtil {
private static DataSource dataSource;
private static final Logger log = LoggerFactory.getLogger(ConnectionHolderUtil.class);
@Autowired
public void setDataSource(DataSource dataSource) {
ConnectionHolderUtil.dataSource = dataSource;
}
private static ThreadLocal<ConnectionHolder> connectionHolderThreadLocal = new ThreadLocal<>();
/** * get database connection *@return Connection
*/
public static ConnectionHolder getConnectionHolder(boolean isNew){
ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
// There is no need to generate a new direct return if there is a connection
if(connectionHolder ! =null && !isNew){
return connectionHolder;
}
try {
// Get a new connection
Connection connection = dataSource.getConnection();
// Turn off auto commit
connection.setAutoCommit(false);
connectionHolder = new ConnectionHolder(connection);
connectionHolderThreadLocal.set(connectionHolder);
// Bind the connection
TransactionSynchronizationManager.bindResource(dataSource,connectionHolder);
return connectionHolder;
} catch (SQLException e) {
log.error("Database connection acquisition failed",e);
return null; }}/** * Commit transaction */
public static void commit(a){
ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
if(connectionHolder == null) {return;
}
try {
connectionHolder.getConnection().commit();
} catch (SQLException e) {
log.error("Submission failed",e); }}/** * transaction rollback */
public static void rollback(a){
ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
if(connectionHolder == null) {return;
}
try {
connectionHolder.getConnection().rollback();
} catch (SQLException e) {
log.error("Rollback failed",e); }}/** * close the connection */
public static void close(a){
ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
if(connectionHolder == null) {return;
}
Connection connection = connectionHolder.getConnection();
try {
connection.close();
} catch (SQLException e) {
log.error("Failed to close database connection",e); }}/** * Resuming pending transactions */
public static void resume(Object susPend){
TransactionSynchronizationManager.unbindResource(dataSource);
TransactionSynchronizationManager.bindResource(dataSource,susPend);
connectionHolderThreadLocal.set((ConnectionHolder) susPend);
}
/** * suspend the current transaction */
public static Object hangTrasaction(a){
return TransactionSynchronizationManager.unbindResource(dataSource);
}
/** * Check whether the current connection is closed *@return* /
public static boolean isClose(a){
if(connectionHolderThreadLocal.get() == null) {return true;
}
try {
return connectionHolderThreadLocal.get().getConnection().isClosed();
} catch (SQLException e) {
log.error("Failed to get connection status");
}
return true; }}Copy the code
AOP aspect interception
/ * * *@author chujun
* @version 1.0
* @dateThe 2021-02-28 protest * /
@Aspect
@Component
public class TransactionAspect{
private static final Logger log = LoggerFactory.getLogger(TransactionAspect.class);
@Pointcut("@annotation(com.cj.annotion.Transactional)")
public void point(a){}
@Around("point()")
public Object transactionHandle(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Transactional transactional = methodSignature.getMethod().getAnnotation(Transactional.class);
// Method execution result
Object result = null;
// The pending transaction
Object supSend = null;
boolean isNew = false;
// If the transaction type is require require_new
if(transactional.propagation() == Propagation.REQUIRED || transactional.propagation() == Propagation.REQUIRES_NEW)
{
// If the transaction type is require
if(transactional.propagation() == Propagation.REQUIRED){
// If there is a transaction, use the original transaction
ConnectionHolderUtil.getConnectionHolder(false);
}
// If the transaction type is require_new
else {
// Suspend the current transaction
// Create a new connection transaction and bind to the transaction synchronization manager
// Bind a new connection
supSend = ConnectionHolderUtil.hangTrasaction();
// Get a new connection
isNew = true;
ConnectionHolderUtil.getConnectionHolder(true);
}
try {
// Execute the intercepting method
result = joinPoint.proceed();
// Commit the transaction
if(!ConnectionHolderUtil.isClose()) {
ConnectionHolderUtil.commit();
}
}catch (Exception e){
// Rollback the transaction
if(!ConnectionHolderUtil.isClose()) {
ConnectionHolderUtil.rollback();
}
}finally {
// Close the connection
if(isNew) {
ConnectionHolderUtil.close();
}
// Resume pending transactions
if(supSend ! =null) { ConnectionHolderUtil.resume(supSend); }}}/ / otherwise
else {
log.info("Transaction type not currently supported");
}
log.info("Section execution completed");
returnresult; }}Copy the code
Transaction annotations
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Transactional {
Propagation propagation(a) default Propagation.REQUIRED;
Class <? extends Throwable>[] rollBackFor() default {};
}
Copy the code
mode of transmission
public enum Propagation {
REQUIRED(0),
SUPPORTS(1),
MANDATORY(2),
REQUIRES_NEW(3),
NOT_SUPPORTED(4),
NEVER(5),
NESTED(6);
private final int value;
private Propagation(int value) {
this.value = value;
}
public int value(a) {
return this.value; }}Copy the code
Call the client
@Service
public class BlogService {
@Autowired
private BlogDao blogDao;
@Transactional(propagation = Propagation.REQUIRED)
public void testMyTransaction(Blog blog){
blogDao.insertBlog(blog);
BlogService blogService = (BlogService) AopContext.currentProxy();
try {
blogService.testMyTransaction2(blog);
}catch(Exception e){ e.printStackTrace(); }}@Transactional(propagation = Propagation.REQUIRES_NEW)
public void testMyTransaction2(Blog blog){
blogDao.insertBlog(blog);
int a = 1/0; }}Copy the code
As shown above, if testMyTransaction2 fails at this point, testMyTransaction will not be affected because they are different transactions.
conclusion
Spring transaction management is also used, I have not been in touch with this for a long time, but I can only use it before, after these days of interview, I found that learning things must be in-depth to understand, can only use can not. ConnectionHolder: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal Last but not least, thank you for browsing and feel free to point out any mistakes.