Release notes
<groupId>io.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>1.2.0</version>
1. Using Seata
Seata AT mode involves three roles:
- TC (transaction coordinator): maintains the running state of global transactions and is responsible for coordinating and driving the commit or rollback of global transactions
- RM (transaction participant): controls branch transactions, is responsible for branch registration and status reporting, and receives instructions from the transaction coordinator to drive the commit and rollback of branch (local) transactions
- TM (transaction initiator): controls the boundaries of a global transaction, is responsible for starting it, and ultimately issues the decision to commit or roll back globally
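The division of labor among the three roles can be sketched with a small in-memory model. This is not Seata's API: `DemoCoordinator`, `DemoBranch`, `RolesDemo`, the state strings, and the fixed xid are all hypothetical names chosen for illustration, and everything that a real deployment does over the network happens here inside one process.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a branch (local) transaction managed by an RM.
class DemoBranch {
    final String resource;
    String state = "REGISTERED";

    DemoBranch(String resource) {
        this.resource = resource;
    }
}

// Hypothetical stand-in for the TC: tracks one global transaction and drives its branches.
class DemoCoordinator {
    private final List<DemoBranch> branches = new ArrayList<>();
    private String globalState = "NONE";

    // Called by the TM to open the global transaction; the xid is a made-up constant.
    String begin() {
        globalState = "BEGIN";
        return "demo-xid-1";
    }

    // Called by an RM to register a branch under the global transaction.
    DemoBranch register(String resource) {
        DemoBranch branch = new DemoBranch(resource);
        branches.add(branch);
        return branch;
    }

    // Called by the TM with the final decision; the TC then instructs every branch.
    String finish(boolean commit) {
        String outcome = commit ? "COMMITTED" : "ROLLED_BACK";
        for (DemoBranch branch : branches) {
            branch.state = outcome;
        }
        globalState = outcome;
        return globalState;
    }
}

class RolesDemo {
    public static void main(String[] args) {
        DemoCoordinator tc = new DemoCoordinator();
        String xid = tc.begin();          // TM starts the global transaction
        tc.register("order-db");          // RMs register their branches
        tc.register("storage-db");
        System.out.println(xid + " -> " + tc.finish(false)); // TM decides to roll back
    }
}
```

The point of the sketch is only the direction of the calls: the TM opens and closes the global transaction, the RMs register branches, and the TC is the one that pushes the final outcome to every branch.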
1.1 Transaction Coordinator – Seata Server
Set up the Seata server.
The server uses three tables:
t_global_transaction: global transaction table
t_branch_transaction: branch transaction table
t_global_lock: global lock table
1.2 Transaction Initiator and Participant – Seata Client
- Add the dependency
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
- The transaction initiator and each transaction participant create a local undo_log table
- The transaction initiator and each transaction participant configure the connection information for seata-server
- Annotate the transaction initiator's method with @GlobalTransactional
1.3 Distributed transaction scenario
@GlobalTransactional
public void order() {
    Order order = new Order();
    order.setUserId(1);
    order.setProductId(1000);
    order.setStatus(1);
    order.setCount(10);
    orderMapper.insert(order);
    // Deduct from the account
    accountFeignClient.decreaseAccount();
    // Reduce the inventory
    storageFeignClient.decreaseStorage();
}
Flowchart of seata AT mode
2. Integrating Seata with Spring Boot
The seata-spring-boot-starter jar registers the SeataAutoConfiguration class in its META-INF/spring.factories file, so Spring Boot imports it automatically.
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    // Holds the ApplicationContext as a singleton
    @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
    @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
    public SpringApplicationContextProvider springApplicationContextProvider() {
        return new SpringApplicationContextProvider();
    }

    // Default handler for begin/commit/rollback failures
    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    // Scanner that creates proxies for beans with @GlobalTransactional or @GlobalLock methods
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

    // Automatically proxies DataSource beans (enabled unless the property is set to false)
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(), seataProperties.getExcludesForAutoProxying());
    }
}
GlobalTransactionScanner implements the afterPropertiesSet method of the InitializingBean interface, which it uses to initialize the RM and TM clients. Communication between the RM, the TM, and the TC is implemented with Netty.
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware, DisposableBean {

    // Lifecycle initialization method
    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        // Initialize the Netty clients for TM and RM
        initClient();
    }

    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        /**
         * TM (Transaction Manager): controls the boundaries of a global transaction and is
         * responsible for starting, committing, and rolling it back.
         * RM (Resource Manager): controls branch transactions, is responsible for branch
         * registration and status reporting, receives instructions from the transaction
         * coordinator, and drives the commit and rollback of branch (local) transactions.
         * TC (Transaction Coordinator): maintains the running state of global transactions and
         * coordinates and drives their commit or rollback.
         */
        // Initialize TM
        TMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        // Initialize RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();
    }
}
GlobalTransactionScanner is also a BeanPostProcessor: it overrides the wrapIfNecessary method of AbstractAutoProxyCreator. AbstractAutoProxyCreator is the core class behind Spring's AOP implementation. Readers interested in how Spring AOP works can refer to the author's article on Spring AOP principles.
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware, DisposableBean {

    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // Whether global transactions are disabled (false by default)
        if (disableGlobalTransaction) {
            // Global transactions are disabled: return the bean unchanged
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                // Already processed: return the bean unchanged
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                // Check whether TCC mode applies (it does not by default)
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    // Get the bean's target class and interfaces
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    // Check whether any method carries @GlobalTransactional or @GlobalLock
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        // No annotation found: return the bean unchanged
                        return bean;
                    }

                    if (interceptor == null) {
                        /**
                         * Create the global transaction interceptor. When a target method runs,
                         * GlobalTransactionalInterceptor.invoke() is what actually gets called.
                         * The interceptor is handed to the parent class through
                         * getAdvicesAndAdvisorsForBean().
                         */
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener) interceptor);
                    }
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                // Check whether the bean is already a Spring AOP proxy
                if (!AopUtils.isAopProxy(bean)) {
                    // Not a proxy yet: run Spring's normal proxying logic. The parent class obtains
                    // the interceptor through getAdvicesAndAdvisorsForBean() and adds it to the chain.
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    /**
                     * Already a Spring proxy: obtain the proxy's existing advisor collection by
                     * reflection and add the interceptor returned by
                     * getAdvicesAndAdvisorsForBean() to it.
                     */
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

    private boolean existsAnnotation(Class<?>[] classes) {
        if (CollectionUtils.isNotEmpty(classes)) {
            for (Class<?> clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }
                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
        return new Object[]{interceptor};
    }
}
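The scan-then-wrap idea can be tried out without Spring. The following is a minimal sketch that uses a JDK dynamic proxy instead of Spring's AbstractAutoProxyCreator; `DemoGlobalTx`, `DemoOrderService`, `DemoOrderServiceImpl`, and `DemoScanner` are hypothetical names, and the "begin"/"commit" log entries merely mark where an interceptor would act.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation standing in for @GlobalTransactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTx {
}

interface DemoOrderService {
    @DemoGlobalTx
    String placeOrder();

    String ping();
}

class DemoOrderServiceImpl implements DemoOrderService {
    public String placeOrder() {
        return "order placed";
    }

    public String ping() {
        return "pong";
    }
}

class DemoScanner {
    static final List<String> LOG = new ArrayList<>();

    // Same idea as existsAnnotation: does any public method carry the marker?
    static boolean existsAnnotation(Class<?> type) {
        for (Method method : type.getMethods()) {
            if (method.isAnnotationPresent(DemoGlobalTx.class)) {
                return true;
            }
        }
        return false;
    }

    // Same idea as wrapIfNecessary: proxy the bean only when its interface is annotated.
    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T bean, Class<T> iface) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.isAnnotationPresent(DemoGlobalTx.class)) {
                LOG.add("begin");
                Object result = method.invoke(bean, args);
                LOG.add("commit");
                return result;
            }
            // Methods without the marker are passed straight through.
            return method.invoke(bean, args);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        DemoOrderService service = wrapIfNecessary(new DemoOrderServiceImpl(), DemoOrderService.class);
        System.out.println(service.placeOrder() + " " + LOG);
    }
}
```

Only annotated methods pay the interception cost in this sketch; unannotated methods on the same proxy are forwarded untouched, which matches the three-way branch in the interceptor's invoke method discussed in the next section.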
3. Target method execution
After Spring has proxied the bean, calls to a target method (one annotated with @GlobalTransactional or @GlobalLock) are intercepted by GlobalTransactionalInterceptor#invoke.
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null
            ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        // Read the annotation metadata from the method
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (!disable && globalTransactionalAnnotation != null) {
            // Handle @GlobalTransactional
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (!disable && globalLockAnnotation != null) {
            // Handle @GlobalLock
            return handleGlobalLock(methodInvocation);
        } else {
            // Call the target method directly
            return methodInvocation.proceed();
        }
    }

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            // TransactionalTemplate#execute
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    // Invoke the original method
                    return methodInvocation.proceed();
                }

                public String name() {
                    // Use the name given in the annotation, if any
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    // Otherwise generate one from the method signature
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        }
    }
}
GlobalTransactionalInterceptor#invoke handles each case separately. Here we focus on the logic for methods annotated with @GlobalTransactional (the @GlobalLock case is simpler and is covered later).
Next, look at the TransactionalTemplate#execute method.
public class TransactionalTemplate {

    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get the TransactionInfo object that wraps the annotation attributes
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get the current global transaction or create one (returns a DefaultGlobalTransaction).
        // This decides whether the caller is the initiator or a participant of the global transaction.
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.2 Handle the transaction propagation and the branch type
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                case SUPPORTS:
                    if (!existingTransaction()) {
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existingTransaction()) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s",
                                RootContext.getXID()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    if (!existingTransaction()) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            try {
                // 2. Begin the transaction: request an xid from the Seata server
                //    and bind it to the current thread
                beginTransaction(txInfo, tx);

                Object rs = null;
                try {
                    // Do your business: run the target method.
                    // SQL execution ends up in PreparedStatementProxy.execute()
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. Roll back if the business exception requires it
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. Everything is fine: commit the transaction
                commitTransaction(tx);

                return rs;
            } finally {
                // 5. Clean up and run the completion hooks
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
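Stripped of propagation handling and hooks, the control flow of execute is begin, run the business logic, roll back on failure or commit on success, then clean up. A minimal sketch of that skeleton follows; `DemoTemplate` and the event names are hypothetical, and the sketch only records the order of the steps rather than talking to any transaction coordinator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, heavily simplified stand-in for the template's control flow.
class DemoTemplate {
    final List<String> events = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        try {
            events.add("begin");
            T result;
            try {
                result = business.get();
            } catch (RuntimeException ex) {
                // The business logic failed: roll back, then rethrow to the caller.
                events.add("rollback");
                throw ex;
            }
            events.add("commit");
            return result;
        } finally {
            // Runs on both the success path and the failure path.
            events.add("cleanup");
        }
    }

    public static void main(String[] args) {
        DemoTemplate ok = new DemoTemplate();
        ok.execute(() -> "done");
        System.out.println(ok.events);

        DemoTemplate failing = new DemoTemplate();
        try {
            failing.execute(() -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            System.out.println(failing.events);
        }
    }
}
```

Note that the commit sits outside the inner try block, so a failure during commit is not mistaken for a business exception and does not trigger the rollback branch; the original method is structured the same way.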
3.1 Determine the initiator and participant of distributed transactions
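Whether the caller is the initiator or a participant of the distributed transaction is decided by whether a global transaction ID (xid) is already bound to the current thread (it is stored in a ThreadLocal). If no xid is present, the caller becomes the initiator; otherwise it joins the existing transaction as a participant.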
public class GlobalTransactionContext {

    public static GlobalTransaction getCurrentOrCreate() {
        // Get the current global transaction
        GlobalTransaction tx = getCurrent();
        // None exists: create a new one (the caller is the initiator)
        if (tx == null) {
            return createNew();
        }
        return tx;
    }

    private static GlobalTransaction getCurrent() {
        // Get the global transaction xid bound to the current thread
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        // Otherwise join the existing transaction as a participant
        return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }

    private static GlobalTransaction createNew() {
        return new DefaultGlobalTransaction();
    }
}

public class RootContext {
    public static final String KEY_XID = "TX_XID";
    public static final String KEY_XID_INTERCEPTOR_TYPE = "tx-xid-interceptor-type";
    private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();

    public static String getXID() {
        String xid = CONTEXT_HOLDER.get(KEY_XID);
        if (StringUtils.isNotBlank(xid)) {
            return xid;
        }
        String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
        if (StringUtils.isNotBlank(xidType) && xidType.contains("_")) {
            return xidType.split("_")[0];
        }
        return null;
    }
}
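The role decision above boils down to one ThreadLocal lookup. A minimal sketch, assuming a plain ThreadLocal<String> in place of Seata's ContextCore; `DemoRootContext`, `DemoRole`, `DemoTransactionContext`, and the xid value are hypothetical:

```java
// Hypothetical thread-bound holder for the global transaction id.
class DemoRootContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) {
        XID.set(xid);
    }

    static String getXID() {
        return XID.get();
    }

    static void unbind() {
        XID.remove();
    }
}

enum DemoRole { LAUNCHER, PARTICIPANT }

class DemoTransactionContext {
    // No xid on this thread: the caller starts the global transaction.
    // An xid is already bound: the caller joins it as a participant.
    static DemoRole currentRole() {
        return DemoRootContext.getXID() == null ? DemoRole.LAUNCHER : DemoRole.PARTICIPANT;
    }

    public static void main(String[] args) {
        System.out.println(currentRole());      // nothing bound yet
        DemoRootContext.bind("demo-xid-42");    // e.g. an xid received from an upstream caller
        System.out.println(currentRole());
        DemoRootContext.unbind();
    }
}
```

Because the xid lives in a ThreadLocal, the decision is per thread: work handed to another thread does not see the binding unless the xid is passed along and bound there explicitly.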
3.2 Handling the propagation behavior of Spring transactions
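Each propagation behavior is handled according to its semantics in the switch statement of TransactionalTemplate#execute: NOT_SUPPORTED suspends the current transaction and runs the business method without one; REQUIRES_NEW suspends the current transaction and goes on to begin a new one; SUPPORTS runs without a transaction if none exists and otherwise continues inside the existing one; REQUIRED continues without any special handling; NEVER throws if a transaction exists and otherwise runs without one; MANDATORY throws if no transaction exists.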
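The switch in execute can be read as a decision table over two inputs: the propagation setting and whether a global transaction already exists. A minimal sketch of that table follows; `DemoPropagation`, `DemoAction`, and `DemoPropagationRules` are hypothetical names, and the three actions are a simplification (suspension of the current transaction is folded into the result rather than modeled).

```java
enum DemoPropagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

enum DemoAction { RUN_WITHOUT_TX, BEGIN_NEW, JOIN_EXISTING }

class DemoPropagationRules {
    // Maps (propagation, existing transaction?) to what the template ends up doing.
    static DemoAction decide(DemoPropagation propagation, boolean existing) {
        switch (propagation) {
            case NOT_SUPPORTED:
                // Any current transaction is suspended; the business method runs outside it.
                return DemoAction.RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                // Any current transaction is suspended; a fresh one is started.
                return DemoAction.BEGIN_NEW;
            case SUPPORTS:
                return existing ? DemoAction.JOIN_EXISTING : DemoAction.RUN_WITHOUT_TX;
            case REQUIRED:
                return existing ? DemoAction.JOIN_EXISTING : DemoAction.BEGIN_NEW;
            case NEVER:
                if (existing) {
                    throw new IllegalStateException("existing transaction found for propagation 'never'");
                }
                return DemoAction.RUN_WITHOUT_TX;
            case MANDATORY:
                if (!existing) {
                    throw new IllegalStateException("no existing transaction found for propagation 'mandatory'");
                }
                return DemoAction.JOIN_EXISTING;
            default:
                throw new IllegalArgumentException("unsupported propagation: " + propagation);
        }
    }

    public static void main(String[] args) {
        for (DemoPropagation p : new DemoPropagation[]{DemoPropagation.REQUIRED, DemoPropagation.SUPPORTS}) {
            System.out.println(p + " without existing tx -> " + decide(p, false));
            System.out.println(p + " with existing tx    -> " + decide(p, true));
        }
    }
}
```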
3.3 Distributed Transactions
3.3.1 Enabling distributed Transactions
public class TransactionalTemplate {

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // Run the before-begin hooks
            triggerBeforeBegin();
            // DefaultGlobalTransaction.begin() is what really starts the transaction;
            // underneath, it talks to the Seata server to create a global transaction
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // Run the after-begin hooks
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
    }
}
DefaultGlobalTransaction#begin then asks seata-server to start the distributed transaction:
public class DefaultGlobalTransaction implements GlobalTransaction {

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        // The method annotated with @GlobalTransactional that starts the transaction has the Launcher role
        if (role != GlobalTransactionRole.Launcher) {
            // A participant of the global transaction: nothing to begin
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        // Request a global transaction xid from the Seata server
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        // Bind the xid to the current thread
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
}
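The asymmetry between the two roles in begin can be reproduced in a few lines. In this sketch the Seata server is replaced by a local counter, so the xid format is made up; `BeginDemo`, `Role`, `Tx`, `boundXid`, and `unbind` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

class BeginDemo {
    // Hypothetical stand-in for the server that would hand out xids.
    private static final AtomicLong NEXT_ID = new AtomicLong(1);
    private static final ThreadLocal<String> BOUND_XID = new ThreadLocal<>();

    enum Role { LAUNCHER, PARTICIPANT }

    static final class Tx {
        final Role role;
        String xid;

        Tx(Role role, String xid) {
            this.role = role;
            this.xid = xid;
        }

        void begin() {
            if (role != Role.LAUNCHER) {
                // A participant must already carry an xid and never opens a transaction itself.
                if (xid == null) {
                    throw new IllegalStateException("participant without xid");
                }
                return;
            }
            // A launcher must start from a clean state.
            if (xid != null || BOUND_XID.get() != null) {
                throw new IllegalStateException("global transaction already begun");
            }
            xid = "demo-xid-" + NEXT_ID.getAndIncrement();
            BOUND_XID.set(xid); // bind the xid to the current thread
        }
    }

    static String boundXid() {
        return BOUND_XID.get();
    }

    static void unbind() {
        BOUND_XID.remove();
    }

    public static void main(String[] args) {
        Tx launcher = new Tx(Role.LAUNCHER, null);
        launcher.begin();
        Tx participant = new Tx(Role.PARTICIPANT, launcher.xid);
        participant.begin(); // no-op: the participant just rides along
        System.out.println(launcher.xid.equals(participant.xid));
        unbind();
    }
}
```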
At this point the distributed transaction has been started; the next step is to execute the target method.