This is the 14th day of my participation in the More text Challenge. For more details, see more text Challenge

>>>> 😜😜😜 Making: 👉 github.com/black-ant

A. The preface

Starting and configuring Seata was introduced earlier. This article takes a look at the process of starting the Seata Client:

Inventory Seata: Seata Server startup process

Seata: Seata Server configuration process

Seata: Client configuration process

Seata transaction initiation can actually be divided into two stages, the first stage is the Client initiates the Request, the second stage is the Server side to process the Request, and the creation of TX object, etc. The Seata Client initiation process was first reviewed in this article. Here’s a closer look:

1.1 Main process of Seata AT mode

There are three roles in Seata:

// Transaction Coordinator (TC) - Transaction CoordinatorMaintains the state of global and branch transactions and drives global transaction commit or rollback.// TM (Transaction Manager) - Transaction ManagerDefine the scope of a global transaction: start, commit, or roll back a global transaction.// RM (Resource Manager) - The Resource ManagerManages resources for branch transaction processing, talks to TCS to register branch transactions and report branch transaction status, and drives branch transaction commit or rollback.Copy the code

1.2 Main process comb

No nonsense, the flow chart is here, can understand, you can not read the following (PS: not very detailed, after a complete supplement) :

The following process is summarized by referring to official documents. The whole process is divided into two stages:

Update product set name = ‘GTS’ where name = ‘TXC’;

A phase

  1. Select * from SQL where name = ‘TXC’; select * from SQL where name = ‘TXC’;
  2. Mirroring before query: Generates a query statement based on the obtained condition information to locate the data.
  3. Execute business SQL: update the name of this record to ‘GTS’.
  4. Mirror after query: Locate data using the primary key based on the results of the previous mirror.
  5. Insert rollback log: A rollback log record is composed of the mirror data before and after and the information related to the service SQL, and inserted into the UNDO_LOG table.
  6. Apply for a global lock on a record in the Product table whose primary key is equal to 1.
  7. Local transaction commit: Updates to the business data are committed together with the UNDO LOG generated in the previous step.
  8. The result of local transaction submission is reported to the TC.

Phase two: Submission

  1. After receiving a branch submission request from a TC, put the request into an asynchronous task queue and return a successful submission result to the TC immediately.
  2. Branch commit requests in the asynchronous task phase will asynchronously and in batches delete the corresponding UNDO LOG records.

1.3 introduce GlobalLock

Declaration transactions are only executed in a single local RM, but transactions need to ensure that the record to be updated (or choose to update) is not in the middle of a global transaction

public @interface GlobalLock {
    /** * Custom global lock retry interval (in ms) */
    int lockRetryInternal(a) default 0;

    /** * Custom global lock retry times */
    int lockRetryTimes(a) default- 1;
}

Copy the code

1.4 introduce GlobalTransactional

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.TYPE})
@Inherited
public @interface GlobalTransactional {

    /** * Global transaction timeout mills(milliseconds) */
    int timeoutMills(a) default DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;

    /** * Global transaction instance given name */
    String name(a) default "";

    /** * roll back to the specified class */
    Class<? extends Throwable>[] rollbackFor() default {};

    /** * Name of the Class to be rolled back */
    String[] rollbackForClassName() default {};

    /** * Not to roll back the specified class */
    Class<? extends Throwable>[] noRollbackFor() default {};

    /** * Specifies the name of the class not to be rolled back */
    String[] noRollbackForClassName() default {};

    /** * Propagation of global transactions */
    Propagation propagation(a) default Propagation.REQUIRED;

    /** * Custom global lock retry interval (in ms) */
    int lockRetryInternal(a) default 0;

    /** * Custom global lock retry time */
    int lockRetryTimes(a) default- 1;
}
Copy the code

2. Initiation of Client transactions

Transaction initiation is initiated by TM, and transaction branch is completed by RM. Look at the main process:

The whole process will only talk about the first two steps this time:

  • Interceptor intercepts an operation
  • TransactionalTemplate handles the overall flow of a transaction

Take a look at the class flow:

// TM process 1: annotation processing phase, this phase annotation method, will create a new transactionGlobalTransactionalInterceptor # invoke: interceptors to intercept method GlobalTransactionalInterceptor # handleGlobalTransaction agent: TransactionalTemplate # beginTransaction: Open transaction main process DefaultGlobalTransaction # begin: call API process DefaultTransactionManager # begin: call API actual process// App A RM process 1: This method creates the Statement class for the ProxyPreparedStatementProxy() PreparedStatementHandler # instantiateStatement// App A RM process 2: agent execution scheme
ExecuteTemplate # execute 
BaseTransactionalExecutor # execute(Object... args)
AbstractDMLBaseExecutor # doExecute(Object... args)
AbstractDMLBaseExecutor # executeAutoCommitTrue(Object[] args)
AbstractDMLBaseExecutor # executeAutoCommitFalse(Object[] args)
ConnectionProxy # doCommit()
ConnectionProxy # processGlobalTransactionCommit()

// App A RM process 3: call the actual Connect executionDruidPooledConnection # commit() : DruidPooledConnection? - Commit will be committedCopy the code

2.1 Entry for annotations

// How to scan annotations:
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example")

// Step 1: Core GlobalTransactionScanner

// Take a look at the inheritance system of this classC01 - GlobalTransactionScanner E - AbstractAutoProxyCreator: Aop agent - > PS: C01_001 I - ConfigurationChangeListener: -> PS:C01_002 i-initializingBean: means to initialize running i-ApplicationContextaware: The ApplicationContext runtime is notified i-DisposableBean: Will be processed when the Bean is destroyedCopy the code

PS: C01_001 AbstractAutoProxyCreator role

In short, this class is proxied. An AOP proxy is used to wrap the BeanPostProcessor implementation of each qualified bean, delegating to a specified interceptor before calling the bean itself.

GlobalTransactionScanner details

For details, see Client configuration >>>> Client configuration process

2.2 Annotation interception

Usually methods are annotated to implement some functionality, and the ultimate principle is the Proxy, whether created by Java’s native Proxy class or implemented by Aop, the core is the same.

Step 1: Proxy

The proxy used here is CglibAopProxy

Step 2: The process of the invocation

The object of the first call for GlobalTransactionalInterceptor, represented by the invoke method to complete the relevant operation

PS: I originally thought that the framework would scan all methods marked @GlobalTransactional first and manage them through the relevant Manager, but looking at the code in this area alone, it is directly obtained after the broker

GlobalTransactionalInterceptor # invoke main process

// Take a look at the main flow of the Invoke method
C50- GlobalTransactionalInterceptor
    I- MethodInterceptor
    M50_01- invoke(finalMethodInvocation (MethodInvocation) - parse the target using the MethodInvocationclass- byClassUtilsAnalytical targetMethod- byfindBridgedMethodFind the original method ->PS:M50_01_01- Get the original methodGlobalTransactionalAnnotations - Get the original methodGlobalLockComments - IfGlobalTransactionalIf the annotation exists, executeglobalTransactionalAnnotation- ifGlobalLockThere is,GlobalTransactionalIf no, executehandleGlobalLock? - Notice, this is prioritizedglobalTransactionalAnnotation

Copy the code

GlobalTransactionalInterceptor other main method

            
// PS: Here's a look at some other methodsM - initDefaultGlobalTransactionTimeout M - supporting Configuration to load a Configuration changepublic void onChangeEvent(ConfigurationChangeEvent event) {
        if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
            disable = Boolean.parseBoolean(event.getNewValue().trim());
        } else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
            degradeCheck = Boolean.parseBoolean(event.getNewValue());
            if(! degradeCheck) { degradeNum =0;
            }
        }
    }



M- handleGlobalLock
M- handleGlobalTransaction
    
// Notice that the transactionalTemplate is called to complete the main logic in this method
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final GlobalTransactional globalTrxAnno) throws Throwable {
        boolean succeed = true;
        try {
            // Core, calling Template while building a TransactionalExecutor passed in
            // TransactionalExecutor is an executor object
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute(a) throws Throwable {
                    // Here is the corresponding method proxy, used to execute the concrete object -> PRO22001
                    return methodInvocation.proceed();
                }

                public String name(a) {
                    String name = globalTrxAnno.name();
                    if(! StringUtils.isNullOrEmpty(name)) {return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                // This method is used to invoke transaction information in Tempalte
                @Override
                public TransactionInfo getTransactionInfo(a) {
                    // reset the value of timeout
                    int timeout = globalTrxAnno.timeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }
                    
                    // The current transaction information is built
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                    transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    
                    // Build methods that fall back and methods that do not fall back
                    for(Class<? > rbRule : globalTrxAnno.rollbackFor()) { rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for(Class<? > rbRule : globalTrxAnno.noRollbackFor()) { rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    returntransactionInfo; }}); }catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            // Call different failureHandler methods with the Failure type
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code)); }}finally {
            if (degradeCheck) {
                EVENT_BUS.post(newDegradeCheckEvent(succeed)); }}}Copy the code

[PRO22001] : methodInvocation Details

[PRO22002] : TransactionInfo Details

Conclusion:

From here you can see, in GlobalTransactionalInterceptor blocker, first call the template method, at the same time the executor of the method by which needs to perform incoming, and method through invoke agent call

In addition, exceptions are handled in the interceptor, not the Template type of Template

3. Logical processing of Client transactions

Client transactions are handled by the TransactionalTemplate template method. Take a look at the TransactionalTemplate function

In the previous step, when the interceptor completes its interception, it comes to the Template processing section, where the core class is: TransactionalTemplate, this step is the template method design pattern, in which TransactionalExecutor is called to execute the final logic

C51- TransactionalTemplate
	M51_01- execute 
        
// The comment is already clear, like this source code, clear, literal translation to know the process
public Object execute(TransactionalExecutor business) throws Throwable {
     // 1. Obtain transactionInfo
     TransactionInfo txInfo = business.getTransactionInfo();
     if (txInfo == null) {
           throw new ShouldNeverHappenException("transactionInfo does not exist");
     }
     / / 1.1 for the current transaction, if not empty, tx role for 'GlobalTransactionRole. The Participant'.
     GlobalTransaction tx = GlobalTransactionContext.getCurrent();

     // 1.2 Transaction propagation processing -> PS:M51_01_01
     Propagation propagation = txInfo.getPropagation();
     SuspendedResourcesHolder suspendedResourcesHolder = null;
     try {
           switch (propagation) {
                case NOT_SUPPORTED:
                     // If the transaction exists, suspend it.
                     if (existingTransaction(tx)) {
                           suspendedResourcesHolder = tx.suspend();
                     }
                     // Execute without transaction and return.
                     return business.execute();
                case REQUIRES_NEW:
                     // If the transaction exists, pause it and start a new transaction.
                     if (existingTransaction(tx)) {
                           suspendedResourcesHolder = tx.suspend();
                           tx = GlobalTransactionContext.createNew();
                     }
                     // Continue and execute with the new transaction
                     break;
                case SUPPORTS:
                     // If the transaction does not exist, the transaction is not executed.
                     if (notExistingTransaction(tx)) {
                           return business.execute();
                     }
                     // Continue and execute with the new transaction
                     break;
                case REQUIRED:
                     // If the current transaction exists, execute with the current transaction, otherwise continue and execute with the new transaction
                     break;
                case NEVER:
                     // Throw an exception if the transaction exists.
                     if (existingTransaction(tx)) {
                           throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                           , tx.getXid()));
                     } else {
                           // Execute without transaction and return.
                           return business.execute();
                     }
                case MANDATORY:
                     // If the transaction does not exist, throw an exception.
                     if (notExistingTransaction(tx)) {
                           throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                     }
                     // Continue and execute the current transaction.
                     break;
                default:
                     throw new TransactionException("Not Supported Propagation:" + propagation);
           }

           / / 1.3 if the current GlobalTransaction is null, use roles' GlobalTransactionRole. The Launcher 'create a new Transaction.
           if (tx == null) {
                tx = GlobalTransactionContext.createNew();
           }

           // Set the current tx configuration to holder
           GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

           try {
                / / 2. If the current tx (GlobalTransaction) role as' GlobalTransactionRole. The Launcher ', send the beginTransaction request to TC,
                // Otherwise do nothing. However, hooks will always be triggered.
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                     // Perform business
                     rs = business.execute();
                } catch (Throwable ex) {
                     // 3. The services required for rollback are abnormal.
                     completeTransactionAfterThrowing(txInfo, tx, ex);
                     throw ex;
                }

                // 4. When everything is normal, commit the transaction.
                commitTransaction(tx);

                return rs;
           } finally {
                //5. Clear the eventresumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); }}finally {
           // If the transaction is suspended, resume it.
           if(suspendedResourcesHolder ! =null) { tx.resume(suspendedResourcesHolder); }}}// PS:M51_01_01 what are the Propagation objects?

        
Copy the code

As you can see, the main flow of transaction processing has been completed in this

In conclusion, when an exception occurs after the execution of business.execute(), it will be processed in execute, while in Template, the main operation is the management and rollback of transactions, which is also the core of the design pattern using the Template approach, and only involves the unified logic

conclusion

In this article, we look at the Seata logic initiation process. We can see that a transaction is initiated by the following steps:

  • beginTransaction(txInfo, tx)
  • business.execute()
  • completeTransactionAfterThrowing(txInfo, tx, ex)
  • commitTransaction(tx)

In the next post we will see what these steps do >>> 👉

Appendix. Supplementary points

For read and write quarantine, refer to the official document 👉 Seata Quarantine Policy

4.1 Read quarantine details

The default global isolation level FOR Seata (AT mode) is Read Uncommitted, which is implemented through SELECT FOR UPDATE

Details of the process

  • The execution of the SELECT FOR UPDATE statement requests a global lock
  • If the global lock is held by another transaction, release the local lock (roll back the local execution of the SELECT FOR UPDATE statement) and retry (meaning waiting FOR the ongoing global commit to complete)

During this process, the query is blocked until the global lock is obtained, i.e. the data read is committed

PS: There is a question here, if the insert is too many cases, will cause the global lock has been occupied or occupied frequently, so here is still to do read and write separation

4.2 Write Quarantine Details

Write isolation is the key to ensuring transaction uniformity, which involves global and local locking features. To ensure accuracy, use the following modes:

1,2 represents 2 transactions:

A one-phase commit process

  1. Open the local transaction, get the local lock, get the global lock, commit the local transaction, release the local lock (PS: always hold the global lock)
  2. Other transactions commit, open the local transaction, get the local lock, wait for the global lock (PS: the previous transaction holds the global lock)

Phase two successfully commits the process

  1. When global processing is complete, the global lock is released
  2. Other transactions get the global lock and follow the original process

Two-phase rollback process (If an exception occurs, transaction 1 is rolled back)

1. An exception occurs, attempt to roll back and wait for the local lock (PS: because the local lock is held by another transaction) 2. The local transaction is rolled back and the local lock is released. 1. Get the local lock and complete the rollback