This is the 13th day of my participation in the More text Challenge. For details, see more text Challenge

>>>> ๐Ÿ˜œ๐Ÿ˜œ๐Ÿ˜œ Making: ๐Ÿ‘‰ github.com/black-ant

A. The preface

This article looks at the configuration file of the Seate Client, and how and how to configure it.

#====================================Seata Config===============================================
seata:
  enabled: true
  application-id: business-seata-example
  tx-service-group: business-service-seata-service-group # Transaction group (can be named independently for each application or use the same name)
  client:
    rm-report-success-enable: true
    rm-table-meta-check-enable: false # Automatically refresh table structures in cache (default false)
    rm-report-retry-count: 5 # Number of TC retries reported in phase 1 (default: 5)
    rm-async-commit-buffer-limit: 10000 # Asynchronous commit cache queue length (default: 10000)
    rm:
      lock:
        lock-retry-internal: 10 # Verify or occupy the global lock retry interval (default: 10ms)
        lock-retry-times:    30 Number of retries to verify or occupy a global lock (default: 30)
        lock-retry-policy-branch-rollback-on-conflict: true Lock policy when branch transaction conflicts with other global rollback transactions (release local lock first to make rollback successful)
    tm-commit-retry-count:   3 # Number of TC retries to report the global submission result in phase 1 (default: 1, recommended to be greater than 1)
    tm-rollback-retry-count: 3 # Number of TC retries to report the global rollback result in phase 1 (1 by default, more than 1 is recommended)
    undo:
      undo-data-validation: true # Rollback image verification in phase 2 (default: true)
      undo-log-serialization: jackson # Undo serialization (Jackson by default)
      undo-log-table: undo_log  # Customizable undo table name (default undo_log)
    log:
      exceptionRate: 100 # Log output probability (default: 100)
    support:
      spring:
        datasource-autoproxy: true
  service:
    vgroup-mapping:
      my_test_tx_group: default # TC cluster (must be consistent with setA-server)
    enable-degrade: false # Degrade switch
    disable-global-transaction: false # Disable global transactions (default false)
    grouplist:
      default: 127.0. 01.: 8091
  transport:
    shutdown:
      wait: 3
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      server-executor-thread-prefix: NettyServerBizHandler
      share-boss-worker: false
      client-selector-thread-prefix: NettyClientSelector
      client-selector-thread-size: 1
      client-worker-thread-prefix: NettyClientWorkerThread
    type: TCP
    server: NIO
    heartbeat: true
    serialization: seata
    compressor: none
    enable-client-batch-send-request: true Whether client transaction message requests are sent in batches (default true)
  registry:
    file:
      name: file.conf
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace:
      cluster: default
  config:
    file:
      name: file.conf
    type: nacos
    nacos:
      namespace:
      server-addr: localhost:8848
Copy the code

2. Configure objects

2.1 Seata top-level objects

This object corresponds to the seata. XXX configuration

@ConfigurationProperties(prefix = "seata")
@EnableConfigurationProperties(SpringCloudAlibabaConfiguration.class)
public class SeataProperties {
    /** whether to enable automatic configuration */
    private boolean enabled = true;
    /** * application id */
    private String applicationId;
    /** * Transaction Service Group */
    private String txServiceGroup;
    /** whether to enable automatic proxy for data source beans */
    private boolean enableAutoDataSourceProxy = true;
    /** * Data source proxy mode */
    private String dataSourceProxyMode = DefaultValues.DEFAULT_DATA_SOURCE_PROXY_MODE;
    /** * Whether to use JDK proxy instead of CGLIB proxy */
    private boolean useJdkProxy = false;
    /** * specifies which data source bean does not qualify for automatic proxy */
    private String[] excludesForAutoProxying = {};
    
}

Copy the code

2.2 Seata. Client Configures objects

 client:
    rm-report-success-enable: true
    rm-table-meta-check-enable: false# Automatically refresh table structures in the cache (defaultfalse) the report rm - retry - count:5# The number of TC retries reported in phase 1 (default)5- commit - retry) tm - count:3Number of TC retries to report global submission results in phase 1 (default1Times, the recommendation is greater than1) the rollback tm - retry - count:3Number of TC retries reported for global rollback in phase 1 (default1Times, the recommendation is greater than1)// The level-1 directory has the following configuration, and the corresponding object is:
public class RmProperties {
    private int asyncCommitBufferLimit = 10000;
    private int reportRetryCount = 5;
    private boolean tableMetaCheckEnable = false;
    private boolean reportSuccessEnable = false;
    private boolean sagaBranchRegisterEnable = false;
    private String sagaJsonParser = fastjson;   
}    
  
Copy the code

Above is the client level configuration corresponding to the class, let’s take a look at its sub-configuration

// The secondary directory is seata.client.log
public class LogProperties {
    private int exceptionRate = 100;
}

// Secondary directory: seata.client.undo
public class UndoProperties {
    private boolean dataValidation = true;
    private String logSerialization = "jackson";
    private String logTable = "undo_log";
    private boolean onlyCareUpdateColumns = true;
}
      
// Secondary directory: seata.client.support
Copy the code

2.3 seata. The service object

@Component
@ConfigurationProperties(prefix = "seata.service")
public class ServiceProperties implements InitializingBean {
    /** * vgroup->rgroup */
    private Map<String, String> vgroupMapping = new HashMap<>();
    /** * group list */
    private Map<String, String> grouplist = new HashMap<>();
    /** * degrade current not support */
    private boolean enableDegrade = false;
    /** * disable globalTransaction */
    private boolean disableGlobalTransaction = false;

}
Copy the code

2.4 seata. Transport

Let’s look at the configuration class and the default parameters

@Component
@ConfigurationProperties(prefix = TRANSPORT_PREFIX)
public class TransportProperties {
    /** * tcp, unix-domain-socket */
    private String type = "TCP";
    /** * NIO, NATIVE */
    private String server = "NIO";
    /** * enable heartbeat */
    private boolean heartbeat = true;
    /** * serialization */
    private String serialization = "seata";
    /** * compressor */
    private String compressor = "none";

    /** * enable client batch send request */
    private boolean enableClientBatchSendRequest = true;
    
}


@Component
@ConfigurationProperties(prefix = "seata.transport.thread-factory")
public class ThreadFactoryProperties {
    private String bossThreadPrefix = "NettyBoss";
    private String workerThreadPrefix = "NettyServerNIOWorker";
    private String serverExecutorThreadPrefix = "NettyServerBizHandler";
    private boolean shareBossWorker = false;
    private String clientSelectorThreadPrefix = "NettyClientSelector";
    private int clientSelectorThreadSize = 1;
    private String clientWorkerThreadPrefix = "NettyClientWorkerThread";
}    

Copy the code

2.5 seata. Config

@Component
@ConfigurationProperties(prefix = CONFIG_PREFIX)
public class ConfigProperties {
    /** * file, nacos, apollo, zk, consul, etcd3, springCloudConfig */
    private String type = "file"; } ConfigNacosProperties ConfigXXXProperties, etcCopy the code

Note that there are multiple implementation classes, each corresponding to a configuration class

2.6 seata. Registry

@Component
@ConfigurationProperties(prefix = "seata.registry")
public class RegistryProperties {
    /** * file, nacos, eureka, redis, zk, consul, etcd3, sofa */
    private String type = "file";
    /** * the load balance */
    private String loadBalance = DEFAULT_LOAD_BALANCE;
    /** * Load balancing virtual node */
    private int loadBalanceVirtualNodes = VIRTUAL_NODES_DEFAULT;
    
}
Copy the code

3. Client initialization process

3.1 Initial Configuration Process

Take a look at the master configuration class SeataAutoConfiguration,. Here will scan the IO seata. Spring. The boot. Autoconfigure. The properties under the package of all the configuration class

// The configuration of the Client is based on the SeataAutoConfiguration class

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
    @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
    public SpringApplicationContextProvider springApplicationContextProvider(a) {
        return new SpringApplicationContextProvider();
    }

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler(a) {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

    /** * Data source configuration */
    @Configuration
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
    static class SeataDataSourceConfiguration {

        /** * The bean seataDataSourceBeanPostProcessor. */
        @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
        @ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
        public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
            return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
        }

        /** * The bean seataAutoDataSourceProxyCreator. */
        @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
        @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
        public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
            return newSeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(), seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode()); }}}Copy the code

The following objects are involved:

  • C- SpringApplicationContextProvider
  • C- FailureHandler
  • C- GlobalTransactionScanner
  • C- SeataDataSourceBeanPostProcessor
  • C- SeataAutoDataSourceProxyCreator

3.1.1 SpringApplicationContextProvider

public class SpringApplicationContextProvider implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext); }}Copy the code

You can see here that the ObjectHolder is called to store the applicationContext

As I mentioned earlier in my analysis of Spring, Aware can be used for post-creation notification, in this case by setting a parameter to an enumeration class.

This is the first time I’ve seen an enumeration class used like this. Does this implement a singleton?

public enum ObjectHolder {

    INSTANCE;
    private static final int MAP_SIZE = 8;
    private static final Map<String, Object> OBJECT_MAP = new ConcurrentHashMap<>(MAP_SIZE);

    public Object getObject(String objectKey) {
        return OBJECT_MAP.get(objectKey);
    }

    public <T> T getObject(Class<T> clasz) {
        return clasz.cast(OBJECT_MAP.values().stream().filter(clasz::isInstance).findAny().orElseThrow(() -> new ShouldNeverHappenException("Can't find any object of class " + clasz.getName())));
    }

    public Object setObject(String objectKey, Object object) {
        returnOBJECT_MAP.putIfAbsent(objectKey, object); }}Copy the code

3.1.2 FailureHandler

FailureHandler provides the following methods:

public interface FailureHandler {

    // Startup error
    void onBeginFailure(GlobalTransaction tx, Throwable cause);
    // Submit exception
    void onCommitFailure(GlobalTransaction tx, Throwable cause);
    // The rollback is abnormal
    void onRollbackFailure(GlobalTransaction tx, Throwable originalException);
    Retry the operation
    void onRollbackRetrying(GlobalTransaction tx, Throwable originalException);
}
Copy the code

The default implementation class for DefaultFailureHandlerImpl, configurable here means can realize and extension

3.1.3 GlobalTransactionScanner

There are several abstractions and interfaces involved in this class:

  • AbstractAutoProxyCreator: A BeanPostProcessor implementation that wraps beans using an AOP proxy
  • ConfigurationChangeListener: configuration changes to monitor
  • InitializingBean: Initializes the call
  • ApplicationContextAware: Aware of listener handling
  • DisposableBean: Disposal

C- AbstractAutoProxyCreator # wrapIfNecessary

AbstractAutoProxyCreator applicable to agency postProcess, mainly are running in postProcessAfterInitialization stage calls:

    // This method is used for verification
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                // Check whether a TCC agent exists
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // Create a TccActionInterceptor and add it to the listener
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)interceptor);
                } else {
                    // Get the target classClass<? > serviceInterface = SpringProxyUtils.findTargetClass(bean);// Class<? >[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);The GlobalTransactional annotation does not exist
                    if(! existsAnnotation(newClass[]{serviceInterface}) && ! existsAnnotation(interfacesIfJdk)) {return bean;
                    }
					
                    / / if there is a note, to create a GlobalTransactionalInterceptor for transaction interception
                    if (interceptor == null) {
                        if (globalTransactionalInterceptor == null) {
                            globalTransactionalInterceptor = newGlobalTransactionalInterceptor(failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; }}// If it is not an AOP proxy, put it back directly, if not, do the actual AOP processing
                if(! AopUtils.isAopProxy(bean)) { bean =super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null.null.null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                returnbean; }}catch (Exception exx) {
            throw newRuntimeException(exx); }}Copy the code

PS: The role of AdvisedSupport

In short, this class is proxied. An AOP proxy is used to wrap the BeanPostProcessor implementation of each qualified bean, delegating to a specified interceptor before calling the bean itself.

ConfigurationChangeListener

@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
    if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
        disableGlobalTransaction = Boolean.parseBoolean(event.getNewValue().trim());
        if(! disableGlobalTransaction && initialized.compareAndSet(false.true)) {
            // As you can see, after the modification, the client will be reinitializedinitClient(); }}}Copy the code

InitializingBean

public void afterPropertiesSet(a) {
	ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)this);
	if (disableGlobalTransaction) {
            return;
	}
	if (initialized.compareAndSet(false.true)) {
            // initClient is called during initializationinitClient(); }}Copy the code

You can see here that the TM and RM Client are initialized

private void initClient(a) {

    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

    //init RM
    RMClient.init(applicationId, txServiceGroup);

    // Register the destruct hook
    registerSpringShutdownHook();

}
Copy the code

ApplicationContextAware

Notification setting the ApplicationContext is done here with Aware

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
	this.applicationContext = applicationContext;
	this.setBeanFactory(applicationContext);
}
Copy the code

DisposableBean

public void destroy(a) {
    ShutdownHook.getInstance().destroyAll();
}

private void registerSpringShutdownHook(a) {
    if (applicationContext instanceof ConfigurableApplicationContext) {
        ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
        ShutdownHook.removeRuntimeShutdownHook();
    }
    ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
    ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
}

Copy the code

3.1.4 SeataDataSourceBeanPostProcessor

public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(SeataDataSourceBeanPostProcessor.class);

    private final List<String> excludes;
    private final BranchType dataSourceProxyMode;

    public SeataDataSourceBeanPostProcessor(String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        this.dataSourceProxyMode = BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode) ? BranchType.XA : BranchType.AT;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource) {
            // When not excluded, place and initialize the proxy
            if(! excludes.contains(bean.getClass().getName())) {// Only put and initialize the agent, do not return the agent
                DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
            }

            // If it is SeataDataSourceProxy, the original data source is returned
            if (bean instanceof SeataDataSourceProxy) {
                return((SeataDataSourceProxy) bean).getTargetDataSource(); }}returnbean; }} SeataDataSourceProxy is an interface that provides multiple methods:public interface SeataDataSourceProxy extends DataSource {

    // Get the target data source.
    DataSource getTargetDataSource(a);

    // Get the branching type.
    BranchType getBranchType(a); } And the proxy class has the following implementations:Copy the code

SeataAutoDataSourceProxyCreator

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
    private final List<String> excludes;
    private final Advisor advisor;

    public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        this.advisor = new DefaultIntroductionAdvisor(newSeataAutoDataSourceProxyAdvice(dataSourceProxyMode)); setProxyTargetClass(! useJdkProxy); }@Override
    protectedObject[] getAdvicesAndAdvisorsForBean(Class<? > beanClass, String beanName, TargetSource customTargetSource)throws BeansException {
        return new Object[]{advisor};
    }

    @Override
    protected boolean shouldSkip(Class
        beanClass, String beanName) {
        return !DataSource.class.isAssignableFrom(beanClass) ||
            SeataProxy.class.isAssignableFrom(beanClass) ||
            excludes.contains(beanClass.getName());
    }
}

Copy the code

conclusion

In fact the whole article is one of the most important operation is to construct a globalTransactionalInterceptor, in subsequent main process we will use the interceptor

Note that each annotated method is handled by an interceptor.