This article walks through the source code of Seata's AT mode. Before reading it, you should have a basic understanding of the 2PC and 3PC protocols, know the basic flow of how Seata works, and know the roles of the TC, TM, and RM.

1 Entry point: GlobalTransactionScanner

We all know that in Seata's AT mode, annotating a business method with @GlobalTransactional is all it takes to get global transaction control across services. So how is this annotation picked up, and how does it take effect?

To explore Seata's source code, we can start with seata-spring-boot-starter and see which beans it registers in the Spring container, since these beans are the key to how Seata runs. SeataAutoConfiguration, Seata's core auto-configuration class, registers a GlobalTransactionScanner bean which, as its name suggests, scans for global transactions.

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
            ConfigurableListableBeanFactory beanFactory,
            @Autowired(required = false) List<ScannerChecker> scannerCheckers) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }

        // Set up the Bean factory
        GlobalTransactionScanner.setBeanFactory(beanFactory);

        // Add the ScannerChecker implementations listed in '/META-INF/services/io.seata.spring.annotation.ScannerChecker'
        GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
        // Add the ScannerChecker beans registered in the Spring container
        GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
        // Add the packages configured for scanning
        GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
        // Add exclude configuration
        GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());

        // Create a global transaction scanner
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

So what does GlobalTransactionScanner do? First, it extends AbstractAutoProxyCreator and overrides wrapIfNecessary, the method that decides whether a bean needs a dynamic proxy. This is where the scanner checks whether the bean is a TCC proxy, checks for the global transaction annotation, and, when the conditions are met, builds the interceptor used to proxy the bean. It also implements three Spring interfaces, InitializingBean, ApplicationContextAware, and DisposableBean, which tie the class to the Spring container lifecycle. It overrides afterPropertiesSet, which Spring calls during bean initialization once the bean's properties have been set; the core of that method is initClient, where the TM and RM clients are initialized and a Spring shutdown hook is registered. It overrides destroy to release resources, and it implements ApplicationContextAware to obtain the Spring application context.

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {

    ...

    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // do checkers
        if (!doCheckers(bean, beanName)) {
            return bean;
        }

        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                // Determine if it is a TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)interceptor);
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    // Determine whether the @GlobalTransactional annotation exists
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }

                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) {
                        // Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

    @Override
    public void destroy() {
        ShutdownHook.getInstance().destroyAll();
    }
    
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        this.setBeanFactory(applicationContext);
    }
        
    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)this);
            return;
        }
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        //init TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();
    }

    ...
}
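
To make the wrapIfNecessary decision more concrete, here is a minimal sketch that uses only the JDK. It is not Seata code: the annotation `@DemoGlobalTx`, the `OrderService` and `PlainService` types, and the `ProxySketch` class are all hypothetical names, and it uses `java.lang.reflect.Proxy` where Spring and Seata rely on AbstractAutoProxyCreator and a method interceptor. It only illustrates the idea: a bean without the annotation is returned untouched, and an annotated bean is wrapped so that its annotated methods run inside begin/commit/rollback handling.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the global transaction annotation (not Seata's).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTx {
}

// Hypothetical business interfaces and beans, used only for illustration.
interface OrderService {
    @DemoGlobalTx
    String createOrder(String item);
}

interface PlainService {
    String ping();
}

class OrderServiceImpl implements OrderService {
    @Override
    public String createOrder(String item) {
        return "order:" + item;
    }
}

class PlainServiceImpl implements PlainService {
    @Override
    public String ping() {
        return "pong";
    }
}

class ProxySketch {
    // Records what the interceptor did so the behaviour can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    // True if any method of any of the given interfaces carries the annotation.
    static boolean existsAnnotation(Class<?>[] interfaces) {
        for (Class<?> itf : interfaces) {
            for (Method m : itf.getMethods()) {
                if (m.isAnnotationPresent(DemoGlobalTx.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Simplified analogue of wrapIfNecessary: proxy only annotated beans.
    static Object wrapIfNecessary(Object bean) {
        Class<?>[] interfaces = bean.getClass().getInterfaces();
        if (!existsAnnotation(interfaces)) {
            return bean;
        }
        InvocationHandler interceptor = (proxy, method, methodArgs) -> {
            if (!method.isAnnotationPresent(DemoGlobalTx.class)) {
                return method.invoke(bean, methodArgs);
            }
            EVENTS.add("begin " + method.getName());
            try {
                Object result = method.invoke(bean, methodArgs);
                EVENTS.add("commit " + method.getName());
                return result;
            } catch (InvocationTargetException e) {
                EVENTS.add("rollback " + method.getName());
                throw e.getCause();
            }
        };
        return Proxy.newProxyInstance(bean.getClass().getClassLoader(), interfaces, interceptor);
    }

    public static void main(String[] args) {
        OrderService orders = (OrderService) wrapIfNecessary(new OrderServiceImpl());
        System.out.println(orders.createOrder("book"));
        System.out.println(EVENTS);
    }
}
```

The early return for beans without the annotation mirrors the `existsAnnotation` check in the listing above; everything else is deliberately simplified.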

2 Initialize TM

TM, the transaction manager, defines the scope of a global transaction: it begins the global transaction and commits or rolls it back. The TM initialization code lives in the seata-tm and seata-core modules, and it mainly initializes an RPC client built on Netty.

public class TMClient {

    public static void init(String applicationId, String transactionServiceGroup) {
        init(applicationId, transactionServiceGroup, null, null);
    }

    public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
        tmNettyRemotingClient.init();
    }
}

    // TmNettyRemotingClient
    @Override
    public void init() {
        // registry processor
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            super.init();
        }
    }

    // AbstractNettyRemotingClient
    @Override
    public void init() {
        // Initialize the timer scheduler
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        // Set the merge sending thread pool to merge messages sent from the same server and reduce unnecessary communication
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        clientBootstrap.start();
    }

    // Look up the available server addresses for the transactionServiceGroup and connect to each of them
    void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            availList = getAvailServerList(transactionServiceGroup);
        } catch (Exception e) {
            LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
            return;
        }
        if (CollectionUtils.isEmpty(availList)) {
            RegistryService registryService = RegistryFactory.getInstance();
            String clusterName = registryService.getServiceGroup(transactionServiceGroup);

            if (StringUtils.isBlank(clusterName)) {
                LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                        ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                        transactionServiceGroup);
                return;
            }

            if (!(registryService instanceof FileRegistryServiceImpl)) {
                LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
            }
            return;
        }
        for (String serverAddress : availList) {
            try {
                acquireChannel(serverAddress);
            } catch (Exception e) {
                LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
            }
        }
    }

    // TM client start method
    @Override
    public void start() {
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                    nettyClientConfig.getClientWorkerThreads()));
        }
        this.bootstrap.group(this.eventLoopGroupWorker).channel(
            nettyClientConfig.getClientChannelClazz()).option(
            ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
            ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
            ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
            nettyClientConfig.getClientSocketRcvBufSize());

        if (nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                    .option(EpollChannelOption.TCP_QUICKACK, true);
            }
        }

        bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(
                        new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                            nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                            nettyClientConfig.getChannelMaxAllIdleSeconds()))
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            });

        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }
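
Two idioms from the listing above are worth isolating: the `compareAndSet(false, true)` guard that makes initialization run only once, and the fixed-rate timer that keeps calling reconnect. The following is a minimal sketch of both using only the JDK. The class `ReconnectingClientSketch` and its methods are hypothetical, the delay and interval values are arbitrary, and the `reconnect` body is a placeholder where a real client would look up servers and open channels.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client illustrating the pattern; not a Seata class.
class ReconnectingClientSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch awaited;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "reconnect-timer");
        t.setDaemon(true); // do not keep the JVM alive for the demo
        return t;
    });

    ReconnectingClientSketch(int attemptsToAwait) {
        this.awaited = new CountDownLatch(attemptsToAwait);
    }

    // Returns true only for the single call that performs the initialization.
    boolean init(long delayMillis, long intervalMillis) {
        if (!initialized.compareAndSet(false, true)) {
            return false;
        }
        timer.scheduleAtFixedRate(this::reconnect, delayMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    // Placeholder for "look up the server list and open any missing channels".
    private void reconnect() {
        attempts.incrementAndGet();
        awaited.countDown();
    }

    // Waits until the expected number of reconnect runs happened, or the timeout expires.
    boolean awaitAttempts(long timeoutMillis) {
        try {
            return awaited.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int attempts() {
        return attempts.get();
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        ReconnectingClientSketch client = new ReconnectingClientSketch(3);
        System.out.println("first init:  " + client.init(0, 50));
        System.out.println("second init: " + client.init(0, 50));
        client.awaitAttempts(2000);
        System.out.println("reconnect runs so far: " + client.attempts());
        client.shutdown();
    }
}
```

Because the guard is an atomic compare-and-set, concurrent callers cannot both schedule the timer, which is the same reason the Seata clients wrap `super.init()` in it.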

3 Initialize RM

RM, the resource manager, manages the resources that branch transactions work on, talks to the TC to register branch transactions and report their status, and drives the commit or rollback of branch transactions. The RM initialization code lives in the seata-rm and seata-core modules, and its core initialization logic is shared with the TM: both go through the init method of AbstractNettyRemotingClient.

public class RMClient {

    /**
     * Init.
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        rmNettyRemotingClient.init();
    }
}

    // RmNettyRemotingClient
    @Override
    public void init() {
        // registry processor
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            super.init();

            // Found one or more resources that were registered before initialization
            if (resourceManager != null
                    && !resourceManager.getManagedResources().isEmpty()
                    && StringUtils.isNotBlank(transactionServiceGroup)) {
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }

4 Initialize the TC

TC, the transaction coordinator, maintains the state of global and branch transactions and drives the commit or rollback of global transactions. The TC runs as middleware: it is a standalone Java service in the seata-server module, unlike the TM and RM, which are embedded in business services as an SDK.

The TC is a Spring Boot project, started from the main method of the class annotated with @SpringBootApplication.

@SpringBootApplication
public class ServerApplication {
    public static void main(String[] args) {
        int port = PortHelper.getPort(args);
        System.setProperty(ConfigurationKeys.SERVER_RPC_PORT, Integer.toString(port));

        SpringApplication.run(ServerApplication.class, args);
    }
}

The TC starts its Netty-based RPC server by implementing the run method of CommandLineRunner, which Spring Boot invokes automatically once the application has started.

@Component
public class ServerRunner implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);

    private boolean started = Boolean.FALSE;


    @Override
    public void run(String... args) {
        try {
            long start = System.currentTimeMillis();
            Server.start(args);
            started = true;

            long cost = System.currentTimeMillis() - start;
            LOGGER.info("seata server started in {} millSeconds", cost);
        } catch (Throwable e) {
            started = Boolean.FALSE;
            LOGGER.error("seata server start error: {} ", e.getMessage(), e);
            System.exit(-1);
        }
    }

    public boolean started() {
        return started;
    }
}

public class Server {
    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void start(String[] args) {
        // create logger
        final Logger logger = LoggerFactory.getLogger(Server.class);
        if (ContainerHelper.isRunningInContainer()) {
            logger.info("The server is running in container.");
        }


        // Initialize the parameter parser. This should always be the first statement executed,
        // because the parameters required for startup are parsed here.
        ParameterParser parameterParser = new ParameterParser(args);

        // Initialize the metrics manager
        MetricsManager.get().init();

        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
        //server port
        nettyRemotingServer.setListenPort(parameterParser.getPort());
        UUIDGenerator.init(parameterParser.getServerNode());
        //log store mode : file, db, redis
        SessionHolder.init(parameterParser.getSessionStoreMode());
        LockerManagerFactory.init(parameterParser.getLockStoreMode());
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        coordinator.init();
        nettyRemotingServer.setHandler(coordinator);
        // register ShutdownHook
        ShutdownHook.getInstance().addDisposable(coordinator);
        ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

        //127.0.0.1 and 0.0.0.0 are not valid here
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
        XID.setPort(nettyRemotingServer.getListenPort());

        nettyRemotingServer.init();
    }
}
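
The worker pool in Server.start combines a bounded queue with `ThreadPoolExecutor.CallerRunsPolicy`. What that combination does when the server is saturated can be shown with a small JDK-only sketch. The class `CallerRunsSketch` is hypothetical and the sizes (one worker, one queue slot) are chosen only to force saturation; they are not Seata's defaults.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the pool sizes are illustrative only.
class CallerRunsSketch {

    // Saturates a tiny pool and returns the thread that ran the overflow task.
    static Thread runOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> overflowThread = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker until the latch is released.
            pool.execute(() -> awaitQuietly(release));
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 finds the pool and the queue full, so the policy runs it
            // synchronously on the submitting thread instead of rejecting it.
            pool.execute(() -> overflowThread.set(Thread.currentThread()));
            return overflowThread.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Thread ranOn = runOverflowTask();
        System.out.println("overflow task ran on caller thread: " + (ranOn == Thread.currentThread()));
    }
}
```

The practical effect is back-pressure: when the handler pool is saturated, the submitting thread does the work itself and therefore slows down, rather than requests being dropped.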

5 Global transaction execution process

The overall flow of a global transaction is shown in the figure below. I will analyze the source code of each core step in turn.

5.1 TM initiates global transactions

Process overview

  • Methods annotated with @GlobalTransactional are scanned and a proxy object is generated for them
  • Executing a method annotated with @GlobalTransactional triggers the TM transaction template method
  • The TM sends a request to begin a global transaction to the TC and receives an XID in return
  • The global transaction begins and the XID is bound
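
The steps above can be sketched in a few lines of plain Java. This is an illustration under stated assumptions, not Seata's implementation: `FakeCoordinator` stands in for the TC and lives in memory, `XidContext` is a hypothetical ThreadLocal holder for the bound XID, the `demo-xid-N` format is invented for the example, and `TxTemplateSketch.execute` is a heavily simplified stand-in for the transaction template (no propagation, timeouts, or rollback rules).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for the TC.
class FakeCoordinator {
    private final AtomicLong ids = new AtomicLong();
    final Map<String, String> status = new ConcurrentHashMap<>();

    // Begins a global transaction and returns its id (format invented for the demo).
    String begin(String name) {
        String xid = "demo-xid-" + ids.incrementAndGet();
        status.put(xid, "Begin");
        return xid;
    }

    void commit(String xid) {
        status.put(xid, "Committed");
    }

    void rollback(String xid) {
        status.put(xid, "RolledBack");
    }
}

// Hypothetical holder for the XID bound to the current thread.
class XidContext {
    static final ThreadLocal<String> XID = new ThreadLocal<>();
}

class TxTemplateSketch {
    // begin -> bind XID -> run business logic -> commit or roll back -> unbind.
    static <T> T execute(FakeCoordinator tc, String name, Supplier<T> business) {
        String xid = tc.begin(name);
        XidContext.XID.set(xid);
        try {
            T result;
            try {
                result = business.get();
            } catch (RuntimeException e) {
                tc.rollback(xid);
                throw e;
            }
            tc.commit(xid);
            return result;
        } finally {
            XidContext.XID.remove();
        }
    }

    public static void main(String[] args) {
        FakeCoordinator tc = new FakeCoordinator();
        String seen = execute(tc, "createOrder", () -> XidContext.XID.get());
        System.out.println("business logic saw xid " + seen + ", final status " + tc.status.get(seen));
    }
}
```

The point to take away is that the business method itself never talks to the coordinator; the surrounding template does, and the business code only sees the bound XID.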

Source code analysis

First, when a method annotated with @GlobalTransactional is scanned, it is intercepted by the GlobalTransactionalInterceptor and a proxy object is created. The proxy handles @GlobalTransactional methods specially. The key method is handleGlobalTransaction, which calls the transaction template in seata-tm to start phase one of the transaction.

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {

    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();

    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
    private final FailureHandler failureHandler;
    private volatile boolean disable;
    private int order;

    private static int degradeCheckPeriod;
    private static volatile boolean degradeCheck;
    private static int degradeCheckAllowTimes;
    private static volatile Integer degradeNum = 0;
    private static volatile Integer reachNum = 0;
    private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
    private static ScheduledThreadPoolExecutor executor =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
    
    private static int defaultGlobalTransactionTimeout = 0;

    private void initDefaultGlobalTransactionTimeout() {
        if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {
            int defaultGlobalTransactionTimeout;
            try {
                defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(
                        ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
            } catch (Exception e) {
                LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
                defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
            }
            if (defaultGlobalTransactionTimeout <= 0) {
                LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
                defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
            }
            GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
        }
    }

    /**
     * Instantiates a new global transaction interceptor.
     */
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
            DEFAULT_TM_DEGRADE_CHECK);
        if (degradeCheck) {
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            EVENT_BUS.register(this);
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                startDegradeCheck();
            }
        }
        this.initDefaultGlobalTransactionTimeout();
    }

    // method interception
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                // Check for the global transaction or global lock annotation and run the corresponding handler
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        return methodInvocation.proceed();
    }

    /**
     * Global lock handler
     */
    Object handleGlobalLock(final MethodInvocation methodInvocation,
        final GlobalLock globalLockAnno) throws Throwable {
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }

    /**
     * Handler for the global transaction (phase one)
     */
    Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final GlobalTransactional globalTrxAnno) throws Throwable {
        boolean succeed = true;
        try {
            // The transaction template method
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    // Set the timeout period
                    int timeout = globalTrxAnno.timeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }
                    // Build the transaction info object
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    transactionInfo.setLockRetryInterval(globalTrxAnno.lockRetryInterval());
                    transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            // The transaction failed: handle it according to the failure code
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

    /**
     * Gets the annotation from the method, falling back to the target class
     */
    public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {
        return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass))
            .orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));
    }

    private String formatMethod(Method method) {
        StringBuilder sb = new StringBuilder(method.getName()).append("(");

        Class<?>[] params = method.getParameterTypes();
        int in = 0;
        for (Class<?> clazz : params) {
            sb.append(clazz.getName());
            if (++in < params.length) {
                sb.append(",");
            }
        }
        return sb.append(")").toString();
    }

    @Override
    public void onChangeEvent(ConfigurationChangeEvent event) {
        if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
            LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                disable, event.getNewValue());
            disable = Boolean.parseBoolean(event.getNewValue().trim());
        } else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
            degradeCheck = Boolean.parseBoolean(event.getNewValue());
            if (!degradeCheck) {
                degradeNum = 0;
            }
        }
    }

    /**
     * Periodic check that drives automatic degradation and recovery
     */
    private static void startDegradeCheck() {
        executor.scheduleAtFixedRate(() -> {
            if (degradeCheck) {
                try {
                    String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                    TransactionManagerHolder.get().commit(xid);
                    EVENT_BUS.post(new DegradeCheckEvent(true));
                } catch (Exception e) {
                    EVENT_BUS.post(new DegradeCheckEvent(false));
                }
            }
        }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
    }

    @Subscribe
    public static void onDegradeCheck(DegradeCheckEvent event) {
        if (event.isRequestSuccess()) {
            if (degradeNum >= degradeCheckAllowTimes) {
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("the current global transaction has been restored");
                    }
                }
            } else if (degradeNum != 0) {
                degradeNum = 0;
            }
        } else {
            if (degradeNum < degradeCheckAllowTimes) {
                degradeNum++;
                if (degradeNum >= degradeCheckAllowTimes) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("the current global transaction has been automatically downgraded");
                    }
                }
            } else if (reachNum != 0) {
                reachNum = 0;
            }
        }
    }

    @Override
    public int getOrder() {
        return order;
    }

    @Override
    public void setOrder(int order) {
        this.order = order;
    }

    @Override
    public SeataInterceptorPosition getPosition() {
        return SeataInterceptorPosition.BeforeTransaction;
    }
}
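
The two counters in onDegradeCheck form a small state machine, which is easier to follow in isolation. The sketch below is a simplified, hypothetical model (the class DegradeState and its methods are not Seata classes). It assumes that "degraded" means degradeNum has reached degradeCheckAllowTimes, which the listing above suggests but does not show directly.

```java
// Hypothetical, simplified model of the degrade/restore counters; not a Seata class.
final class DegradeState {
    private final int allowTimes; // plays the role of degradeCheckAllowTimes
    private int degradeNum;       // consecutive failed probes
    private int reachNum;         // consecutive successful probes while degraded

    DegradeState(int allowTimes) {
        this.allowTimes = allowTimes;
    }

    /** Feed the result of one probe (a begin + commit round trip to the TC). */
    void onProbe(boolean success) {
        if (success) {
            if (degradeNum >= allowTimes) {
                // Degraded: count successes until enough are seen to restore.
                reachNum++;
                if (reachNum >= allowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                }
            } else if (degradeNum != 0) {
                // Not degraded yet: one success clears the failure streak.
                degradeNum = 0;
            }
        } else {
            if (degradeNum < allowTimes) {
                degradeNum++;
            } else if (reachNum != 0) {
                // Already degraded: a failure clears the recovery streak.
                reachNum = 0;
            }
        }
    }

    /** Assumption: degraded once the failure streak reaches the threshold. */
    boolean isDegraded() {
        return degradeNum >= allowTimes;
    }
}
```

With a threshold of 2, two failed probes switch the state to degraded, and two successful probes in a row are needed to restore it.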

On the TM side, the template method pattern is used to drive phase one of the global transaction. The core class is shown below.

This class contains the core business flow: pre-processing of the transaction, executing the business logic, rolling back on exceptions, and committing. The subsequent operations of the global transaction also start from this entry point.

public class TransactionalTemplate {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);

    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Obtain transaction information, implemented in proxy class
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get the current transaction; if not null, the tx role is 'GlobalTransactionRole.Participant'.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create a new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                // else do nothing. Of course, the hooks will still be triggered.
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                commitTransaction(tx);

                return rs;
            } finally {
                // 5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

    private boolean existingTransaction(GlobalTransaction tx) {
        return tx != null;
    }

    private boolean notExistingTransaction(GlobalTransaction tx) {
        return tx == null;
    }

    private GlobalLockConfig replaceGlobalLockConfig(TransactionInfo info) {
        GlobalLockConfig myConfig = new GlobalLockConfig();
        myConfig.setLockRetryInterval(info.getLockRetryInterval());
        myConfig.setLockRetryTimes(info.getLockRetryTimes());
        return GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
    }

    private void resumeGlobalLockConfig(GlobalLockConfig config) {
        if (config != null) {
            GlobalLockConfigHolder.setAndReturnPrevious(config);
        } else {
            GlobalLockConfigHolder.remove();
        }
    }

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        //roll back
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // not roll back on this exception, so commit
            commitTransaction(tx);
        }
    }

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
        }
    }

    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        triggerBeforeRollback();
        tx.rollback();
        triggerAfterRollback();
        // 3.1 Successfully rolled back
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeBegin();
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
        }
    }

    private void triggerBeforeBegin() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeBegin();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterBegin() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterBegin();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterBegin in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerBeforeRollback() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeRollback();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeRollback in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterRollback() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterRollback();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerBeforeCommit() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeCommit();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterCommit() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterCommit();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
            }
        }
    }

    private void triggerAfterCompletion() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterCompletion();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
            }
        }
    }

    private void cleanUp() {
        TransactionHookManager.clear();
    }

    private List<TransactionHook> getCurrentHooks() {
        return TransactionHookManager.getHooks();
    }
}
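
Stripped of propagation handling and hooks, the skeleton of execute() is a plain template method: begin, run the business callback, commit on success or roll back on failure, and always clean up. The following is a minimal sketch of that shape only. MiniTransactionalTemplate and its trace list are hypothetical names for illustration, and unlike the real class it rethrows the original exception instead of wrapping it in an ExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the template above; it only records the order of steps.
final class MiniTransactionalTemplate {
    final List<String> trace = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        trace.add("begin");               // step 2: begin the transaction
        try {
            T result;
            try {
                result = business.get();  // run the business logic
            } catch (RuntimeException ex) {
                trace.add("rollback");    // step 3: roll back on failure
                throw ex;
            }
            trace.add("commit");          // step 4: commit on success
            return result;
        } finally {
            trace.add("cleanup");         // step 5: always clean up
        }
    }
}
```

A successful call leaves the trace begin, commit, cleanup. A failing call leaves begin, rollback, cleanup, and the caller still sees the exception.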

The core call is tx.begin(txInfo.getTimeOut(), txInfo.getName()), with hook functions triggered before and after it. In this call the TM asks the TC to start a global transaction, the TC returns an XID to the TM, and the TM binds the XID to the current context.

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeBegin();
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
        }
    }

    // The default implementation
    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
        }
        // Begin the transaction through the global transaction manager
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
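
The role check at the top of begin() is what lets nested calls join an existing global transaction instead of opening a second one. Below is a minimal sketch of that idea under stated assumptions: a ThreadLocal stands in for RootContext, a local counter stands in for the XID the TC would return, and MiniGlobalTx with all its members is a hypothetical class, not Seata's API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: ThreadLocal plays RootContext, the counter plays the TC.
final class MiniGlobalTx {
    enum Role { LAUNCHER, PARTICIPANT }

    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();
    private static final AtomicInteger NEXT_ID = new AtomicInteger(1);

    final Role role;
    String xid;

    private MiniGlobalTx(Role role, String xid) {
        this.role = role;
        this.xid = xid;
    }

    /** Join as participant if an XID is already bound, otherwise become the launcher. */
    static MiniGlobalTx currentOrNew() {
        String bound = CURRENT_XID.get();
        return bound != null
            ? new MiniGlobalTx(Role.PARTICIPANT, bound)
            : new MiniGlobalTx(Role.LAUNCHER, null);
    }

    void begin() {
        if (role != Role.LAUNCHER) {
            return; // participants reuse the XID that is already bound
        }
        if (CURRENT_XID.get() != null) {
            throw new IllegalStateException("Global transaction already exists");
        }
        xid = "xid-" + NEXT_ID.getAndIncrement(); // in Seata this value comes from the TC
        CURRENT_XID.set(xid);
    }

    static String boundXid() {
        return CURRENT_XID.get();
    }

    static void unbind() {
        CURRENT_XID.remove();
    }
}
```

Calling currentOrNew() and begin() twice on the same thread yields one launcher and one participant that share the same XID.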

5.2 RM executes branch transactions

Process overview

  • The data source proxy object is injected
  • Parse the SQL to get its type (UPDATE), table name, and condition (where name = ‘TXC’)
  • Using the parsed condition, generate a query statement to locate the data and obtain the before image
  • Execute the SQL from the business code
  • Using the before image, locate the data by primary key and obtain the after image
  • Build a rollback log record from the before and after images plus the business SQL information, and insert it into the UNDO_LOG table
  • Before committing, register the branch transaction with the TC
  • Commit the business data update together with the UNDO LOG generated in the previous steps
  • Report the result of the local transaction commit to the TC
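
The image steps above can be sketched with an in-memory map standing in for a table. This is a hypothetical illustration only: UndoLogSketch, UndoRecord and both methods are invented names, and the rollback helper is not one of the steps listed above. It is included, as an assumption about how the images are meant to be used, to show why both images are recorded.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory model: a Map plays the table, keyed by primary key.
final class UndoLogSketch {
    /** One undo record: the row value before and after the business update. */
    static final class UndoRecord {
        final int pk;
        final String before;
        final String after;

        UndoRecord(int pk, String before, String after) {
            this.pk = pk;
            this.before = before;
            this.after = after;
        }
    }

    static UndoRecord update(Map<Integer, String> table, int pk, String newValue) {
        String before = table.get(pk); // before image: read the row first
        table.put(pk, newValue);       // execute the business update
        String after = table.get(pk);  // after image: re-read by primary key
        return new UndoRecord(pk, before, after);
    }

    /** Assumption: compensate only if the row still matches the after image. */
    static boolean rollback(Map<Integer, String> table, UndoRecord undo) {
        if (!Objects.equals(undo.after, table.get(undo.pk))) {
            return false; // someone else changed the row in the meantime
        }
        table.put(undo.pk, undo.before);
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();
        table.put(1, "TXC");
        UndoRecord undo = update(table, 1, "GTS");
        System.out.println(undo.before + " -> " + undo.after);
        System.out.println("rolled back: " + rollback(table, undo));
    }
}
```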

Source code analysis

Executing the transaction means executing the business method, and the undo log is generated while the business method runs. How is the undo log generated?

Here we go back to seata-spring-boot-starter again, where the starter injects a data source proxy creator. Through this proxy, Seata's RM enhances DataSource, Connection, Statement, PreparedStatement and so on, so that undo logs can be generated.

@ConditionalOnBean(DataSource.class)
@ConditionalOnExpression("${seata.enabled:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
@AutoConfigureAfter({SeataCorePropertiesAutoConfiguration.class, SeataClientPropertiesAutoConfiguration.class})
public class SeataDataSourceAutoConfiguration {

    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
            seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    }
}

When business code runs, Seata first checks whether a global transaction is active, then goes through the Seata-proxied data source, parses the SQL, and decides according to the SQL type whether an undo log needs to be generated.

The data source proxy follows the same design as the global transaction scanner: it also extends the AbstractAutoProxyCreator class. The check on whether to generate a proxy is made in wrapIfNecessary:

    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // we only care DataSource bean
        if (!(bean instanceof DataSource)) {
            return bean;
        }

        // when this bean is just a simple DataSource, not SeataDataSourceProxy
        if (!(bean instanceof SeataDataSourceProxy)) {
            Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
            // this mean this bean is either excluded by user or had been proxy before
            if (bean == enhancer) {
                return bean;
            }
            // else, build proxy, put <origin, proxy> to holder and return enhancer
            DataSource origin = (DataSource) bean;
            SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
            DataSourceProxyHolder.put(origin, proxy);
            return enhancer;
        }

        LOGGER.warn("Manually register SeataDataSourceProxy(or its subclass) bean is discouraged! bean name: {}", beanName);
        SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean;
        DataSource origin = proxy.getTargetDataSource();
        Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey);
        // this mean origin is either excluded by user or had been proxy before
        if (origin == originEnhancer) {
            return origin;
        }
        // else, put <origin, proxy> to holder and return originEnhancer
        DataSourceProxyHolder.put(origin, proxy);
        return originEnhancer;
    }

Next, look at the core initialization method of the SeataDataSourceProxy implementation. The most critical line here is DefaultResourceManager.get().registerResource(this): this is where the RM comes into play, and from this point on the data source is managed by the RM.

The resource manager is a singleton held in a static inner class, and the data source registers itself with it during initialization.

    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        DefaultResourceManager.get().registerResource(this);
        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                try (Connection connection = dataSource.getConnection()) {
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        //Set the default branch type to 'AT' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

When the DAO layer needs to run SQL, it asks the data source proxy for a connection, so the connection it gets is Seata's proxy connection. The Statement and PreparedStatement created from that connection are proxy objects as well. SQL parsing starts in StatementProxy.

    @Override
    public boolean execute(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
    }

This brings us to the template method for SQL execution, where each statement is classified by type and handed to a matching executor.

public class ExecuteTemplate {

    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }

    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        if (JdbcConstants.MYSQL.equals(dbType)) {
                            executor = new MySQLInsertOrUpdateExecutor(statementProxy,statementCallback,sqlRecognizer);
                        } else {
                            throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
}
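
The dispatch in ExecuteTemplate boils down to "classify the statement, then pick an executor". The sketch below shows that shape with a deliberately naive classifier. It is an assumption-laden illustration: SqlDispatchSketch is a hypothetical class, it matches on string prefixes rather than using a real SQL parser as Seata does, and it returns executor names as strings instead of constructing executors.

```java
import java.util.Locale;

// Hypothetical classifier; Seata uses a real SQL parser, not string prefixes.
final class SqlDispatchSketch {
    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

    static SqlType classify(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("INSERT")) {
            return SqlType.INSERT;
        }
        if (s.startsWith("UPDATE")) {
            return SqlType.UPDATE;
        }
        if (s.startsWith("DELETE")) {
            return SqlType.DELETE;
        }
        if (s.startsWith("SELECT") && s.endsWith("FOR UPDATE")) {
            return SqlType.SELECT_FOR_UPDATE;
        }
        return SqlType.OTHER;
    }

    /** Returns the name of the executor the statement would be routed to. */
    static String executorFor(String sql) {
        switch (classify(sql)) {
            case INSERT:
                return "InsertExecutor";
            case UPDATE:
                return "UpdateExecutor";
            case DELETE:
                return "DeleteExecutor";
            case SELECT_FOR_UPDATE:
                return "SelectForUpdateExecutor";
            default:
                return "PlainExecutor"; // statements that need no undo log
        }
    }

    public static void main(String[] args) {
        System.out.println(executorFor("update t set name = 'GTS' where name = 'TXC'"));
        System.out.println(executorFor("select 1"));
    }
}
```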

The insert executor is loaded through the SPI mechanism: its configuration file lists several implementations, and one is selected dynamically according to the database type.

Let’s take a look at how undo logs are generated during SQL execution. Auto-commit has to be turned off for the execution, and beforeImage() and afterImage(beforeImage) are called to build TableRecords objects from the data before and after the SQL runs. Finally, prepareUndoLog() adds the before and after images to the context of the connectionProxy.

    @Override
    public T execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        if (xid != null) {
            statementProxy.getConnectionProxy().bind(xid);
        }

        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    // Both before and after images are generated when auto-commit is disabled
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

After the SQL is executed, the connection is committed. If the connection is part of a global transaction, the branch transaction is first registered with the TC, then the undo log is flushed and the local transaction is committed.

    @Override
    public void commit() throws SQLException {
        try {
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }

    private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        context.setBranchId(branchId);
    }

5.3 Commit process

Process overview

  • The TM initiates the phase-two commit
  • The TC sends a commit notification to the RM
  • The RM commits asynchronously
    • Commits are grouped by resource so that operations on the same database run together, avoiding the cost of establishing multiple connections
    • Each group obtains an ordinary database connection for its resource
    • The corresponding undo logs are deleted
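
The grouping step in the list above can be sketched as follows. This is a hypothetical illustration: AsyncCommitSketch and Phase2Context are invented names for this example, and the sketch stops at the grouping. Obtaining a connection per group and deleting the undo logs are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "group pending commits by resource" step.
final class AsyncCommitSketch {
    /** One pending phase-two commit for a branch transaction. */
    static final class Phase2Context {
        final String xid;
        final long branchId;
        final String resourceId;

        Phase2Context(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    /** Groups pending commits so each resource is handled with one connection. */
    static Map<String, List<Phase2Context>> groupByResource(List<Phase2Context> pending) {
        Map<String, List<Phase2Context>> groups = new LinkedHashMap<>();
        for (Phase2Context ctx : pending) {
            groups.computeIfAbsent(ctx.resourceId, k -> new ArrayList<>()).add(ctx);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Phase2Context> pending = Arrays.asList(
            new Phase2Context("x1", 1L, "db1"),
            new Phase2Context("x1", 2L, "db2"),
            new Phase2Context("x2", 3L, "db1"));
        groupByResource(pending).forEach((resource, group) ->
            System.out.println(resource + ": " + group.size() + " undo log(s) to delete"));
    }
}
```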

Source code analysis

After the business logic has executed successfully in the transaction template, the template's commitTransaction method is called, and the TM sends the commit request for the global transaction to the TC.

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
        }
    }

    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
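
The retry countdown in commit() is a reusable pattern: try the call, decrement the counter on failure, and give up with the last cause once the counter reaches zero. Here is a minimal, generic sketch of it. RetrySketch and callWithRetry are hypothetical names, and unchecked exceptions are used instead of TransactionException to keep the example self-contained.

```java
import java.util.function.Supplier;

// Hypothetical helper mirroring the retry countdown used when reporting a commit.
final class RetrySketch {
    static <T> T callWithRetry(Supplier<T> action, int retry) {
        if (retry <= 0) {
            throw new IllegalArgumentException("retry must be positive");
        }
        while (true) {
            try {
                return action.get(); // success ends the loop immediately
            } catch (RuntimeException ex) {
                retry--;
                if (retry == 0) {
                    // Out of attempts: surface the last failure as the cause.
                    throw new IllegalStateException("Failed to report global commit", ex);
                }
            }
        }
    }
}
```

With retry set to 3, an action that fails twice and then succeeds returns its value on the third attempt. With retry set to 2, the same action ends in an IllegalStateException.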

On the TC side, the core commit logic is DefaultCore's doGlobalCommit, where the TC drives each branch transaction to complete the commit.

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus

        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            // Highlight: Firstly, close the session, then no more branch can be registered.
            globalSession.closeAndClean();
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                if (globalSession.canBeCommittedAsync()) {
                    globalSession.asyncCommit();
                    return false;
                } else {
                    globalSession.changeStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });

        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            //If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

    @Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // Start submitting events
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), null, globalSession.getStatus()));

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // If you do not retry, skip the canBeCommittedAsync branch
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branchSession);
                    return CONTINUE;
                }
                try {
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            globalSession.removeBranch(branchSession);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error(
                                    "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                                return CONTINUE;
                            } else {
                                SessionHelper.endCommitFailed(globalSession);
                                LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error(
                                    "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            // If branches remain and not all of them can be committed asynchronously,
            // log it and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
        }
        // If it succeeded and no branch remains (retrying=true is the asynchronous retry state),
        // endCommitted is executed to improve concurrency, and the global transaction ends.
        if (success && globalSession.getBranchSessions().isEmpty() && retrying) {
            SessionHelper.endCommitted(globalSession);

            // committed event
            eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
                globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
                globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }
    @Override
    public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchCommitRequest request = new BranchCommitRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchCommitSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchCommitRequest,
                    String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
        }
    }
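
The commit loop above relies on one convention: the per-branch handler returns CONTINUE (null) to move on to the next branch, and any non-null Boolean stops the loop with that result. Here is a minimal sketch of that convention. The class BranchLoopSketch, the BranchHandler interface and the string statuses are illustrative stand-ins, not Seata's SessionHelper.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for the TC's branch loop; not Seata's actual SessionHelper.
class BranchLoopSketch {
    // Returning CONTINUE (null) means "go on to the next branch".
    static final Boolean CONTINUE = null;

    interface BranchHandler {
        Boolean handle(String branchStatus);
    }

    // Visits branches in order and stops at the first non-null result.
    static Boolean forEach(List<String> branches, BranchHandler handler) {
        for (String branch : branches) {
            Boolean result = handler.handle(branch);
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    // Simplified decision: committed branches continue, anything else stops with false.
    static Boolean commitAll(List<String> branches) {
        return forEach(branches,
            status -> "PhaseTwo_Committed".equals(status) ? CONTINUE : Boolean.FALSE);
    }

    public static void main(String[] args) {
        System.out.println(commitAll(Arrays.asList("PhaseTwo_Committed", "PhaseTwo_Committed")));
        System.out.println(commitAll(Arrays.asList("PhaseTwo_Committed", "PhaseTwo_CommitFailed_Retryable")));
    }
}
```

A null result from the loop means every branch was handled, which is why doGlobalCommit only returns early when the result is non-null.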

So how does the RM commit the branch transaction? The RM is invoked through a callback. Its core class is AbstractRMHandler, which ultimately hands the branch commit to an asynchronous task; as the code shows, that task only has to delete the undo logs.

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }

    // DataSourceManager delegates the commit to its AsyncWorker
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        return asyncWorker.branchCommit(xid, branchId, resourceId);
    }

    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
        addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }

    // Queue the context; if the queue is full, drain it first and then try again
    private void addToCommitQueue(Phase2Context context) {
        if (commitQueue.offer(context)) {
            return;
        }
        CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
                .thenRun(() -> addToCommitQueue(context));
    }

    // Run the commit and log any exception instead of propagating it
    void doBranchCommitSafely() {
        try {
            doBranchCommit();
        } catch (Throwable e) {
            LOGGER.error("Exception occur when doing branch commit", e);
        }
    }

    // Drain the queue and process the contexts grouped by resource
    private void doBranchCommit() {
        if (commitQueue.isEmpty()) {
            return;
        }

        // Transfer all currently received contexts to this list
        List<Phase2Context> allContexts = new LinkedList<>();
        commitQueue.drainTo(allContexts);

        // Group contexts by resource ID
        Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);

        groupedContexts.forEach(this::dealWithGroupedContexts);
    }

    // Handle the contexts that belong to one resource
    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("Failed to find resource for {}", resourceId);
            return;
        }

        Connection conn;
        try {
            conn = dataSourceProxy.getPlainConnection();
        } catch (SQLException sqle) {
            LOGGER.error("Failed to get connection for async committing on {}", resourceId, sqle);
            return;
        }

        UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());

        // Split the context into multiple lists, each containing no more than the size limit
        List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
        // Delete the undo logs batch by batch
        splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition));
    }
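
The RM side of the commit boils down to three moves: enqueue the context, drain the queue, then delete undo logs per resource in bounded batches. The sketch below models this in memory. The class AsyncCommitSketch, the limit DELETE_LIMIT and the recorded batch strings are assumptions for illustration; the real worker deletes rows from the undo_log table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative model of the RM's phase-two commit batching; not Seata's AsyncWorker.
class AsyncCommitSketch {
    // Hypothetical batch limit standing in for UNDOLOG_DELETE_LIMIT_SIZE.
    static final int DELETE_LIMIT = 2;

    static final class Context {
        final String xid;
        final long branchId;
        final String resourceId;

        Context(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    final BlockingQueue<Context> commitQueue = new LinkedBlockingQueue<>(100);
    // Records each simulated "delete undo log" batch as "resourceId:branchIds".
    final List<String> deletedBatches = new ArrayList<>();

    // Phase-two commit only enqueues; the caller can report the branch as committed.
    boolean branchCommit(String xid, long branchId, String resourceId) {
        return commitQueue.offer(new Context(xid, branchId, resourceId));
    }

    // Drains the queue, groups contexts by resource, and deletes undo logs in batches.
    void drainAndDelete() {
        List<Context> all = new ArrayList<>();
        commitQueue.drainTo(all);

        Map<String, List<Context>> grouped = new LinkedHashMap<>();
        for (Context c : all) {
            grouped.computeIfAbsent(c.resourceId, k -> new ArrayList<>()).add(c);
        }

        grouped.forEach((resourceId, contexts) -> {
            for (int i = 0; i < contexts.size(); i += DELETE_LIMIT) {
                List<Context> batch = contexts.subList(i, Math.min(i + DELETE_LIMIT, contexts.size()));
                StringBuilder ids = new StringBuilder();
                for (Context c : batch) {
                    ids.append(ids.length() == 0 ? "" : ",").append(c.branchId);
                }
                deletedBatches.add(resourceId + ":" + ids);
            }
        });
    }

    public static void main(String[] args) {
        AsyncCommitSketch worker = new AsyncCommitSketch();
        worker.branchCommit("xid-1", 1L, "jdbc:order");
        worker.branchCommit("xid-1", 2L, "jdbc:stock");
        worker.branchCommit("xid-2", 3L, "jdbc:order");
        worker.branchCommit("xid-3", 4L, "jdbc:order");
        worker.drainAndDelete();
        System.out.println(worker.deletedBatches);
    }
}
```

Because the commit only queues work, the TC gets its answer quickly, and the cleanup cost is paid later in batches.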

5.4 Transaction rollback flow

Process overview

  • Phase two rollback is initiated
  • The TC sends a rollback notification to each RM
  • The RM rolls back the transaction based on the undo logs it has stored
    • Fetch the undo log by xid and branch transaction ID
    • Check the status of the undo log to avoid repeated execution
    • Parse the undo log into rollback SQL and execute it, then update the undo log status
    • Delete the corresponding undo log
    • Commit the local transaction, and report its result (that is, the rollback result of the branch transaction) to the TC
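
The RM-side steps above can be sketched with an in-memory table and undo log store. This is only a model under stated assumptions: UndoFlowSketch, BeforeImage and the map-based storage are hypothetical names for illustration, whereas a real RM stores before-images in the undo_log table and runs the restore and the delete in one local database transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model of the RM rollback steps; not Seata's undo log manager.
class UndoFlowSketch {
    // The value a row had before the branch changed it (null = row did not exist).
    static final class BeforeImage {
        final int rowId;
        final String oldValue;

        BeforeImage(int rowId, String oldValue) {
            this.rowId = rowId;
            this.oldValue = oldValue;
        }
    }

    final Map<Integer, String> table = new HashMap<>();
    final Map<String, List<BeforeImage>> undoLogs = new HashMap<>();

    // Phase one: record the before-image under xid/branchId, then change the row.
    void update(String xid, long branchId, int rowId, String newValue) {
        undoLogs.computeIfAbsent(xid + "/" + branchId, k -> new ArrayList<>())
            .add(new BeforeImage(rowId, table.get(rowId)));
        table.put(rowId, newValue);
    }

    // Phase-two rollback: returns true if an undo log was found and applied.
    boolean rollback(String xid, long branchId) {
        // 1. Fetch the undo log by xid and branch id.
        List<BeforeImage> images = undoLogs.get(xid + "/" + branchId);
        // 2. A missing log means it was already handled, so a repeated request is a no-op.
        if (images == null) {
            return false;
        }
        // 3. Restore before-images in reverse order of the original changes.
        for (int i = images.size() - 1; i >= 0; i--) {
            BeforeImage image = images.get(i);
            if (image.oldValue == null) {
                table.remove(image.rowId);
            } else {
                table.put(image.rowId, image.oldValue);
            }
        }
        // 4. Delete the undo log so the rollback cannot be applied twice.
        undoLogs.remove(xid + "/" + branchId);
        return true;
    }

    public static void main(String[] args) {
        UndoFlowSketch rm = new UndoFlowSketch();
        rm.table.put(1, "100");
        rm.update("xid-1", 7L, 1, "90");
        rm.update("xid-1", 7L, 1, "80");
        System.out.println(rm.rollback("xid-1", 7L) + " -> " + rm.table);
    }
}
```

Restoring in reverse order matters when one branch touches the same row more than once: the oldest before-image has to be applied last.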

Source code analysis

Rollback is triggered when the business method throws an exception: the completeTransactionAfterThrowing method runs as part of exception handling. As with commit, the TM initiates the rollback and the TC carries it out.

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        // roll back
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // not roll back on this exception, so commit
            commitTransaction(tx);
        }
    }

    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        triggerBeforeRollback();
        tx.rollback();
        triggerAfterRollback();
        // 3.1 Successfully rolled back
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }

    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();

        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }
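
The TM's rollback call is a plain retry countdown: try to report to the TC, decrement on failure, and rethrow once the counter reaches zero. Below is a small sketch of that loop in isolation. RollbackRetrySketch, reportWithRetry and the fallback value DEFAULT_RETRY_COUNT are hypothetical names and values chosen for the example.

```java
import java.util.function.Supplier;

// Illustrative retry countdown modeled on the TM's rollback reporting; not Seata's code.
class RollbackRetrySketch {
    // Hypothetical fallback used when the configured count is not positive.
    static final int DEFAULT_RETRY_COUNT = 5;

    // Calls the TC until it answers, and rethrows once the countdown hits zero.
    static <T> T reportWithRetry(Supplier<T> reportToTc, int configuredRetries) {
        int retry = configuredRetries <= 0 ? DEFAULT_RETRY_COUNT : configuredRetries;
        while (true) {
            try {
                return reportToTc.get();
            } catch (RuntimeException ex) {
                retry--;
                if (retry == 0) {
                    throw new IllegalStateException("Failed to report global rollback", ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String status = reportWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("TC unreachable");
            }
            return "Rollbacked";
        }, 5);
        System.out.println(status + " after " + calls[0] + " calls");
    }
}
```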

The core rollback method is DefaultCore's doGlobalRollback, in which the TC drives each RM to roll back its branch transaction.

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
            globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                globalSession.changeStatus(GlobalStatus.Rollbacking);
                return true;
            }
            return false;
        });
        if (!shouldRollBack) {
            return globalSession.getStatus();
        }

        doGlobalRollback(globalSession, false);
        return globalSession.getStatus();
    }

    @Override
    public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start rollback event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
                GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
                globalSession.getApplicationId(),
                globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(),
                null, globalSession.getStatus()));

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
        } else {
            Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
                BranchStatus currentBranchStatus = branchSession.getStatus();
                if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branchSession);
                    return CONTINUE;
                }
                try {
                    BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                    switch (branchStatus) {
                        case PhaseTwo_Rollbacked:
                            globalSession.removeBranch(branchSession);
                            LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return CONTINUE;
                        case PhaseTwo_RollbackFailed_Unretryable:
                            SessionHelper.endRollbackFailed(globalSession);
                            LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex,
                        "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                        new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }

            GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
            if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
                LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
        }
        if (success) {
            SessionHelper.endRollbacked(globalSession);

            // rollbacked event
            eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
                    GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
                    globalSession.getApplicationId(),
                    globalSession.getTransactionServiceGroup(),
                    globalSession.getBeginTime(), System.currentTimeMillis(),
                    globalSession.getStatus()));

            LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
        }
        return success;
    }

    @Override
    public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchRollbackRequest request = new BranchRollbackRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchRollbackSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
                    String.format("Send branch rollback failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
        }
    }

So how does the RM roll back the branch transaction? Again through a callback, and the core class is still AbstractRMHandler.

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

    // DataSourceManager rolls the branch back through the undo log manager
    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

    @Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;

        for (; ; ) {
            try {
                conn = dataSourceProxy.getPlainConnection();

                // The entire undo process should run in a local transaction.
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }

                // Find UNDO LOG
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                    exists = true;

                    // The server may send repeated rollback requests for the same branch
                    // transaction to multiple processes, so make sure that only undo_log
                    // records in the normal state are processed.
                    int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                        }
                        return;
                    }

                    String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                    Map<String, String> context = parseContext(contextString);
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                // If undo_log exists, the first phase of the branch transaction has completed.
                // Otherwise, the branch transaction hit an exception and undo_log could not be
                // written to the database.
                if (exists) {
                    deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId, State.GlobalFinished.name());
                    }
                } else {
                    insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId, State.GlobalFinished.name());
                    }
                }
                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                // The undo_log may have been inserted by another process; retry the rollback
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                }
            } catch (Throwable e) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                    }
                }
                throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                    .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                        branchId, e.getMessage()), e);

            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }
                        conn.close();
                    }
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                }
            }
        }
    }
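
The undo method has three outcomes per branch: apply and delete a normal undo log, ignore a log that is not in the normal state, or insert a GlobalFinished marker when no log exists. The sketch below models only that decision, assuming the undo log is uniquely keyed by xid and branch id so that a late phase-one write collides with the marker. UndoMarkerSketch and its returned strings are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of the GlobalFinished marker; assumes a unique key on (xid, branchId).
class UndoMarkerSketch {
    enum State { Normal, GlobalFinished }

    final Map<String, State> undoLogTable = new ConcurrentHashMap<>();

    // Phase one: the branch writes its undo log; a duplicate key makes the write fail.
    boolean phaseOneCommit(String xid, long branchId) {
        return undoLogTable.putIfAbsent(xid + "/" + branchId, State.Normal) == null;
    }

    // Phase-two rollback: decide what to do based on the stored undo log state.
    String rollback(String xid, long branchId) {
        String key = xid + "/" + branchId;
        State state = undoLogTable.get(key);
        if (state == State.Normal) {
            // Normal log: apply the undo (omitted here) and delete the log.
            undoLogTable.remove(key);
            return "undo applied, log deleted";
        }
        if (state == State.GlobalFinished) {
            // Already finished: a repeated rollback request is ignored.
            return "ignored";
        }
        // No log yet: leave a marker so a late phase-one write cannot succeed.
        undoLogTable.put(key, State.GlobalFinished);
        return "marker inserted";
    }

    public static void main(String[] args) {
        UndoMarkerSketch rm = new UndoMarkerSketch();
        System.out.println(rm.rollback("xid-1", 1L));
        System.out.println(rm.phaseOneCommit("xid-1", 1L));
    }
}
```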

6 @GlobalLock’s role

When an ordinary business method should not interfere with running global transactions, we can mark it with the @GlobalLock annotation. Such a method is not a branch of any global transaction, but it still has to check the global lock when it accesses a resource. If a global transaction is currently working on that resource, the method has to wait, so it cannot interfere with that transaction.

GlobalTransactionalInterceptor also intercepts methods marked with the global lock annotation, and copies the annotation's properties into the lock configuration.

    Object handleGlobalLock(final MethodInvocation methodInvocation,
        final GlobalLock globalLockAnno) throws Throwable {
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }

The global lock annotation also has its own template method, which binds the global lock flag before execution and unbinds it afterwards.

public class GlobalLockTemplate {

    public Object execute(GlobalLockExecutor executor) throws Throwable {
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        if (!alreadyInGlobalLock) {
            RootContext.bindGlobalLockFlag();
        }

        // set my config to config holder so that it can be access in further execution
        // for example, LockRetryController can access it with config holder
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);

        try {
            return executor.execute();
        } finally {
            // only unbind when this is the root caller.
            // otherwise, the outer caller would lose global lock flag
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }

            // if previous config is not null, we need to set it back
            // so that the outer logic can still use their config
            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }
}
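
The template's key detail is that only the outermost caller binds and unbinds the flag, so a nested @GlobalLock method cannot clear the flag for its caller. Here is a minimal sketch of that pattern with a thread-local flag. LockFlagSketch is a hypothetical stand-in for the RootContext calls used above.

```java
import java.util.function.BooleanSupplier;

// Illustrative thread-local flag with root-only unbinding; not Seata's RootContext.
class LockFlagSketch {
    private static final ThreadLocal<Boolean> FLAG = ThreadLocal.withInitial(() -> Boolean.FALSE);

    static boolean requireGlobalLock() {
        return FLAG.get();
    }

    // Binds the flag around the body; nested calls leave the outer binding alone.
    static boolean execute(BooleanSupplier body) {
        boolean alreadyInGlobalLock = requireGlobalLock();
        if (!alreadyInGlobalLock) {
            FLAG.set(Boolean.TRUE);
        }
        try {
            return body.getAsBoolean();
        } finally {
            // Only the root caller unbinds, otherwise the outer caller would lose the flag.
            if (!alreadyInGlobalLock) {
                FLAG.remove();
            }
        }
    }

    public static void main(String[] args) {
        boolean stillBound = execute(() -> {
            execute(LockFlagSketch::requireGlobalLock); // nested call
            return requireGlobalLock();                 // flag must survive the nested call
        });
        System.out.println(stillBound + " " + requireGlobalLock());
    }
}
```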

When a local transaction commits on a connection that has the global lock flag bound, the commit first checks the global lock (checkLock) and only goes ahead with the local commit once the check passes.

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processLocalCommitWithGlobalLocks() throws SQLException {
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }
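
To make the effect of the lock check concrete, here is a simplified model of a local commit guarded by the global lock. It assumes the check is retried according to the two settings carried by GlobalLockConfig (retry times and retry interval). GlobalLockCommitSketch and the in-memory lockedKeys set are hypothetical; in Seata the locks are held by the TC and the retry logic lives outside the method shown above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of a local commit guarded by the global lock; not Seata's ConnectionProxy.
class GlobalLockCommitSketch {
    // Row keys locked by running global transactions (kept by the TC in Seata).
    final Set<String> lockedKeys = ConcurrentHashMap.newKeySet();
    int localCommits = 0;

    // Checks the lock up to retryTimes + 1 times and commits only when the row is free.
    boolean commitWithGlobalLock(String lockKey, int retryTimes, long retryIntervalMillis) {
        for (int attempt = 0; ; attempt++) {
            if (!lockedKeys.contains(lockKey)) {
                localCommits++; // stands in for targetConnection.commit()
                return true;
            }
            if (attempt >= retryTimes) {
                return false; // still locked: give up rather than overwrite the row
            }
            try {
                Thread.sleep(retryIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        GlobalLockCommitSketch conn = new GlobalLockCommitSketch();
        conn.lockedKeys.add("account:1");
        System.out.println(conn.commitWithGlobalLock("account:1", 2, 10L));
        conn.lockedKeys.remove("account:1");
        System.out.println(conn.commitWithGlobalLock("account:1", 2, 10L));
    }
}
```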

Thank you for reading. If you found this article helpful, please give it a thumbs-up. Thanks a million!