To better understand distributed transactions, let’s start with a question:

Assume that there are two tables, TA and TB, in the database. We need to modify RA records in TA and RB records in TB. The operation succeeds only if both sets of records are modified successfully; otherwise everything must be rolled back. If TA and TB are in the same database, this is trivial: just wrap both updates in one local transaction. But if TA and TB are in different databases, or on different database instances, a single database transaction can no longer cover both. How do we handle this situation? How do we keep the operations on TA and TB consistent?
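For contrast, when TA and TB do live in the same database, one local transaction is all that is needed. A minimal Spring-style sketch (the table names, columns, and JdbcTemplate wiring are placeholders for illustration):

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;

public class LocalTxExample {
    private final JdbcTemplate jdbcTemplate;

    public LocalTxExample(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // Both updates run in one local database transaction:
    // if either statement fails, both are rolled back together.
    @Transactional
    public void updateBoth() {
        jdbcTemplate.update("UPDATE ta SET status = ? WHERE id = ?", "done", 1); // the RA records
        jdbcTemplate.update("UPDATE tb SET status = ? WHERE id = ?", "done", 2); // the RB records
    }
}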

TCC can be used to solve this problem. TCC abstracts the business process into Try-Confirm-Cancel, a two-phase protocol:

Phase 1: Try, which reserves resources

Phase 2: Confirm/Cancel. Confirm consumes the reserved resources and commits the business operation; Cancel releases the reserved resources and rolls the business operation back
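To make the split concrete, here is a minimal, hypothetical TCC interface for an account deduction (the interface, method names, and the frozen-balance scheme are illustrative only, not part of Seata):

public interface AccountTccService {
    // Try: check the balance and move the amount into a "frozen" column,
    // so the money is reserved but not yet spent
    boolean tryDeduct(String accountId, long amount);

    // Confirm: the global transaction succeeded, so actually consume the
    // reserved (frozen) amount
    boolean confirmDeduct(String accountId, long amount);

    // Cancel: the global transaction failed, so move the frozen amount
    // back to the available balance
    boolean cancelDeduct(String accountId, long amount);
}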

The rest of this article describes TCC from three aspects:

  1. Implementation of TCC in Seata (source code)
  2. Notes on writing a TCC implementation
  3. How TCC mode achieves high availability in Seata

1. Implementation of TCC in Seata

Seata consists mainly of three modules

  1. Transaction Coordinator (TC) – The Transaction Coordinator maintains the status of global and branch transactions and drives global transactions to be committed or rolled back.
  2. Transaction Manager (TM) – The Transaction Manager defines the scope of a global transaction: it begins, commits, or rolls back the global transaction.
  3. Resource Manager (RM) – The Resource Manager manages the resources involved in branch transactions, talks with the TC to register branch transactions and report their status, and drives branch transactions to commit or roll back.

First of all, let's look at how it is used in the sample source code: github.com/seata/seata… The service provider exposes two services, TccActionOne and TccActionTwo:

public interface TccActionOne {
    @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, int a);
    public boolean commit(BusinessActionContext actionContext);
    public boolean rollback(BusinessActionContext actionContext);
}

public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "DubboTccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, String b);
    public boolean commit(BusinessActionContext actionContext);
    public boolean rollback(BusinessActionContext actionContext);
}

GlobalTransactionScanner is registered on both the provider and the consumer side; it initializes the TM and RM clients and registers them with the TC.
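A minimal sketch of that registration, assuming the two-argument constructor (application id, transaction service group); the names used here are placeholders:

import io.seata.spring.annotation.GlobalTransactionScanner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SeataConfiguration {
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        // applicationId and txServiceGroup are placeholders; creating this bean
        // triggers initClient(), i.e. the TM/RM client initialization described below
        return new GlobalTransactionScanner("dubbo-demo-app", "my_test_tx_group");
    }
}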


That is the basic way to use it. Now that we have covered the usage flow, let's look at the implementation, focusing first on the following classes and annotations:

  1. GlobalTransactionScanner, which checks whether distributed transactions are enabled and injects proxies into methods carrying the distributed transaction annotations, namely TwoPhaseBusinessAction and GlobalTransactional
  2. The TwoPhaseBusinessAction annotation, which marks a method as using TCC mode and specifies its commit and rollback methods
  3. The GlobalTransactional annotation, which marks the decorated method as the entry point that opens a distributed (global) transaction

GlobalTransactionScanner extends Spring's AbstractAutoProxyCreator and adds a dynamic proxy to methods carrying these annotations, so the relevant GlobalTransactionScanner methods run when the service starts. The main methods involved are:

  1. GlobalTransactionScanner#initClient, which initializes the TM and RM clients
  2. GlobalTransactionScanner#wrapIfNecessary, which adds proxies to methods annotated with TwoPhaseBusinessAction and GlobalTransactional, injecting the TccActionInterceptor and GlobalTransactionalInterceptor proxy classes respectively, and at the same time registers the local service with the TC server as an RM client

1.1 Client Initialization

GlobalTransactionScanner is therefore the Seata client startup class. Let's first look at the initialization of the TM and RM clients, which creates TmNettyRemotingClient and RmNettyRemotingClient respectively. Both classes extend AbstractNettyRemotingClient, whose init method starts a timer that checks whether the channels to the TC are alive (reconnecting and re-sending the registration information when needed), and finally starts the Netty client:

public void init() {
    // Check whether the channel connected to the TC is alive. If there is no channel
    // for a TC address, or the channel is closed, reconnect to that TC and send the
    // registration request again.
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

In the NettyClientChannelManager#reconnect method, the client fetches all TC addresses registered in the registry and checks whether the NettyClientChannelManager#channels cache already holds a live channel for each address. If not, it creates a channel for that TC address and sends the registration request to it: TmNettyRemotingClient sends a RegisterTMRequest and RmNettyRemotingClient sends a RegisterRMRequest. Concretely, the channel is created in NettyClientChannelManager#doConnect, and the registration message is sent in NettyPoolableFactory#makeObject.
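A simplified, hypothetical sketch of that reconnect flow (the class, the TcChannel type, and the helper method names are made up for illustration and are not the real Seata code):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ReconnectSketch {
    interface TcChannel { boolean isActive(); }

    private final Map<String, TcChannel> channels = new ConcurrentHashMap<>();

    void reconnect(String transactionServiceGroup) {
        // 1. Ask the registry (zk, nacos, etcd, ...) for every TC address in this group
        List<String> tcAddresses = lookupTcAddressesFromRegistry(transactionServiceGroup);
        for (String address : tcAddresses) {
            TcChannel cached = channels.get(address);
            // 2. Reuse the cached channel if it is still alive
            if (cached != null && cached.isActive()) {
                continue;
            }
            // 3. Otherwise open a new channel to this TC and send the registration
            //    request (RegisterTMRequest for a TM, RegisterRMRequest for an RM)
            channels.put(address, connectAndRegister(address));
        }
    }

    private List<String> lookupTcAddressesFromRegistry(String group) {
        return List.of(); // placeholder for the registry lookup
    }

    private TcChannel connectAndRegister(String address) {
        return () -> true; // placeholder for the Netty connect + registration
    }
}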

In the GlobalTransactionScanner#wrapIfNecessary method, proxies are added to methods annotated with TwoPhaseBusinessAction and GlobalTransactional:

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            // check TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                // TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)interceptor);
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }
                if (interceptor == null) {
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
            }
            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]",
                bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

This generates a TccActionInterceptor proxy for methods annotated with TwoPhaseBusinessAction and a GlobalTransactionalInterceptor proxy for methods annotated with GlobalTransactional. In addition, when TCCBeanParserUtils#isTccAutoProxy detects a TwoPhaseBusinessAction annotation, registration information is also sent via RmNettyRemotingClient#registerResource (the concrete logic is in DefaultRemotingParser#parserRemotingServiceInfo); personally I feel this step is a bit redundant and could be removed.

1.2 Server Initialization

The TC server startup class io.seata.server.Server#main initializes the DefaultCoordinator class, which handles all incoming messages. The main fields of DefaultCoordinator are as follows:

// Various scheduled tasks, used to retry
private ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1,
    new NamedThreadFactory("RetryRollbacking", 1));
private ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1,
    new NamedThreadFactory("RetryCommitting", 1));
private ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1,
    new NamedThreadFactory("AsyncCommitting", 1));
private ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1,
    new NamedThreadFactory("TxTimeoutCheck", 1));
private ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,
    new NamedThreadFactory("UndoLogDelete", 1));
private RemotingServer remotingServer;
// Main transaction processing logic
private DefaultCore core;

The server registers its message processors in NettyRemotingServer#registerProcessor:

private void registerProcessor() {
    // 1. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor: handles the responses to branch commit and rollback requests
    ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
    // 3. registry rm message processor: handles RM registration
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor: handles TM registration
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor: processes the client's heartbeat messages
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

As the code above shows, RM and TM registration messages are handled by RegRmProcessor and RegTmProcessor respectively.

1.3 TCC message processing analysis

Let’s start with the sample code from the beginning:

@GlobalTransactional
public String doTransactionCommit() {
    boolean result = tccActionOne.prepare(null, 1);
    if (!result) {
        throw new RuntimeException("TccActionOne failed.");
    }
    List list = new ArrayList();
    list.add("c1");
    list.add("c2");
    result = tccActionTwo.prepare(null, "two", list);
    if (!result) {
        throw new RuntimeException("TccActionTwo failed.");
    }
    return RootContext.getXID();
}

When doTransactionCommit is called, the call enters the GlobalTransactionalInterceptor proxy class and eventually reaches TransactionalTemplate#execute, whose main logic is as follows:

beginTransaction(txInfo, tx);   // start the global transaction
rs = business.execute();        // execute the business code, i.e. doTransactionCommit
commitTransaction(tx);          // commit the global transaction
  1. In the beginTransaction method, a GlobalBeginRequest message is sent to the TC to obtain the transaction XID; on the server it is eventually processed in DefaultCore#begin, which:

    1. Generates a random number as the transactionId via the snowflake algorithm, then builds the XID from it; the rule is ipAddress + ":" + port + ":" + transactionId, where ipAddress is the TC's local IP address and port is its service port (see the small snippet after this walkthrough)
    2. Writes the global transaction record into the global_table table, with the XID as the primary key and the transactionId as an index
  2. The business code then runs and calls the remote services, e.g. tccActionOne.prepare. Since this method is annotated with TwoPhaseBusinessAction, the call goes through the TccActionInterceptor proxy class, whose invoke method contains the following logic:

// Register the branch transaction and get the branch transaction id
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
...
// Execute the business code of the actual branch transaction, e.g. the tccActionOne.prepare method
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());

In the doTccActionLogStore method, the client sends a BranchRegisterRequest message, in which clientId and lockKeys are both null and resourceId is the name attribute of the TwoPhaseBusinessAction annotation. The message ends up in AbstractCore#branchRegister on the server, which generates a branchId with a random algorithm, creates a branch record, and inserts it into the branch_table table with the branchId as primary key.

  3. Once all of the business logic in doTransactionCommit has succeeded, execution reaches the commitTransaction method and the global transaction is committed. The client sends a GlobalCommitRequest message via DefaultTransactionManager#commit; after receiving it, the server commits the global transaction through DefaultCore#doGlobalCommit, whose main logic is:
    1. For each branch transaction, call AbstractCore#branchCommitSend and send a BranchCommitRequest message to the corresponding branch transaction client. Upon receiving the message, the client executes the commit method specified in the TwoPhaseBusinessAction annotation to complete the branch commit
  4. If any branch service invocation fails, e.g. the tccActionOne.prepare call, the global transaction is rolled back, and the TC server then rolls back all branch transactions
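As a tiny illustration of the XID rule from step 1 of this walkthrough (the IP, port, and id values below are made up; Seata generates the transaction id with its snowflake-style generator):

// XID rule: ipAddress + ":" + port + ":" + transactionId
String ipAddress = "192.168.1.10";          // TC's local IP (placeholder)
int port = 8091;                            // TC's service port (placeholder)
long transactionId = 2001545344734990336L;  // snowflake-style id (placeholder)
String xid = ipAddress + ":" + port + ":" + transactionId;
// -> "192.168.1.10:8091:2001545344734990336"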

2. Notes on writing a TCC implementation

A complete TCC implementation must deal with empty rollback, idempotence, and suspension; a combined sketch follows the list below.

  1. Empty rollback: when the global transaction is rolled back, the branch transaction's try may never have been triggered (or may still be in flight) because of a network problem, while the TC has already triggered the branch's cancel. In this case the branch service must still return success, otherwise the TC will keep retrying; in other words, the branch transaction must support an empty rollback
  2. Idempotence: because of network jitter and retries, the methods of a branch transaction may be executed more than once, so resources must not be consumed (or released) twice. A common solution is to keep a unique key per request, such as the branch transaction id, and use it to filter out duplicate requests
  3. Suspension: when the global transaction is rolled back while the branch's try is still executing, the branch may run cancel before try finishes, so the resources locked by try could never be released. The fix is to check whether an execution record of cancel already exists for this branch; if it does, the current try operation must be rejected or rolled back
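Below is a hedged sketch, not Seata code, of one common way to cover all three cases with a per-branch status record keyed by xid + branchId (it assumes Seata's BusinessActionContext with getXid()/getBranchId(); the class, enum, and in-memory map are stand-ins). In a real service the record would live in the database, and the check-and-insert would be made atomic, e.g. via a unique-key insert in the same local transaction as the business operation.

import io.seata.rm.tcc.api.BusinessActionContext;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TccGuardSketch {
    enum Status { TRIED, CANCELLED }

    // per-branch status record; a real implementation would persist this in a table
    private final ConcurrentMap<String, Status> branchLog = new ConcurrentHashMap<>();

    private String key(BusinessActionContext ctx) {
        return ctx.getXid() + ":" + ctx.getBranchId();
    }

    public boolean prepare(BusinessActionContext ctx) {
        // suspension guard: if cancel already ran for this branch, reject the late try
        if (branchLog.get(key(ctx)) == Status.CANCELLED) {
            return false;
        }
        branchLog.put(key(ctx), Status.TRIED);
        // ... reserve the business resources here ...
        return true;
    }

    public boolean rollback(BusinessActionContext ctx) {
        Status status = branchLog.get(key(ctx));
        if (status == null) {
            // empty rollback: try never ran; record the cancel so a late try is rejected,
            // then report success so the TC stops retrying
            branchLog.put(key(ctx), Status.CANCELLED);
            return true;
        }
        if (status == Status.CANCELLED) {
            return true; // idempotence: this branch was already cancelled
        }
        // ... release the resources reserved by try here ...
        branchLog.put(key(ctx), Status.CANCELLED);
        return true;
    }
}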

3. How TCC mode achieves high availability in Seata

To achieve high availability, the TC service needs to be effectively stateless, and Seata does the following:

  1. Storage: the TC avoids storing transaction data locally and uses shared storage such as MySQL instead
  2. Service discovery and registration: as the implementation analysis above shows, each TC instance registers its address in a registry such as ZooKeeper or etcd; the TM and RM clients pull the addresses of all TC servers and register themselves with every TC, so each TC instance holds connection information for all clients