I. Introduction to Seata

In January 2019, Alibaba middleware team launched the open source project Fescar (Fast & EaSy Commit And Rollback), And Ant Financial later contributed TCC mode in Fescar 0.4.0 version. Later renamed Seata, which stands for Simple Extensible Autonomous Transaction Architecture, is a one-stop distributed Transaction solution.

Seata has three basic components:

Transaction Coordinator (TC) : A Transaction Coordinator that maintains the running status of global transactions and coordinates and drives the commit or rollback of global transactions.

Transaction Manager (TM) : Controls the boundaries of a global Transaction, is responsible for starting a global Transaction, and ultimately initiating a global commit or rollback resolution.

Resource Manager (RM) : Controls branch transactions, is responsible for branch registration, status reporting, receives transaction coordinator instructions, and drives the commit and rollback of branch (local) transactions.

Seata official call flow chart:

Ii. Advantages and disadvantages of Fescar compared with XA in the second stage:

Advantages:

  • Automatic compensation is realized based on SQL parsing to reduce service intrusion.
  • The first phase of a local transaction commits, and the second phase of a COMMIT is an asynchronous operation that is more efficient than XA holding all resources in two segments.
  • Fescar offers two modes, AT and MT. In AT mode, the transaction resource can be any DATABASE that supports ACID. In MT mode, the transaction resource is unlimited. It can be cache, file, etc. Of course, the two patterns can also be mixed.
  • Global Lock The global lock implements write and read isolation.
  • Undolog logs are automatically cleared

Disadvantages:

  • Code intrusion is the configuration and annotation of Fescar’s data broker, and each business library requires an Undolog table.
  • The performance loss can be seen from the call diagram: Insert undo log (Undolog blob), before image (query), before image (query), before image (query), before commit (query) Determine lock conflicts); In order to automatically compensate in the Undolog table to spend a lot of overhead, and the trigger probability is relatively low.
  • The two-stage COMMIT also consumes system resources.
  • In the two-phase rollback, Undolog of each node must be deleted to release the global lock.

Third, the experiment

In this experiment, the official SpringCloud-Eureka-Feign-Mybatis – Seata project was used to simulate the remote call timeout exception. Through the localhost: 8180 / order/create? UserId =1&productId=1&count=10&money=100 Trigger the process, order local create order call, remote storage deduction inventory, remote deduction account balance simulate the timeout exception. The following shows the log information in abnormal cases:

Strengthen the blocker GlobalTransactionalInterceptor OrderServerApplication log shows the affairs

i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.1683.2:8091:2044579200] i. eata. Sample. Service. OrderServiceImpl: -- -- -- -- -- -- -- > trading began i. eata. Sample. Service. OrderServiceImpl: -- -- -- -- -- -- -- > deductions account to start the order in i.s.core.rpc.net. Ty RmMessageListener: onMessage: xid =192.1683.2:8091:2044579200,branchId=2044579202,branchType=AT,resourceId=jdbc:mysql:/ / 127.0.0.1 / seat - order, applicationData = null
 io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 192.1683.2:8091:2044579200 2044579202 jdbc:mysql:/ / 127.0.0.1 / seat - order
 i.s.r.d.undo.AbstractUndoLogManager      : xid 192.1683.2:8091:2044579200 branch 2044579202, undo_log deleted with GlobalFinished
 io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked
 i.seata.tm.api.DefaultGlobalTransaction  : [192.1683.2:8091:2044579200] rollback status: Rollbacked
 o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is feign.RetryableException: Read timed out executing GET http://account-server/account/decrease? userId=1&money=100] with root cause

java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_231]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_231]
	at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_231]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_231]
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_231]
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[na:1.8.0_231]
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[na:1.8.0_231]
	at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735) ~[na:1.8.0_231]
	at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678) ~[na:1.8.0_231]
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593) ~[na:1.8.0_231]
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498) ~[na:1.8.0_231]
	at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) ~[na:1.8.0_231]
	at feign.Client$Default.convertResponse(Client.java:143) ~[feign-core-10.2.3.jar:na]
	at feign.Client$Default.execute(Client.java:68) ~[feign-core-10.2.3.jar:na]
	at com.alibaba.cloud.seata.feign.SeataFeignClient.execute(SeataFeignClient.java:57) ~[spring-cloud-alibaba-seata-2.1.0.RELEASE.jar:2.1.0.RELEASE]
	at org.springframework.cloud.openfeign.ribbon.FeignLoadBalancer.execute(FeignLoadBalancer.java:93) ~[spring-cloud-openfeign-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
	at org.springframework.cloud.openfeign.ribbon.FeignLoadBalancer.execute(FeignLoadBalancer.java:56) ~[spring-cloud-openfeign-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
	at com.netflix.client.AbstractLoadBalancerAwareClient$1.call(AbstractLoadBalancerAwareClient.java:104) ~[ribbon-loadbalancer-2.3.0.jar:2.3.0]
	at com.netflix.loadbalancer.reactive.LoadBalancerCommand$3$1.call(LoadBalancerCommand.java:303) ~[ribbon-loadbalancer-2.3.0.jar:2.3.0]
	at com.netflix.loadbalancer.reactive.LoadBalancerCommand$3$1.call(LoadBalancerCommand.java:287) ~[ribbon-loadbalancer-2.3.0.jar:2.3.0]
	at rx.internal.util.ScalarSynchronousObservable$3.call(ScalarSynchronousObservable.java:231) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.util.ScalarSynchronousObservable$3.call(ScalarSynchronousObservable.java:228) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.Observable.unsafeSubscribe(Observable.java:10327) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144) ~[rxjava-1.3.8.jar:1.3.8]
	at com.netflix.loadbalancer.reactive.LoadBalancerCommand$1.call(LoadBalancerCommand.java:185) ~[ribbon-loadbalancer-2.3.0.jar:2.3.0]
	at com.netflix.loadbalancer.reactive.LoadBalancerCommand$1.call(LoadBalancerCommand.java:180) ~[ribbon-loadbalancer-2.3.0.jar:2.3.0]
	at rx.Observable.unsafeSubscribe(Observable.java:10327) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.Observable.subscribe(Observable.java:10423) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.Observable.subscribe(Observable.java:10390) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:443) ~[rxjava-1.3.8.jar:1.3.8]
	at rx.observables.BlockingObservable.single(BlockingObservable.java:340) ~[rxjava-1.3.8.jar:1.3.8]
	at com.netflix.client.AbstractLoadBalancerAwareClient.executeWithLoadBalancer(AbstractLoadBalancerAwareClient.java:112) ~[ribbon-loadbalancer-2.3.0.jar:2.3.0]
	at org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient.execute(LoadBalancerFeignClient.java:83) ~[spring-cloud-openfeign-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
	at com.alibaba.cloud.seata.feign.SeataLoadBalancerFeignClient.execute(SeataLoadBalancerFeignClient.java:56) ~[spring-cloud-alibaba-seata-2.1.0.RELEASE.jar:2.1.0.RELEASE]
	at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:108) ~[feign-core-10.2.3.jar:na]
	at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:78) ~[feign-core-10.2.3.jar:na]
	at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:103) ~[feign-core-10.2.3.jar:na]
	at com.sun.proxy.$Proxy111.decrease(Unknown Source) ~[na:na]
	at io.seata.sample.service.OrderServiceImpl.create(OrderServiceImpl.java:50) ~[classes/:na] at io.seata.sample.service.OrderServiceImpl? FastClassBySpringCGLIB?3d2d368a.invoke(<generated>) ~[classes/:na]
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749) ~[spring-aop-5.1.9.RELEASE.jar:5.1.9.RELEASE]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.1.9.RELEASE.jar:5.1.9.RELEASE]
	at io.seata.spring.annotation.GlobalTransactionalInterceptor$1.execute(GlobalTransactionalInterceptor.java:109) ~[seata-all-1.2.0.jar:1.2.0]
	at io.seata.tm.api.TransactionalTemplate.execute(TransactionalTemplate.java:104) ~[seata-all-1.2.0.jar:1.2.0]
	at io.seata.spring.annotation.GlobalTransactionalInterceptor.handleGlobalTransaction(GlobalTransactionalInterceptor.java:106) ~[seata-all-1.2.0.jar:1.2.0]
	at io.seata.spring.annotation.GlobalTransactionalInterceptor.invoke(GlobalTransactionalInterceptor.java:83) ~[seata-all-1.2.0.jar:1.2.0]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.9.RELEASE.jar:5.1.9.RELEASE]
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.9.RELEASE.jar:5.1.9.RELEASE] at io.seata.sample.service.OrderServiceImpl? EnhancerBySpringCGLIB?9c1f4d2e.create(<generated>) ~[classes/:na]
	at io.seata.sample.controller.OrderController.create(OrderController.java:29) ~[classes/:na] ........ Omit abnormalCopy the code

StorageServerApplication Log displays the transaction Branch Branch Rollbacked

I.S.S ample. Service. StorageServiceImpl: -- -- -- -- -- -- -- > deductions inventory start I.S.S ample. Service. StorageServiceImpl: -- -- -- -- -- -- -- > deductions inventory end C.A.C.S eata. Web. SeataHandlerInterceptor: xid in change during the RPC the from192.1683.2:8091:2044579200 to null
i.s.core.rpc.netty.RmMessageListener     : onMessage:xid=192.1683.2:8091:2044579200,branchId=2044579204,branchType=AT,resourceId=jdbc:mysql:/ / 127.0.0.1 / seat - storage, applicationData = null
io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 192.1683.2:8091:2044579200 2044579204 jdbc:mysql:/ / 127.0.0.1 / seat - storage
i.s.r.d.undo.AbstractUndoLogManager      : xid 192.1683.2:8091:2044579200 branch 2044579204, undo_log deleted with GlobalFinished           : Branch Rollbacked result: PhaseTwo_Rollbacked
Copy the code

An SQL exception occurs in the AccountServerApplication log

I.S.S ample. Service. AccountServiceImpl: -- -- -- -- -- -- -- > deductions in the account to start the account I.S.R.D.E xec. AbstractDMLBaseExecutor: execute executeAutoCommitTrue error:io.seata.core.exception.RmTransactionException: The Response [TransactionException [192.168.3.2:8091-2044579200]]. Java SQL. SQLException: IO. Seata. Core. Exception. RmTransactionException: Response [TransactionException [192.168.3.2:8091-2044579200]]... Omit some important exception stack information 23:31:56 2020-05-30. 652 WARN 5960 - [nio - 8181 - exec - 2] C.A.C.S eata. Web. SeataHandlerInterceptor: Xid in change during RPC from 192.168.3.2:8091:2044579200 to null 23:31:56 2020-05-30. 5960-654 ERROR [nio-8181-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.jdbc.UncategorizedSQLException: ### Error updating database. Cause: java.sql.SQLException: io.seata.core.exception.RmTransactionException: The Response [TransactionException [192.168.3.2:8091-2044579200]] # # # The error may exist in The file [E:\document\GitHub\seata-samples-master\springcloud-eureka-feign-mybatis-seata\account-server\target\classes\mapper\Acc ountMapper.xml] ### The error may involve defaultParameterMap ### The error occurred while setting parameters ### SQL: UPDATE account SET residue = residue - ?,used = used + ? where user_id = ?; ### Cause: java.sql.SQLException: io.seata.core.exception.RmTransactionException: Response [TransactionException [192.168.3.2:8091-2044579200]]; uncategorized SQLException; SQL state (null), the error code  [0]; io.seata.core.exception.RmTransactionException: Response [TransactionException [192.168.3.2:8091-2044579200]]; nested exception is Java, SQL. SQLException: io.seata.core.exception.RmTransactionException: The Response [TransactionException [192.168.3.2:8091-2044579200]]] with the root causeCopy the code

Distributed transaction common module

1, create project common_fescar, import dependencies

<properties>
    <fescar.version>0.4.2</fescar.version>
</properties>
<dependencies>
    <dependency>
        <groupId>com.alibaba.fescar</groupId>
        <artifactId>fescar-tm</artifactId>
        <version>${fescar.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fescar</groupId>
        <artifactId>fescar-spring</artifactId>
        <version>${fescar.version}</version>
    </dependency>
</dependencies>

Copy the code

2,Fescar configuration fileCopy it to the Resources project

Resource providers bind one XID per thread

public class FescarRMRequestFilter extends OncePerRequestFilter {

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger( FescarRMRequestFilter.class);

    /** * bind an XID * to each thread request@param request
     * @param response
     * @param filterChain
     */
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        String currentXID = request.getHeader( FescarAutoConfiguration.FESCAR_XID);
        if(! StringUtils.isEmpty(currentXID)){ RootContext.bind(currentXID); LOGGER.info("Current thread bound XID :" + currentXID);
        }
        try{
            filterChain.doFilter(request, response);
        } finally {
            String unbindXID = RootContext.unbind();
            if(unbindXID ! =null){
                LOGGER.info("Current thread unbinds XID from specified XID :" + unbindXID);
                if(! currentXID.equals(unbindXID)){ LOGGER.info("XID of current thread changed"); }}if(currentXID ! =null){
                LOGGER.info("XID of current thread changed"); }}}}Copy the code

RestInterceptor filters forward xids to other microservices on each request

public class FescarRestInterceptor implements RequestInterceptor.ClientHttpRequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        String xid = RootContext.getXID();
        if(!StringUtils.isEmpty(xid)){
            requestTemplate.header( FescarAutoConfiguration.FESCAR_XID, xid);
        }
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        String xid = RootContext.getXID();
        if(! StringUtils.isEmpty(xid)){ HttpHeaders headers = request.getHeaders(); headers.put( FescarAutoConfiguration.FESCAR_XID, Collections.singletonList(xid)); }returnexecution.execute(request, body); }}Copy the code

5. Create FescarAutoConfiguration

/** ** Create a data source ** define a global transaction manager scan object ** Add headers to all restTemplates to prevent invocation issues between microservices */
@Configuration
public class FescarAutoConfiguration {

    public static final String FESCAR_XID = "fescarXID";

    /*** * Create the proxy database *@param environment
     * @return* /
    @Bean
    public DataSource dataSource(Environment environment){
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(environment.getProperty("spring.datasource.url"));
        try {
            dataSource.setDriver(DriverManager.getDriver(environment.getProperty("spring.datasource.url")));
        } catch (SQLException e) {
            throw new RuntimeException("can't recognize dataSource Driver");
        }
        dataSource.setUsername(environment.getProperty("spring.datasource.username"));
        dataSource.setPassword(environment.getProperty("spring.datasource.password"));
        return new DataSourceProxy(dataSource);
    }

    /*** * Global transaction scanner * is used to parse with@GlobalTransactionalAnnotation method, and then use AOP mechanisms to control transactions *@param environment
     * @return* /
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(Environment environment){
        String applicationName = environment.getProperty("spring.application.name");
        String groupName = environment.getProperty("fescar.group.name");
        if(applicationName == null) {return new GlobalTransactionScanner(groupName == null ? "my_test_tx_group" : groupName);
        }else{
            return new GlobalTransactionScanner(applicationName, groupName == null ? "my_test_tx_group": groupName); }}/*** * Every time microservices and microservices call each other * To control the global transaction, TM will request TC to generate an XID every time, and this XID will be passed to the next transaction, that is, to call other microservices * so we can get the XID in the header every time we request. And passes the XID to the next microservice *@param restTemplates
     * @return* /
    @ConditionalOnBean({RestTemplate.class})
    @Bean
    public Object addFescarInterceptor(Collection<RestTemplate> restTemplates){
        restTemplates.stream()
                .forEach(restTemplate -> {
                    List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
                    if(interceptors ! =null){ interceptors.add(fescarRestInterceptor()); }});return new Object();
    }

    @Bean
    public FescarRMRequestFilter fescarRMRequestFilter(a){
        return new FescarRMRequestFilter();
    }

    @Bean
    public FescarRestInterceptor fescarRestInterceptor(a){
        return newFescarRestInterceptor(); }}Copy the code

6. Remember to create a new Undolog table for each database involved in distributed transactions

5. Reference materials

  1. Official experimental Demo collection
  2. This blog experiment used Demo
  3. Application practice of distributed transaction framework Fescar in SpringCloud environment