In this article we walk through the source code of Seata's AT mode. It is best read with some understanding of the 2PC and 3PC protocols, of Seata's basic execution flow, and of the roles played by the TC, TM, and RM.
1 Entry point: GlobalTransactionScanner
We all know that in Seata's AT mode, annotating a business method with @GlobalTransactional is all it takes to get global transaction control across services. So how is this annotation picked up and made effective?
To explore Seata's source code, we can start with seata-spring-boot-starter and see which beans it registers in the Spring container, since these beans are the key to how Seata runs. SeataAutoConfiguration, Seata's core auto-configuration class, registers a GlobalTransactionScanner, which, as its name suggests, scans for global transactions.
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
ConfigurableListableBeanFactory beanFactory,
@Autowired(required = false) List<ScannerChecker> scannerCheckers) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
// Set up the Bean factory
GlobalTransactionScanner.setBeanFactory(beanFactory);
// Add the ScannerChecker implementations declared in '/META-INF/services/io.seata.spring.annotation.ScannerChecker'
GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
// Add the scan class in Spring
GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
// Add the configured packages to scan
GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
// Add exclude configuration
GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
// Create a global transaction scanner
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
So what does GlobalTransactionScanner do? First, it extends AbstractAutoProxyCreator and overrides the wrapIfNecessary method, which decides whether a bean needs a dynamic proxy. This is the heart of the scanner: it checks whether the bean should get a TCC proxy, otherwise checks for global transaction annotations, and builds the interceptor used for the dynamic proxy when the conditions are met. It also implements three Spring interfaces, InitializingBean, ApplicationContextAware, and DisposableBean, which tie the class to the Spring container lifecycle. The overridden afterPropertiesSet method runs once the bean's properties have been set, during bean initialization. Its core is initClient, where the TM and RM clients are initialized and a Spring shutdown hook is registered. The overridden destroy method releases resources, and implementing ApplicationContextAware gives the scanner access to the Spring application context.
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
...
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
// Determine if it is a TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// Check whether the @GlobalTransactional annotation exists
if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
@Override
public void destroy() {
ShutdownHook.getInstance().destroyAll();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
this.setBeanFactory(applicationContext);
}
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
}
if (initialized.compareAndSet(false, true)) {
initClient();
}
}
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
...
}
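To make the idea behind wrapIfNecessary more concrete, here is a minimal sketch, assuming a plain JDK dynamic proxy instead of Spring AOP. The annotation DemoGlobalTransactional, the OrderService types, and the class ScannerSketch are all hypothetical names invented for this illustration and are not part of Seata. The sketch only shows the general pattern: leave beans without the annotation untouched, and wrap the others in a proxy whose handler brackets annotated methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a global transaction annotation (not Seata's). */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTransactional {}

/** Hypothetical business interface: only createOrder is annotated. */
interface OrderService {
    @DemoGlobalTransactional
    String createOrder(String item);

    String queryOrder(String id);
}

class OrderServiceImpl implements OrderService {
    public String createOrder(String item) { return "created:" + item; }
    public String queryOrder(String id) { return "order:" + id; }
}

class ScannerSketch {
    /** Records what the handler did, so the behaviour is observable. */
    static final List<String> EVENTS = new ArrayList<>();

    /** True if any public method of the given types carries the annotation. */
    static boolean existsAnnotation(Class<?>... types) {
        for (Class<?> type : types) {
            for (Method m : type.getMethods()) {
                if (m.isAnnotationPresent(DemoGlobalTransactional.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Returns the bean unchanged unless its interface has an annotated method. */
    static <T> T wrapIfNecessary(T bean, Class<T> iface) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            boolean transactional = method.isAnnotationPresent(DemoGlobalTransactional.class);
            if (transactional) {
                EVENTS.add("begin " + method.getName());
            }
            try {
                Object result = method.invoke(bean, args);
                if (transactional) {
                    EVENTS.add("commit " + method.getName());
                }
                return result;
            } catch (InvocationTargetException e) {
                if (transactional) {
                    EVENTS.add("rollback " + method.getName());
                }
                throw e.getCause();
            }
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler));
    }

    public static void main(String[] args) {
        OrderService service = wrapIfNecessary(new OrderServiceImpl(), OrderService.class);
        System.out.println(service.createOrder("book"));
        System.out.println(service.queryOrder("42"));
        System.out.println(EVENTS);
    }
}
```

The real scanner differs in important ways: it builds Spring advisors, caches proxied bean names in PROXYED_SET, and handles TCC beans separately. The sketch only mirrors the "check annotation, then proxy" decision.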
2 Initialize TM
TM: the transaction manager, which defines the scope of a global transaction: it begins the global transaction and commits or rolls it back. The TM initialization methods live in the seata-tm and seata-core modules, and they mainly initialize an RPC client built on Netty.
public class TMClient {
public static void init(String applicationId, String transactionServiceGroup) {
init(applicationId, transactionServiceGroup, null, null);
}
public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
tmNettyRemotingClient.init();
}
}
// TmNettyRemotingClient#init
@Override
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
}
}
// AbstractNettyRemotingClient#init
@Override
public void init() {
// Initialize the timer scheduler
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
// Set the merge sending thread pool to merge messages sent from the same server and reduce unnecessary communication
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
// Obtain serverAddress according to transactionServiceGroup
void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);
if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
return;
}
if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
return;
}
for (String serverAddress : availList) {
try {
acquireChannel(serverAddress);
} catch (Exception e) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
}
}
}
// TM client start method
@Override
public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
nettyClientConfig.getClientWorkerThreads()));
}
this.bootstrap.group(this.eventLoopGroupWorker).channel(
nettyClientConfig.getClientChannelClazz()).option(
ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
nettyClientConfig.getClientSocketRcvBufSize());
if (nettyClientConfig.enableNative()) {
if (PlatformDependent.isOsx()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("client run on macOS");
}
} else {
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(EpollChannelOption.TCP_QUICKACK, true);
}
}
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});
if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
LOGGER.info("NettyClientBootstrap has started");
}
}
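Two patterns recur in the listings above: an AtomicBoolean guard so that init only takes effect once, and a fixed-rate timer that keeps calling reconnect. The following is a minimal sketch of both using only the JDK. The class ReconnectingClientSketch and its counters are hypothetical and exist only for illustration; the reconnect body here just counts invocations, whereas the real client looks up server addresses and acquires channels.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ReconnectingClientSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger initCount = new AtomicInteger();
    private final CountDownLatch twoReconnects = new CountDownLatch(2);

    // Daemon timer thread so the sketch never keeps the JVM alive.
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "demo-reconnect-timer");
            t.setDaemon(true);
            return t;
        });

    /** Safe to call repeatedly: only the first call schedules the reconnect task. */
    void init(long delayMillis, long intervalMillis) {
        if (initialized.compareAndSet(false, true)) {
            initCount.incrementAndGet();
            timer.scheduleAtFixedRate(this::reconnect, delayMillis, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Placeholder for "look up servers and open channels". */
    private void reconnect() {
        twoReconnects.countDown();
    }

    int initCount() {
        return initCount.get();
    }

    /** Waits until reconnect has run at least twice, or the timeout expires. */
    boolean awaitTwoReconnects(long timeoutMillis) {
        try {
            return twoReconnects.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void destroy() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        ReconnectingClientSketch client = new ReconnectingClientSketch();
        client.init(0, 10);
        client.init(0, 10); // ignored: already initialized
        System.out.println("init ran " + client.initCount() + " time(s)");
        System.out.println("reconnect fired twice: " + client.awaitTwoReconnects(2000));
        client.destroy();
    }
}
```

The compareAndSet guard matters because both the TM and RM clients go through the same base init logic and may be initialized from several places.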
3 Initialize RM
RM: the resource manager, which manages the resources that branch transactions work on, talks to the TC to register branch transactions and report their status, and drives branch commit or rollback. The RM initialization methods live in the seata-rm and seata-core modules, and the core initialization logic is shared with the TM through the init method of AbstractNettyRemotingClient.
public class RMClient {
/**
* Init.
*
* @param applicationId the application id
* @param transactionServiceGroup the transaction service group
*/
public static void init(String applicationId, String transactionServiceGroup) {
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
}
@Override
public void init() {
// registry processor
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
// Found one or more resources that were registered before initialization
if (resourceManager != null
&& !resourceManager.getManagedResources().isEmpty()
&& StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
}
4 Initialize the TC
TC: the transaction coordinator, which maintains the state of global and branch transactions and drives the commit or rollback of global transactions. The TC is deployed as middleware: it is a standalone Java service in the seata-server module, unlike the TM and RM, which are embedded in business services as an SDK.
The TC is a Spring Boot project, started from the main method of the class annotated with @SpringBootApplication.
@SpringBootApplication
public class ServerApplication {
public static void main(String[] args) {
int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_RPC_PORT, Integer.toString(port));
SpringApplication.run(ServerApplication.class, args);
}
}
The TC starts its Netty-based RPC server by implementing the run method of CommandLineRunner, which Spring Boot invokes automatically once the application has started.
@Component
public class ServerRunner implements CommandLineRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);
private boolean started = Boolean.FALSE;
@Override
public void run(String... args) {
try {
long start = System.currentTimeMillis();
Server.start(args);
started = true;
long cost = System.currentTimeMillis() - start;
LOGGER.info("seata server started in {} millSeconds", cost);
} catch (Throwable e) {
started = Boolean.FALSE;
LOGGER.error("seata server start error: {} ", e.getMessage(), e);
System.exit(-1);
}
}
public boolean started() {
return started;
}
}
public class Server {
/**
* The entry point of application.
*
* @param args the input arguments
*/
public static void start(String[] args) {
// create logger
final Logger logger = LoggerFactory.getLogger(Server.class);
if (ContainerHelper.isRunningInContainer()) {
logger.info("The server is running in container.");
}
// Initialize the parameter parser, which should always be the first line to execute. Because, here we need to parse the parameters required for startup.
ParameterParser parameterParser = new ParameterParser(args);
// Initialize the metrics manager
MetricsManager.get().init();
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
//server port
nettyRemotingServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file, db, redis
SessionHolder.init(parameterParser.getSessionStoreMode());
LockerManagerFactory.init(parameterParser.getLockStoreMode());
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
//127.0.0.1 and 0.0.0.0 are not valid here
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());
nettyRemotingServer.init();
}
}
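The server's working thread pool above combines a bounded LinkedBlockingQueue with ThreadPoolExecutor.CallerRunsPolicy. As a rough illustration of what that combination does under load, the sketch below uses a deliberately tiny pool (one worker, one queue slot). The class CallerRunsSketch, the pool sizes, and the thread name "demo-worker" are assumptions made for the demo and have nothing to do with Seata's actual configuration values.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsSketch {
    /**
     * Submits three tasks to a pool with one worker and a one-slot queue,
     * and returns the name of the thread each task ran on, in completion order.
     */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 500, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1),
            r -> new Thread(r, "demo-worker"),
            new ThreadPoolExecutor.CallerRunsPolicy());

        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);

        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ranOn.add(Thread.currentThread().getName());
        };
        Runnable quick = () -> ranOn.add(Thread.currentThread().getName());

        pool.execute(blocking); // occupies the only worker thread
        pool.execute(quick);    // waits in the single queue slot
        pool.execute(quick);    // pool and queue are full: runs on the calling thread

        release.countDown();    // let the worker finish and drain the queue
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this policy a saturated pool slows the submitting thread down instead of dropping the task or throwing, which acts as a simple form of back-pressure.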
5 Global transaction execution process
The flow of a global transaction is shown in the figure below; the following sections analyze the source code of each core step.
5.1 TM initiates the global transaction
Process overview
- Methods annotated with @GlobalTransactional are scanned, and a proxy object is created for their bean
- Calling a method annotated with @GlobalTransactional triggers the TM's transaction template method
- The TM sends a request to the TC to begin a global transaction and gets back an XID
- The global transaction is now open, and the XID is bound to the current context
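The steps above can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: FakeCoordinator is a hypothetical in-memory stand-in for the TC, the XID format is made up, and the ThreadLocal is a simplification of how an XID can be bound to the calling thread. None of these names come from Seata.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

class TransactionTemplateSketch {
    /** Hypothetical stand-in for the TC: hands out XIDs and records decisions. */
    static class FakeCoordinator {
        final List<String> log = new ArrayList<>();

        String begin(String name) {
            log.add("begin " + name);
            return "demo-tc:" + UUID.randomUUID();
        }

        void commit(String xid) {
            log.add("commit");
        }

        void rollback(String xid) {
            log.add("rollback");
        }
    }

    /** The XID bound to the current thread while a transaction is open. */
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    private final FakeCoordinator tc;

    TransactionTemplateSketch(FakeCoordinator tc) {
        this.tc = tc;
    }

    static String currentXid() {
        return CURRENT_XID.get();
    }

    /** Template method: begin, bind XID, run business code, then commit or roll back. */
    <T> T execute(String name, Supplier<T> business) {
        String xid = tc.begin(name);
        CURRENT_XID.set(xid);
        try {
            T result;
            try {
                result = business.get();
            } catch (RuntimeException e) {
                tc.rollback(xid);
                throw e;
            }
            tc.commit(xid);
            return result;
        } finally {
            CURRENT_XID.remove(); // always unbind, whatever the outcome
        }
    }

    public static void main(String[] args) {
        FakeCoordinator tc = new FakeCoordinator();
        TransactionTemplateSketch template = new TransactionTemplateSketch(tc);
        String seen = template.execute("order", TransactionTemplateSketch::currentXid);
        System.out.println("XID seen by business code: " + seen);
        System.out.println(tc.log);
    }
}
```

The business code never talks to the coordinator directly; it only sees the XID that the template bound before calling it, which is the property the proxy-based design relies on.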
Source code analysis
First, when the bean that holds a @GlobalTransactional method is scanned, a proxy object is created for it, and calls to the method are intercepted by GlobalTransactionalInterceptor. Methods annotated with @GlobalTransactional get special treatment in the proxy. The key method is handleGlobalTransaction, which calls the transaction template in seata-tm to start phase one of the transaction.
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
private final FailureHandler failureHandler;
private volatile boolean disable;
private int order;
private static int degradeCheckPeriod;
private static volatile boolean degradeCheck;
private static int degradeCheckAllowTimes;
private static volatile Integer degradeNum = 0;
private static volatile Integer reachNum = 0;
private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
private static ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
private static int defaultGlobalTransactionTimeout = 0;
private void initDefaultGlobalTransactionTimeout() {
if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {
int defaultGlobalTransactionTimeout;
try {
defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
} catch (Exception e) {
LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
}
if (defaultGlobalTransactionTimeout <= 0) {
LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
}
GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
}
}
/** Instantiates a new global transaction interceptor */
public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
DEFAULT_DISABLE_GLOBAL_TRANSACTION);
this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
DEFAULT_TM_DEGRADE_CHECK);
if (degradeCheck) {
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
EVENT_BUS.register(this);
if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
startDegradeCheck();
}
}
this.initDefaultGlobalTransactionTimeout();
}
// method interception
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
// Check for the global transaction annotation, then the global lock annotation, and run the matching handler
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
/** Global lock handler */
Object handleGlobalLock(final MethodInvocation methodInvocation,
final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}
/** Handler for the global transaction (phase one) */
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
boolean succeed = true;
try {
// The transaction template method
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
@Override
public TransactionInfo getTransactionInfo() {
// Set the timeout period
int timeout = globalTrxAnno.timeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
// Build the transaction info object
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
transactionInfo.setLockRetryInterval(globalTrxAnno.lockRetryInterval());
transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
// The transaction failed: handle it according to the failure code
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
/** Get the annotation from the method, falling back to the target class */
public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {
return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass))
.orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));
}
private String formatMethod(Method method) {
StringBuilder sb = new StringBuilder(method.getName()).append("(");
Class<?>[] params = method.getParameterTypes();
int in = 0;
for (Class<?> clazz : params) {
sb.append(clazz.getName());
if (++in < params.length) {
sb.append(",");
}
}
return sb.append(")").toString();
}
@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
disable, event.getNewValue());
disable = Boolean.parseBoolean(event.getNewValue().trim());
} else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
degradeCheck = Boolean.parseBoolean(event.getNewValue());
if (!degradeCheck) {
degradeNum = 0;
}
}
}
/** Automatic degrade check: periodically probes the TC with a test transaction */
private static void startDegradeCheck() {
executor.scheduleAtFixedRate(() -> {
if (degradeCheck) {
try {
String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
TransactionManagerHolder.get().commit(xid);
EVENT_BUS.post(new DegradeCheckEvent(true));
} catch (Exception e) {
EVENT_BUS.post(new DegradeCheckEvent(false));
}
}
}, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
}
@Subscribe
public static void onDegradeCheck(DegradeCheckEvent event) {
if (event.isRequestSuccess()) {
if (degradeNum >= degradeCheckAllowTimes) {
reachNum++;
if (reachNum >= degradeCheckAllowTimes) {
reachNum = 0;
degradeNum = 0;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("the current global transaction has been restored");
}
}
} else if (degradeNum != 0) {
degradeNum = 0;
}
} else {
if (degradeNum < degradeCheckAllowTimes) {
degradeNum++;
if (degradeNum >= degradeCheckAllowTimes) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("the current global transaction has been automatically downgraded");
}
}
} else if (reachNum != 0) {
reachNum = 0;
}
}
}
@Override
public int getOrder() {
return order;
}
@Override
public void setOrder(int order) {
this.order = order;
}
@Override
public SeataInterceptorPosition getPosition() {
return SeataInterceptorPosition.BeforeTransaction;
}
}
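The degrade check in onDegradeCheck is easy to misread because it juggles two counters. The sketch below restates the same branching as a small standalone class so the state changes can be followed in isolation. DegradeTrackerSketch is a hypothetical name, and the class leaves out the event bus, the scheduler, and the logging. It is only meant as a reading aid for the counter logic shown in the listing.

```java
class DegradeTrackerSketch {
    private final int allowTimes; // threshold for both degrading and restoring
    private int degradeNum;       // consecutive failures, capped at allowTimes
    private int reachNum;         // consecutive successes while degraded

    DegradeTrackerSketch(int allowTimes) {
        this.allowTimes = allowTimes;
    }

    /** Feeds one success or failure into the tracker. */
    synchronized void onResult(boolean success) {
        if (success) {
            if (degradeNum >= allowTimes) {
                // Already degraded: count successes until the threshold is reached.
                reachNum++;
                if (reachNum >= allowTimes) {
                    reachNum = 0;
                    degradeNum = 0; // restored
                }
            } else if (degradeNum != 0) {
                degradeNum = 0; // a success before the threshold clears the failures
            }
        } else {
            if (degradeNum < allowTimes) {
                degradeNum++; // reaching allowTimes means "degraded"
            } else if (reachNum != 0) {
                reachNum = 0; // a failure while degraded restarts the recovery count
            }
        }
    }

    /** True while global transactions would be skipped. */
    synchronized boolean degraded() {
        return degradeNum >= allowTimes;
    }

    public static void main(String[] args) {
        DegradeTrackerSketch tracker = new DegradeTrackerSketch(2);
        boolean[] results = {false, false, true, false, true, true};
        for (boolean ok : results) {
            tracker.onResult(ok);
            System.out.println((ok ? "success" : "failure") + " -> degraded=" + tracker.degraded());
        }
    }
}
```

In the interceptor the equivalent of degraded() is the expression degradeNum >= degradeCheckAllowTimes used when computing localDisable, so once the threshold is hit the annotated method simply proceeds without a global transaction.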
In the TM, the template method pattern is used to start phase one of the transaction. The core class is shown below.
This class contains the core steps: preparing the transaction, executing the business logic, rolling back on exceptions, and committing. The rest of the global transaction flow is also driven from this entry point.
public class TransactionalTemplate {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Obtain transaction information, implemented in proxy class
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get the current transaction; if it is not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create a new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
// 5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
private boolean existingTransaction(GlobalTransaction tx) {
return tx != null;
}
private boolean notExistingTransaction(GlobalTransaction tx) {
return tx == null;
}
private GlobalLockConfig replaceGlobalLockConfig(TransactionInfo info) {
GlobalLockConfig myConfig = new GlobalLockConfig();
myConfig.setLockRetryInterval(info.getLockRetryInterval());
myConfig.setLockRetryTimes(info.getLockRetryTimes());
return GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
}
private void resumeGlobalLockConfig(GlobalLockConfig config) {
if (config != null) {
GlobalLockConfigHolder.setAndReturnPrevious(config);
} else {
GlobalLockConfigHolder.remove();
}
}
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
// roll back
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
}
}
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
}
}
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
}
}
private void triggerBeforeBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) {
LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {}", e.getMessage(), e);
}
}
}
private void triggerBeforeRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeRollback();
} catch (Exception e) {
LOGGER.error("Failed execute beforeRollback in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterRollback();
} catch (Exception e) {
LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
}
}
}
private void triggerBeforeCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeCommit();
} catch (Exception e) {
LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
} catch (Exception e) {
LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterCompletion() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
}
}
}
private void cleanUp() {
TransactionHookManager.clear();
}
private List<TransactionHook> getCurrentHooks() {
return TransactionHookManager.getHooks();
}
}
The core call is tx.begin(txInfo.getTimeOut(), txInfo.getName()). beginTransaction wraps it with hook callbacks that run before and after it. Inside begin, the TM asks the TC to open a global transaction, the TC returns an XID to the TM, and the TM binds that XID to the current context with RootContext.bind(xid).
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
}
}
// The default implementation
@Override
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
// Open it through the global transaction manager
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
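The begin flow can be pictured with a small stand-alone sketch. It is not Seata's code: `XidContext`, `FakeCoordinator`, and `LauncherTransaction` are hypothetical stand-ins for RootContext, the TC, and the launcher-side transaction, and the XID format is made up for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for RootContext: keeps the XID of the current thread.
final class XidContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) { XID.set(xid); }
    static String getXid() { return XID.get(); }
    static void unbind() { XID.remove(); }
}

// Hypothetical stand-in for the TC: hands out a new XID per global transaction.
final class FakeCoordinator {
    private final AtomicLong sequence = new AtomicLong();

    String begin(String name, int timeoutMillis) {
        return "tc-demo:" + sequence.incrementAndGet();
    }
}

// Hypothetical launcher-side transaction: asks the coordinator for an XID and binds it.
final class LauncherTransaction {
    private final FakeCoordinator coordinator;
    private String xid;

    LauncherTransaction(FakeCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    void begin(int timeoutMillis, String name) {
        // A thread that already carries an XID must not start a second global transaction.
        if (XidContext.getXid() != null) {
            throw new IllegalStateException(
                    "Global transaction already exists, currentXid = " + XidContext.getXid());
        }
        xid = coordinator.begin(name, timeoutMillis);
        XidContext.bind(xid);
    }

    String getXid() { return xid; }
}
```

Keeping the XID in a thread-local is what lets code further down the call chain find out, without extra parameters, that it is running inside a global transaction.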
5.2 RM executes the branch transaction
Process overview
- The data source proxy object is injected
- The SQL is parsed to get its type (UPDATE), table name, and condition (where name = ‘TXC’)
- From the parsed condition, a query statement is generated to locate the data and capture the before image
- The business SQL is executed
- Using the before image, the data is located by primary key and the after image is captured
- A rollback log record, built from the before and after images plus information about the business SQL, is inserted into the UNDO_LOG table
- Before committing, the branch transaction is registered with the TC
- The business data update is committed together with the undo log generated in the previous steps
- The result of the local commit is reported to the TC
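The image steps in this list can be sketched with an in-memory map standing in for a database table. Everything here is hypothetical (`UndoRecord`, `ImageDemo`, the single-column rows), and the check against the after image during rollback is an illustrative safety assumption, not something shown in the code of this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical undo record: the row's primary key plus its state before and after the update.
final class UndoRecord {
    final int pk;
    final String beforeImage;
    final String afterImage;

    UndoRecord(int pk, String beforeImage, String afterImage) {
        this.pk = pk;
        this.beforeImage = beforeImage;
        this.afterImage = afterImage;
    }
}

// Hypothetical in-memory "table" keyed by primary key, standing in for a real database table.
final class ImageDemo {
    final Map<Integer, String> table = new HashMap<>();

    // Phase one: capture the before image, apply the business update, capture the after image.
    UndoRecord update(int pk, String newValue) {
        String before = table.get(pk);   // before image, located by the parsed condition
        table.put(pk, newValue);         // the business SQL
        String after = table.get(pk);    // after image, located by primary key
        return new UndoRecord(pk, before, after);
    }

    // Rollback: restore the before image, but only if the row still matches the after image.
    boolean rollback(UndoRecord undo) {
        if (!Objects.equals(table.get(undo.pk), undo.afterImage)) {
            return false;                // the row changed in the meantime, do not overwrite it
        }
        table.put(undo.pk, undo.beforeImage);
        return true;
    }
}
```

For example, updating row 1 from "TXC" to "GTS" yields an undo record holding both values, and rolling it back puts "TXC" in place again.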
Source code analysis
Executing the branch transaction means executing the business method, and the undo log is generated while the business method runs. So how is the undo log generated?
For this we go back to seata-spring-boot-starter, which also registers a data source proxy creator. This is how Seata's RM proxies and enhances DataSource, Connection, Statement, PreparedStatement and so on, so that undo logs can be generated.
@ConditionalOnBean(DataSource.class)
@ConditionalOnExpression("${seata.enabled:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
@AutoConfigureAfter({SeataCorePropertiesAutoConfiguration.class, SeataClientPropertiesAutoConfiguration.class})
public class SeataDataSourceAutoConfiguration {
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(), seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
}
When SQL runs, Seata first checks whether a global transaction is active, then goes through the proxied data source, parses the SQL, and decides from the SQL type whether an undo log has to be generated.
The data source proxy follows the same design as the global transaction scanner: it also extends the AbstractAutoProxyCreator class. The decision on whether to create the proxy is made in wrapIfNecessary:
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// we only care DataSource bean
if (!(bean instanceof DataSource)) {
return bean;
}
// when this bean is just a simple DataSource, not SeataDataSourceProxy
if (!(bean instanceof SeataDataSourceProxy)) {
Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
// this mean this bean is either excluded by user or had been proxy before
if (bean == enhancer) {
return bean;
}
// else, build proxy, put <origin, proxy> to holder and return enhancer
DataSource origin = (DataSource) bean;
SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
DataSourceProxyHolder.put(origin, proxy);
return enhancer;
}
LOGGER.warn("Manually register SeataDataSourceProxy(or its subclass) bean is discouraged! bean name: {}", beanName);
SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean;
DataSource origin = proxy.getTargetDataSource();
Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey);
// this mean origin is either excluded by user or had been proxy before
if (origin == originEnhancer) {
return origin;
}
// else, put <origin, proxy> to holder and return originEnhancer
DataSourceProxyHolder.put(origin, proxy);
return originEnhancer;
}
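The wrap-once idea can be shown with a JDK dynamic proxy. This is a simplified sketch, not Seata's implementation: `SimpleSource` is a hypothetical one-method stand-in for javax.sql.DataSource, and `HOLDER` only mimics the role of DataSourceProxyHolder.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, much smaller stand-in for javax.sql.DataSource.
interface SimpleSource {
    String getConnection();
}

final class SourceProxyDemo {
    // Hypothetical holder that remembers which proxy belongs to which original object.
    static final Map<SimpleSource, SimpleSource> HOLDER = new ConcurrentHashMap<>();

    // Wrap a bean only if it is a SimpleSource that has not been wrapped yet.
    static Object wrapIfNecessary(Object bean) {
        if (!(bean instanceof SimpleSource) || Proxy.isProxyClass(bean.getClass())) {
            return bean;
        }
        SimpleSource origin = (SimpleSource) bean;
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(origin, args);
            // Enhance only getConnection(); every other call passes through unchanged.
            return "getConnection".equals(method.getName()) ? "proxied:" + result : result;
        };
        SimpleSource enhancer = (SimpleSource) Proxy.newProxyInstance(
                SimpleSource.class.getClassLoader(), new Class<?>[] {SimpleSource.class}, handler);
        HOLDER.put(origin, enhancer);
        return enhancer;
    }
}
```

Calling `wrapIfNecessary` on an already wrapped object returns it unchanged, which matches the intent of the "had been proxy before" comments in the method above.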
Now look at the core initialization method of the SeataDataSourceProxy object. The most important line is DefaultResourceManager.get().registerResource(this): this is where the RM comes into play, and from here on the data source is managed by the RM.
The resource manager is a singleton held in a static inner class; the data source registers itself with it during initialization.
private void init(DataSource dataSource, String resourceGroupId) {
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
tableMetaExcutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
//Set the default branch type to 'AT' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
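The "singleton in a static inner class" pattern mentioned above is the standard initialization-on-demand holder idiom. A minimal sketch follows; `ResourceRegistry` and its methods are hypothetical names chosen for illustration, not Seata's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that mimics how a singleton resource manager could track resources.
final class ResourceRegistry {
    private final Map<String, Object> resources = new ConcurrentHashMap<>();

    private ResourceRegistry() { }

    // The JVM initializes Holder lazily and thread-safely on the first call to get().
    private static final class Holder {
        private static final ResourceRegistry INSTANCE = new ResourceRegistry();
    }

    static ResourceRegistry get() {
        return Holder.INSTANCE;
    }

    // Called by a resource (for example a data source proxy) while it initializes itself.
    void registerResource(String resourceId, Object resource) {
        resources.put(resourceId, resource);
    }

    Object find(String resourceId) {
        return resources.get(resourceId);
    }
}
```

Because the instance lives in a nested class, it is created only when `get()` is first called, and class initialization guarantees that only one instance ever exists.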
When the DAO layer needs to run SQL, it requests a connection from the proxied data source, so the connection it gets is Seata's proxy connection. The Statement and PreparedStatement created from that connection are proxy objects as well, and SQL parsing starts from StatementProxy.
@Override
public boolean execute(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
}
This leads to the template method for SQL execution, which classifies the SQL by type and hands each type to a different executor.
public class ExecuteTemplate {
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
return execute(null, statementProxy, statementCallback, args);
}
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case INSERT_ON_DUPLICATE_UPDATE:
if (JdbcConstants.MYSQL.equals(dbType)) {
executor = new MySQLInsertOrUpdateExecutor(statementProxy,statementCallback,sqlRecognizer);
} else {
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
}
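The dispatch in this template boils down to "classify the statement, then pick an executor". The sketch below shows that shape with a deliberately naive classifier; `SqlKind` and `ExecutorChooser` are hypothetical, and a prefix check is of course no substitute for the real SQL parser that Seata uses.

```java
import java.util.Locale;

// Hypothetical statement categories, loosely following the switch in the template above.
enum SqlKind { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

final class ExecutorChooser {
    // Very rough classification by keyword; assumes a single, well-formed statement.
    static SqlKind classify(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("INSERT")) return SqlKind.INSERT;
        if (s.startsWith("UPDATE")) return SqlKind.UPDATE;
        if (s.startsWith("DELETE")) return SqlKind.DELETE;
        if (s.startsWith("SELECT") && s.endsWith("FOR UPDATE")) return SqlKind.SELECT_FOR_UPDATE;
        return SqlKind.OTHER;
    }

    // Returns the name of the executor that would handle this kind of statement.
    static String chooseExecutor(String sql) {
        switch (classify(sql)) {
            case INSERT: return "InsertExecutor";
            case UPDATE: return "UpdateExecutor";
            case DELETE: return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default: return "PlainExecutor";   // no undo log needed, run as-is
        }
    }
}
```

A plain SELECT falls through to the default branch, which mirrors how statements that change nothing are executed without any undo log handling.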
The insert executor is loaded through the SPI mechanism: its configuration file lists several implementations, and the one matching the database type is selected at runtime.
Now let's look at how the undo log is generated during SQL execution. Auto-commit has to be turned off for the execution; beforeImage() and afterImage(beforeImage) are then called to build TableRecords objects for the state before and after the SQL runs. Finally, prepareUndoLog() adds the before and after images to the context of the connectionProxy.
@Override
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
if (xid != null) {
statementProxy.getConnectionProxy().bind(xid);
}
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);
}
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
// Both before and after images are generated when auto-commit is disabled
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
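The auto-commit branch (executeAutoCommitTrue) is not shown in this article. As a rough idea of what "turning off auto-commit" involves, here is a sketch under that assumption; `RecordingConnection` and `AutoCommitDemo` are hypothetical, and the real method may differ in its details.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical minimal connection that only records what happens to it.
final class RecordingConnection {
    boolean autoCommit = true;
    final List<String> events = new ArrayList<>();

    void commit() { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

final class AutoCommitDemo {
    // If auto-commit is on, switch it off so the statement and its undo log
    // can be written in one local transaction, then restore the setting.
    static <T> T execute(RecordingConnection conn, Supplier<T> statement) {
        if (!conn.autoCommit) {
            return statement.get();      // the caller controls when to commit
        }
        conn.autoCommit = false;
        try {
            T result = statement.get();
            conn.commit();
            return result;
        } catch (RuntimeException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.autoCommit = true;      // restore the caller's setting
        }
    }
}
```

The point of the detour is atomicity: the business change and its undo log must either both reach the database or both be discarded.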
After the SQL has run, the connection is committed. If the connection is part of a global transaction, it first registers a branch transaction with the TC, then saves the undo log and commits the branch transaction.
@Override
public void commit() throws SQLException {
try {
lockRetryPolicy.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
context.setBranchId(branchId);
}
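The order of steps in processGlobalTransactionCommit is easy to lose in the exception handling, so here is a sketch that only records the sequence. `BranchCommitDemo` is hypothetical; note that in the code above the success report is additionally guarded by IS_REPORT_SUCCESS_ENABLE, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical recorder that mirrors the order of steps of a phase-one branch commit.
final class BranchCommitDemo {
    final List<String> steps = new ArrayList<>();

    // failLocalCommit simulates the local database commit throwing an exception.
    void commit(boolean hasUndoLog, boolean failLocalCommit) {
        if (hasUndoLog) {
            steps.add("register branch with TC");   // also acquires the global locks
        }
        try {
            steps.add("flush undo log");
            if (failLocalCommit) {
                throw new IllegalStateException("local commit failed");
            }
            steps.add("local commit");
        } catch (IllegalStateException e) {
            steps.add("report phase-one failure");
            throw e;
        }
        steps.add("report phase-one success");
    }
}
```

Registering the branch before the local commit matters: if registration fails, for example because of a lock conflict, nothing has been committed locally yet.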
5.3 Commit process
Process overview
- The TM initiates the phase-two commit
- The TC sends a commit notification to each RM
- The RM commits asynchronously
- Commit requests are grouped by resource so that work on the same database is handled together, avoiding the cost of opening multiple connections
- Each group obtains a plain (non-proxied) database connection
- The corresponding undo logs are deleted
Source code analysis
After the business logic has run successfully inside the transaction template, the commitTransaction method is called and the TM sends the global commit request to the TC.
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
}
}
@Override
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
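The retry loop in commit() follows a common countdown pattern. A generic sketch of it, with hypothetical names (`RetryDemo`, `callWithRetry`) and unchecked exceptions to keep it short:

```java
import java.util.function.Supplier;

final class RetryDemo {
    // Calls the action until it succeeds or the attempts are used up.
    // The last failure is wrapped and rethrown, as in the commit loop above.
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts) {
        int retry = maxAttempts <= 0 ? 1 : maxAttempts;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException ex) {
                retry--;
                if (retry == 0) {
                    throw new IllegalStateException("Failed to report global commit", ex);
                }
            }
        }
    }
}
```

With `maxAttempts` set to 5, an action that fails twice and then succeeds returns normally on the third call, while an action that always fails surfaces an exception after the fifth.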
On the TC side, the core commit logic is the doGlobalCommit method of DefaultCore, where the TC drives each branch transaction to complete its commit.
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
if (globalSession.getStatus() == GlobalStatus.Begin) {
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// Start submitting events
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), null, globalSession.getStatus()));
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
// If you do not retry, skip the canBeCommittedAsync branch
if (!retrying && branchSession.canBeCommittedAsync()) {
return CONTINUE;
}
BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
return CONTINUE;
}
try {
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Committed:
globalSession.removeBranch(branchSession);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {
LOGGER.error(
"Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
return CONTINUE;
} else {
SessionHelper.endCommitFailed(globalSession);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
branchSession.getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER.error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}", new String[] {branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
return CONTINUE;
});
// Return if the result is not null
if (result != null) {
return result;
}
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
// If it succeeds and there is no branch left, and retrying == true (the asynchronous state when retrying), endCommitted is executed to improve concurrency performance, and the global transaction ends.
if (success && globalSession.getBranchSessions().isEmpty() && retrying) {
SessionHelper.endCommitted(globalSession);
// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
@Override
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
try {
BranchCommitRequest request = new BranchCommitRequest();
request.setXid(branchSession.getXid());
request.setBranchId(branchSession.getBranchId());
request.setResourceId(branchSession.getResourceId());
request.setApplicationData(branchSession.getApplicationData());
request.setBranchType(branchSession.getBranchType());
return branchCommitSend(request, globalSession, branchSession);
} catch (IOException | TimeoutException e) {
throw new BranchTransactionException(FailedToSendBranchCommitRequest,
String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
}
}
So how does the RM commit its branch transaction? The RM handles the TC's request in a callback whose core class is AbstractRMHandler, and the branch transaction is finally committed by an asynchronous task.
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
}
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch commit result: " + status);
}
}
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
return asyncWorker.branchCommit(xid, branchId, resourceId);
}
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
Phase2Context context = new Phase2Context(xid, branchId, resourceId);
addToCommitQueue(context);
return BranchStatus.PhaseTwo_Committed;
}
// Commit asynchronously
private void addToCommitQueue(Phase2Context context) {
if (commitQueue.offer(context)) {
return;
}
CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
.thenRun(() -> addToCommitQueue(context));
}
// Submit method
void doBranchCommitSafely() {
try {
doBranchCommit();
} catch (Throwable e) {
LOGGER.error("Exception occur when doing branch commit", e);
}
}
// Submit method
private void doBranchCommit() {
if (commitQueue.isEmpty()) {
return;
}
// Transfer all currently received contexts to this list
List<Phase2Context> allContexts = new LinkedList<>();
commitQueue.drainTo(allContexts);
// Group contexts by resource ID
Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
groupedContexts.forEach(this::dealWithGroupedContexts);
}
// Handle the grouping context
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
if (dataSourceProxy == null) {
LOGGER.warn("Failed to find resource for {}", resourceId);
return;
}
Connection conn;
try {
conn = dataSourceProxy.getPlainConnection();
} catch (SQLException sqle) {
LOGGER.error("Failed to get connection for async committing on {}", resourceId, sqle);
return;
}
UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
// Split the context into multiple lists, each containing no more than the size limit
List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
// remove UndoLog
splitByLimit.forEach(partition -> deleteUndoLog(conn, undoLogManager, partition));
}
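The drain, group, and partition steps of the asynchronous commit can be reproduced with plain JDK collections. This is a sketch only: `CommitContext` and `AsyncCommitDemo` are hypothetical, and the manual `subList` loop stands in for the Lists.partition call used above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical phase-two context: just enough fields to show the grouping.
final class CommitContext {
    final String xid;
    final long branchId;
    final String resourceId;

    CommitContext(String xid, long branchId, String resourceId) {
        this.xid = xid;
        this.branchId = branchId;
        this.resourceId = resourceId;
    }
}

final class AsyncCommitDemo {
    final BlockingQueue<CommitContext> queue = new LinkedBlockingQueue<>();

    // Drain everything queued so far, group it by resource, then cut each group into batches.
    Map<String, List<List<CommitContext>>> drainIntoBatches(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<CommitContext> all = new ArrayList<>();
        queue.drainTo(all);

        Map<String, List<CommitContext>> grouped = new LinkedHashMap<>();
        for (CommitContext ctx : all) {
            grouped.computeIfAbsent(ctx.resourceId, k -> new ArrayList<>()).add(ctx);
        }

        Map<String, List<List<CommitContext>>> batches = new LinkedHashMap<>();
        grouped.forEach((resourceId, contexts) -> {
            List<List<CommitContext>> parts = new ArrayList<>();
            for (int i = 0; i < contexts.size(); i += batchSize) {
                parts.add(contexts.subList(i, Math.min(i + batchSize, contexts.size())));
            }
            batches.put(resourceId, parts);
        });
        return batches;
    }
}
```

Each batch would then translate into one undo log delete on one connection per resource, which is where the saving over per-branch deletes comes from.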
5.4 Rollback process
Process overview
- The TM initiates the phase-two rollback
- The TC sends a rollback notification to each RM
- The RM rolls back the branch transaction based on the undo logs it stored
- Fetch the undo log by XID and branch transaction ID
- Check the status of the undo log to avoid repeated execution
- Parse the undo log, build and execute the rollback SQL, then update the undo log status
- Delete the corresponding undo log
- Commit the local transaction and report its result (that is, the rollback result of the branch transaction) to the TC
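The "avoid repeated execution" step is about idempotency: a rollback request that arrives twice must not undo anything twice. A minimal sketch with in-memory stand-ins follows; `UndoLogStore`, its key format, and the single `row` field are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class UndoLogStore {
    // Hypothetical undo log table: the key is "xid:branchId", the value is the before image.
    final Map<String, String> undoLogs = new HashMap<>();
    // Hypothetical business table reduced to a single row value.
    String row;

    void saveUndoLog(String xid, long branchId, String beforeImage) {
        undoLogs.put(xid + ":" + branchId, beforeImage);
    }

    // Returns true when a rollback was applied, false when there was nothing left to undo.
    boolean rollbackBranch(String xid, long branchId) {
        String key = xid + ":" + branchId;
        if (!undoLogs.containsKey(key)) {
            return false;                // already rolled back, or never written: do nothing
        }
        row = undoLogs.get(key);         // restore the before image
        undoLogs.remove(key);            // clear the undo log together with the restore
        return true;
    }
}
```

In a real database the restore and the undo log cleanup would sit in one local transaction, so a crash between them cannot leave a half-applied rollback behind.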
Source code analysis
Rollback happens when the business logic throws an exception: the exception handler calls the completeTransactionAfterThrowing method. As with commit, the TM initiates it and the TC carries it out.
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
}
}
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
@Override
public void rollback() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of rollback
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.rollback(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global rollback", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] rollback status: {}", xid, status);
}
}
On the TC side, the core rollback logic is the doGlobalRollback method of DefaultCore, where the TC drives each RM to roll back its branch transaction.
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
if (globalSession.getStatus() == GlobalStatus.Begin) {
globalSession.changeStatus(GlobalStatus.Rollbacking);
return true;
}
return false;
});
if (!shouldRollBack) {
return globalSession.getStatus();
}
doGlobalRollback(globalSession, false);
return globalSession.getStatus();
}
@Override
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start rollback event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
globalSession.getApplicationId(),
globalSession.getTransactionServiceGroup(), globalSession.getBeginTime(),
null, globalSession.getStatus()));
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
} else {
Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
return CONTINUE;
}
try {
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return CONTINUE;
case PhaseTwo_RollbackFailed_Unretryable:
SessionHelper.endRollbackFailed(globalSession);
LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
if (!retrying) {
globalSession.queueToRetryRollback();
}
return false;
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex,
"Rollback branch transaction exception, xid = {} branchId = {} exception = {}", new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
if (!retrying) {
globalSession.queueToRetryRollback();
}
throw new TransactionException(ex);
}
});
// Return if the result is not null
if (result != null) {
return result;
}
GlobalSession globalSessionTwice = SessionHolder.findGlobalSession(globalSession.getXid());
if (globalSessionTwice != null && globalSessionTwice.hasBranch()) {
LOGGER.info("Rollbacking global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
if (success) {
SessionHelper.endRollbacked(globalSession);
// rollbacked event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
globalSession.getApplicationId(),
globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(),
globalSession.getStatus()));
LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
}
return success;
}
@Override
public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
try {
BranchRollbackRequest request = new BranchRollbackRequest();
request.setXid(branchSession.getXid());
request.setBranchId(branchSession.getBranchId());
request.setResourceId(branchSession.getResourceId());
request.setApplicationData(branchSession.getApplicationData());
request.setBranchType(branchSession.getBranchType());
return branchRollbackSend(request, globalSession, branchSession);
} catch (IOException | TimeoutException e) {
throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
String.format("Send branch rollback failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
}
}
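To make the control flow of doGlobalRollback easier to follow, here is a minimal sketch of the same idea: walk the branches in reverse order, remove each branch that rolls back successfully, and stop at the first failure so that the rest can be retried later. The names Outcome, RollbackLoopSketch and rollbackAll are hypothetical and are not part of Seata; the sketch leaves out sessions, events and retry queues.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the branch statuses handled in the switch above.
enum Outcome { ROLLBACKED, FAILED_UNRETRYABLE, FAILED_RETRYABLE }

class RollbackLoopSketch {
    /**
     * Rolls back branches newest-first and removes each one that succeeds.
     * Returns true only when every branch has been rolled back.
     */
    static boolean rollbackAll(List<String> branches, Function<String, Outcome> rollbackOne) {
        // Copy before reversing so that removing from 'branches' is safe while iterating.
        List<String> reversed = new ArrayList<>(branches);
        Collections.reverse(reversed);
        for (String branch : reversed) {
            Outcome outcome = rollbackOne.apply(branch);
            if (outcome != Outcome.ROLLBACKED) {
                // Stop at the first failure; the remaining branches stay registered.
                return false;
            }
            branches.remove(branch);
        }
        return branches.isEmpty();
    }
}
```

Branches that are still in the list after a failed call are exactly the ones a later retry would have to process.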
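So how does the RM roll back a branch transaction? The RM handles the rollback request through a callback, and its core class is AbstractRMHandler. From there the request is passed to the resource manager's branchRollback method and finally to the undo log manager's undo method, all of which are listed below.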
@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {
BranchRollbackResponse response = new BranchRollbackResponse();
exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
@Override
public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
doBranchRollback(request, response);
}
}, request, response);
return response;
}
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
}
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch Rollbacked result: " + status);
}
}
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
StackTraceLogger.info(LOGGER, te,
"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// The server may send the rollback request for the same branch transaction more than once,
// possibly to several processes, so only undo_log records in the normal state are processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, the branch transaction has completed its first phase.
// Otherwise, the branch transaction hit an exception and its undo_log could not be written to the database.
if (exists) {
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId, State.GlobalFinished.name());
}
} else {
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId, State.GlobalFinished.name());
}
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
}
}
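The key step in undo is that the recorded SQL undo logs are reversed before they are executed, so the most recent change is reverted first. The following is a minimal in-memory sketch of that idea only. UndoRecord, UndoReplaySketch and the Map used as a "table" are hypothetical, and the sketch ignores everything else the real method handles (the local transaction, the undo_log status check, serialization and retries).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified undo record: the row key and the value it had before the change.
// A null before-image means the row did not exist, i.e. the change was an insert.
class UndoRecord {
    final int key;
    final String beforeImage;

    UndoRecord(int key, String beforeImage) {
        this.key = key;
        this.beforeImage = beforeImage;
    }
}

class UndoReplaySketch {
    /** Restores before-images newest-first, so the oldest before-image is applied last and wins. */
    static void undo(Map<Integer, String> table, List<UndoRecord> undoLog) {
        List<UndoRecord> reversed = new ArrayList<>(undoLog);
        Collections.reverse(reversed);
        for (UndoRecord record : reversed) {
            if (record.beforeImage == null) {
                table.remove(record.key);                  // revert an insert
            } else {
                table.put(record.key, record.beforeImage); // revert an update or delete
            }
        }
    }
}
```

If the same row is updated twice in one branch, replaying the records in their original order would leave the intermediate value in place, which is why the order is reversed.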
6 @GlobalLock’s role
When an ordinary business method must not interfere with a running global transaction, we can mark it with the @GlobalLock annotation. Such a method is not a branch of any global transaction, but it still has to check the global lock before it touches a resource. If a global transaction currently holds the lock on the same data, the method has to wait, so it does not interfere with that global transaction.
GlobalTransactionalInterceptor also intercepts methods marked with the @GlobalLock annotation and copies the annotation's properties into a GlobalLockConfig.
Object handleGlobalLock(final MethodInvocation methodInvocation,
final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}
The @GlobalLock annotation also has its own template class, GlobalLockTemplate, which binds the global lock flag before the method runs and unbinds it afterwards.
public class GlobalLockTemplate {
public Object execute(GlobalLockExecutor executor) throws Throwable {
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
if (!alreadyInGlobalLock) {
RootContext.bindGlobalLockFlag();
}
// set my config to config holder so that it can be accessed in further execution
// for example, LockRetryController can access it with config holder
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
try {
return executor.execute();
} finally {
// only unbind when this is the root caller.
// otherwise, the outer caller would lose global lock flag
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
// if previous config is not null, we need to set it back
// so that the outer logic can still use their config
if (previousConfig != null) {
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
GlobalLockConfigHolder.remove();
}
}
}
}
}
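The "only unbind when this is the root caller" rule in the template is worth a closer look, because it is what keeps nested @GlobalLock calls safe. Below is a minimal sketch of the same pattern using a plain ThreadLocal. LockFlagSketch, withFlag and isBound are hypothetical names and do not reflect how RootContext is implemented internally.

```java
import java.util.function.Supplier;

class LockFlagSketch {
    // Per-thread flag; false until an outer caller binds it.
    private static final ThreadLocal<Boolean> FLAG = ThreadLocal.withInitial(() -> false);

    static boolean isBound() {
        return FLAG.get();
    }

    /** Runs body with the flag bound; only the outermost caller clears it again. */
    static <T> T withFlag(Supplier<T> body) {
        boolean alreadyBound = FLAG.get();
        if (!alreadyBound) {
            FLAG.set(true);
        }
        try {
            return body.get();
        } finally {
            // A nested caller must not clear the flag, or the outer caller would lose it.
            if (!alreadyBound) {
                FLAG.remove();
            }
        }
    }
}
```

With this shape, an inner call that finishes first leaves the flag untouched, and the flag disappears only when the outermost call returns or throws.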
When the local transaction is committed on a connection that has the global lock flag bound, the connection first checks the global lock and commits only after that check passes.
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
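The GlobalLockConfig built earlier carries a retry interval and a retry count. The excerpts above do not show where Seata consumes these two values, so the following is only a sketch of how such settings are commonly used: check the lock, and if it is held, wait and check again a limited number of times. LockRetrySketch and waitForLock are hypothetical names, and lockIsFree stands in for whatever check the real code performs.

```java
import java.util.function.BooleanSupplier;

class LockRetrySketch {
    /**
     * Checks the lock once and then up to retryTimes more times,
     * sleeping retryIntervalMillis between checks.
     * Returns true as soon as the lock is free, false if it never is.
     */
    static boolean waitForLock(BooleanSupplier lockIsFree, int retryTimes, long retryIntervalMillis) {
        for (int attempt = 0; attempt <= retryTimes; attempt++) {
            if (lockIsFree.getAsBoolean()) {
                return true;
            }
            if (attempt < retryTimes) {
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

A caller would commit the local transaction only when waitForLock returns true, and report a lock conflict otherwise.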
Thank you for reading. If you found this article helpful, please give it a thumbs-up. Thanks a million!