>>>> 😜😜😜 Making: 👉 github.com/black-ant

A. The preface

Seata: Session initialization for server-side transactions. Seata: Session initialization for server-side transactions


The entire Session will be processed for two operations, one for global_table and the other for branch_table.

Pro 1: Function of global_table Global_table is used to persist global transactions and can be configured using store.db.global.table

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) DEFAULT NULL,
  `transaction_service_group` varchar(32) DEFAULT NULL,
  `transaction_name` varchar(128) DEFAULT NULL,
  `timeout` int(11) DEFAULT NULL,
  `begin_time` bigint(20) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Copy the code

Pro 2: Branch_table function Branch_table is used to identify branch transactions and can be configured using store.db.branch

CREATE TABLE `branch_table` (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime(6) DEFAULT NULL,
  `gmt_modified` datetime(6) DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Copy the code

Global_table and branch_TABLE are related to primary and branch transactions. When a service is running, a piece of data is created in global_table, and each database operation unit (RM transaction), a piece of branch_TABLE data is created.

2. Session data trend

To look at the data flow of the entire process, first look at the overall processing logic

C1-netty Request Send a Request c2-AbstracttCinboundHandler process the Request A Handler C4 C3 - GlobalSession open affairs - AbstractSessionManager abstract level management Session C5 - DataBaseTransactionStoreManager actually Session management C6- LogStoreDataBaseDAO persistence processingCopy the code

2.1 Session data entry

In the previous document, have learned through DataBaseTransactionStoreManager # writeSession opens the Session handling

Each if has a specific operation type, which is divided into two parts: Global and Branch
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    // Handle BRANCH operations
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
    
        throw new StoreException("Unknown LogOperation:"+ logOperation.name()); }}// You can see that there are multiple types of judgment by if, and look at the overall call logic
Copy the code

2.2 Session Data Management

Session management is implemented through abstractssession Manager and its corresponding classes. The abstractssession Manager provides the following interfaces to process sessions

public interface SessionManager extends SessionLifecycleListener.Disposable {

    / / Global section
    void addGlobalSession(GlobalSession session) throws TransactionException;
    GlobalSession findGlobalSession(String xid) ;
    GlobalSession findGlobalSession(String xid, boolean withBranchSessions);
    void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException;
    void removeGlobalSession(GlobalSession session) throws TransactionException;
    
    / / Branch
    void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;
    void updateBranchSessionStatus(BranchSession session, BranchStatus status) throws TransactionException;
    void removeBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;

    Collection<GlobalSession> allSessions(a);
    // Timed processing
    List<GlobalSession> findGlobalSessions(SessionCondition condition);
    // Execute after locking
    <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
            throws TransactionException;
    
    / / time lock
    default boolean scheduledLock(String key) {
        return true;
    }

    default boolean unScheduledLock(String key) {
        return true; }}In contrast, Seata provides multiple implementation classes for SessionManger
@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize 

@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable 

@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager implements Initialize 
    
    
    
Copy the code

2.3 Session Creation

When the first service is executed, the logical process of addGlobalSession is initiated. The Session is created using the SessionManager

  • C- AbstractTCInboundHandler # handle
  • C- DefaultCoordinator # doGlobalBegin
  • C- DefaultCore # begin
  • C- GlobalSession # begin
  • C- AbstractSessionManager # onBegin
  • C- DataBaseSessionManager # addGlobalSession
// Step temporary: onRequest
AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)        
        
// Step 1: Enable global transactions -> business-service-seata-service-group
C- DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
	response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
}     

// Step 2 : DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
    / / open GlobalSession
	GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
	MDC.put(RootContext.MDC_KEY_XID, session.getXid());
	session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

	session.begin();

	// Publish the transaction GlobalTransactionEvent
	eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

	return session.getXid();
}



// Step 3: Add the GlobalSession C-DatabasesessionManager
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (StringUtils.isBlank(taskName)) {
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        if(! ret) {throw new StoreException("addGlobalSession failed."); }}else {
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
        if(! ret) {throw new StoreException("addGlobalSession failed."); }}}/ / Step 4: build convertGlobalTransactionDO
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
{
	"applicationId": "business-seata-example"."beginTime": 1626253527508."status": 1."timeout": 300000."transactionId": 4386660905323928286."transactionName": "dubbo-gts-seata-example"."transactionServiceGroup": "business-service-seata-service-group"."xid": "192.168.181.2:8091:4386660905323928286"
}


// Step 5: Insert into SQL with the following parameters
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1."192.168.181.2:8091:4386660905323928286");
        ps.setLong(2.4386660905323928286);
        ps.setInt(3.1);
        ps.setString(4."business-seata-example");
        ps.setString(5."business-service-seata-service-group");
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,
            transactionNameColumnSize) : transactionName;
        ps.setString(6."dubbo-gts-seata-example");
        ps.setInt(7.300000);
        ps.setLong(8.1626253527508);
        ps.setString(9.null);
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally{ IOUtil.close(ps, conn); }}Copy the code

2.4 Usage of Session data

After looking at the management, look at the whole process process, how to implement the Session read and use processAbstractssession Manager: abstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager
    
// Path 1: RetryRollback Rollback
1. DefaultCoordinator # init
2.DefaultCoordinator # handleRetryRollbacking: Processes the Rollback logic for retry3.DataBaseSessionManager # allSessions: Process sessionHolder. RETRY_ROLLBACKING_SESSION_MANAGER_NAME4. DataBaseSessionManager # findGlobalSessions 
    
// The same process is used for the following:
1. retryCommitting
2. timeoutCheck
3. asyncCommitting
    
    
// Approach 2: Session business processWhat happens in normal logic, and we'll see later, in the business logic as a whole, how does the Session functionCopy the code

2.4.1 Obtaining the Session

(DataBaseSessionManager # readSession); (DataBaseSessionManager # readSession)

  • GlobalSession findGlobalSession(String xid)
  • GlobalSession findGlobalSession(String xid, boolean withBranchSessions)
  • List findGlobalSessions(SessionCondition Condition) : Based on scheduled tasks

// Step 1: findSession logic
public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
    // This is primarily a call to ReadSession
    return transactionStoreManager.readSession(xid, withBranchSessions);
    // transactionStoreManager.readSession(condition)
}


// Step 2: readSession operation
public GlobalSession readSession(String xid, boolean withBranchSessions) {
        / / query GlobalTransactionDO
        GlobalTransactionDO globalTransactionDO = logStore.queryGlobalTransactionDO(xid);
        if (globalTransactionDO == null) {
            return null;
        }
        //branch transactions
        List<BranchTransactionDO> branchTransactionDOs = null;
        //reduce rpc with db when branchRegister and getGlobalStatus
        if (withBranchSessions) {
            branchTransactionDOs = logStore.queryBranchTransactionDO(globalTransactionDO.getXid());
        }
        return getGlobalSession(globalTransactionDO, branchTransactionDOs);
}

// Step 2-1: Call the specific queryBranchTransactionDO
// The LogStore implementation class LogStoreDataBaseDAO is used here
public List<GlobalTransactionDO> queryGlobalTransactionDO(int[] statuses, int limit) {

    conn = logStoreDataSource.getConnection();
    // Automatically commit transactions
    conn.setAutoCommit(true);
        
    String paramsPlaceHolder = org.apache.commons.lang.StringUtils.repeat("?".",", statuses.length);
    // select xid, transaction_id, status, application_id, transaction_service_group, 
    // transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified  
    // from global_table where status in (? ,? ,? ,?) order by gmt_modified limit ?
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByStatus(globalTable, paramsPlaceHolder);
    // Initiate SQL processing
    ps = conn.prepareStatement(sql);
        
}

// Step 2-2: Convert GlobalTransactionDO to GlobalSession
private GlobalSession getGlobalSession(GlobalTransactionDO globalTransactionDO, List
       
         branchTransactionDOs)
        {
        GlobalSession globalSession = SessionConverter.convertGlobalSession(globalTransactionDO);
        //branch transactions
        if (CollectionUtils.isNotEmpty(branchTransactionDOs)) {
            // Join branch transaction BranchTransactionDO
            for(BranchTransactionDO branchTransactionDO : branchTransactionDOs) { globalSession.add(SessionConverter.convertBranchSession(branchTransactionDO)); }}return globalSession;
}

// A simple Session query process is finished, and the corresponding SessionCondition and statuses query

public List<GlobalSession> readSession(GlobalStatus[] statuses) {
        int[] states = new int[statuses.length];
        for (int i = 0; i < statuses.length; i++) {
            states[i] = statuses[i].getCode();
        }
        // Query all globalTransactionDos corresponding to status
        List<GlobalTransactionDO> globalTransactionDOs = logStore.queryGlobalTransactionDO(states, logQueryLimit);
        if (CollectionUtils.isEmpty(globalTransactionDOs)) {
            return null;
        }
        List<String> xids = globalTransactionDOs.stream().map(GlobalTransactionDO::getXid).collect(Collectors.toList());
        // Get all branch transactions
        List<BranchTransactionDO> branchTransactionDOs = logStore.queryBranchTransactionDO(xids);
        Map<String, List<BranchTransactionDO>> branchTransactionDOsMap = branchTransactionDOs.stream()
            .collect(Collectors.groupingBy(BranchTransactionDO::getXid, LinkedHashMap::new, Collectors.toList()));
       // Return the Gloabl collection
       return globalTransactionDOs.stream().map(globalTransactionDO ->
            getGlobalSession(globalTransactionDO, branchTransactionDOsMap.get(globalTransactionDO.getXid())))
            .collect(Collectors.toList());
}

public List<GlobalSession> readSession(SessionCondition sessionCondition) {
        // The Xid or TransactionId is provided for the timing case
        if (StringUtils.isNotBlank(sessionCondition.getXid())) {
            GlobalSession globalSession = readSession(sessionCondition.getXid());
            if(globalSession ! =null) {
                List<GlobalSession> globalSessions = new ArrayList<>();
                globalSessions.add(globalSession);
                returnglobalSessions; }}else if(sessionCondition.getTransactionId() ! =null) {
            GlobalSession globalSession = readSession(sessionCondition.getTransactionId());
            if(globalSession ! =null) {
                List<GlobalSession> globalSessions = new ArrayList<>();
                globalSessions.add(globalSession);
                returnglobalSessions; }}else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
            return readSession(sessionCondition.getStatuses());
        }
        return null;
}






Copy the code

2.4.2 Adding a Session

 // After the Session is created, the subsequent BranchSession will join the GlobalSession -> see 3.1.3 Registering the master logic
Copy the code

2.5 Session Data Destruction

The main process is as follows:

  • C- SessionHelper # endRollbacked
  • C- GlobalSession # end()
  • C- GlobalSession # onEnd()
  • C- DataBaseSessionManager # removeGlobalSession
// DataBaseSessionManager

// Step 1: Destroy the entry DefaultCore # doGlobalCommit
 public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
        globalSession.getBeginTime(), null, globalSession.getStatus()));

    / /... Omit the COMMIT logic
    //If success and there is no branch, end the global transaction.
    if (success && globalSession.getBranchSessions().isEmpty()) {
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
    }
    return success;
}


// Step 2: Commit
public static void endCommitted(GlobalSession globalSession) throws TransactionException {
	globalSession.changeStatus(GlobalStatus.Committed);
	globalSession.end();
}

// Step 3: Close the lifecycle
public void end(a) throws TransactionException {
	// Clean locks first
	clean();

	for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
		lifecycleListener.onEnd(this); }}// Step 4: Remove global transactions
public void removeGlobalSession(GlobalSession session) throws TransactionException {
	boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
	if(! ret) {throw new StoreException("removeGlobalSession failed."); }}Copy the code

3. Branch_table process

3.1 Branch_TABLE Creation Process

3.1.1 Registration entry

AbstractTCInboundHandler is an important class that processes all Branch logic. This class has an implementation class: DefaultCoordinator

C- AbstractTCInboundHandler # handle

public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        // Note that an AbstractCallback is created
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    // Register Branch
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
}
 
Copy the code

3.1.2 doBranchRegister Main registration process

Each transaction unit registers a Branch,

C- DefaultCoordinator # doBranchRegister
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        response.setBranchId(
                core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                        request.getXid(), request.getApplicationData(), request.getLockKey()));
}
Copy the code

3.1.3 Registering the master logic

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // Build the Branch business
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                    applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        branchSessionLock(globalSession, branchSession);
        try {
            // Add the Branch service
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw newBranchTransactionException(........) ; }return branchSession.getBranchId();
    });
}
Copy the code

3.1.4 add Branch

public void addBranch(BranchSession branchSession) throws TransactionException {
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onAddBranch(this, branchSession);
        }
        branchSession.setStatus(BranchStatus.Registered);
        add(branchSession);
}
    
    
public void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {
        if (StringUtils.isNotBlank(taskName)) {
            return;
        }
        // At the core node again, call writeSession
        boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session);
        if(! ret) {throw new StoreException("addBranchSession failed."); }}// Omit the writeSession logic, but that's all
Copy the code

Take a look at the saved data structure

C- LogStoreDataBaseDAO 
public boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
        String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertBranchTransactionSQL(branchTable);
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = logStoreDataSource.getConnection();
            conn.setAutoCommit(true);
            ps = conn.prepareStatement(sql);
            ps.setString(1."192.168.181.2:8091:7214921829625028621");
            ps.setLong(2."7214921829625028621");
            ps.setLong(3."7214921829625028637");
            ps.setString(4.null);
            ps.setString(5."JDBC: mysql: / / 127.0.0.1:3306 / seata." ");
            ps.setString(6."AT");
            ps.setInt(7.0);
            ps.setString(8."The account - seata - example: 192.168.181.2:49676");
            ps.setString(9.null);
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally{ IOUtil.close(ps, conn); }}Copy the code

5. Supplement for other links

5.1 Deleting a BranchSession

BranchSession will be deleted in asyncCommit logic. The main process is as follows:

// Step 1: asyncCommit commit globalCommit
protected void handleAsyncCommitting(a) {
     SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
         asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
         core.doGlobalCommit(asyncCommittingSession, true);
    });
}


// Commit the Global Commit transaction
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
        globalSession.getBeginTime(), null, globalSession.getStatus()));

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        / / to omit...
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        globalSession.removeBranch(branchSession);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        if (globalSession.canBeCommittedAsync()) {

                            return CONTINUE;
                        } else {
                            SessionHelper.endCommitFailed(globalSession);
                            return false;
                        }
                    default:
                        if(! retrying) { globalSession.queueToRetryCommit();return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {

                            return CONTINUE;
                        } else {
                            return false; }}}catch (Exception ex) {
               
            }
            return CONTINUE;
        });
      
    }
    //If success and there is no branch, end the global transaction.
    if (success && globalSession.getBranchSessions().isEmpty()) {
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

    }
    return success;
}


// Step 3: Remove the Session
public void removeBranch(BranchSession branchSession) throws TransactionException {
    // do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
    // because it's already unlocked in 'DefaultCore.commit()'
    if(status ! = Committing && status ! = CommitRetrying && status ! = AsyncCommitting) {if(! branchSession.unlock()) {throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = "+ branchSession.getBranchId()); }}for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onRemoveBranch(this, branchSession);
    }
    remove(branchSession);
}


/ / Step 4: delete DataBaseTransactionStoreManager # writeSession processing Branch


Copy the code

5.2 Comparing Rollback Processes

As with Global, the asynchronous approach is different

TODO: Follow-up improvementsCopy the code

5.3 Description of DefaultCoordinator

So let’s see what the DefaultCoordinator class has. Okay?



// Internal objects
C- DefaultCoordinator
	F- int TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS = 5000; // The maximum waiting time for the scheduled task to be closed
	F- long COMMITTING_RETRY_PERIOD  // Commit retry time, default 1000L
	F- long ASYNC_COMMITTING_RETRY_PERIOD // Asynchronous commit retry time
	F- long ROLLBACKING_RETRY_PERIOD // Rollback retry time
	F- long TIMEOUT_RETRY_PERIOD // Retry after timeout
	F- long UNDO_LOG_DELETE_PERIOD // UNDO_LOG Delete period
	F- long UNDO_LOG_DELAY_DELETE_PERIOD = 3 * 60 * 1000 // Undo_log Delay deletion period
	F- int ALWAYS_RETRY_BOUNDARY = 0 // Always retry
	F- Duration MAX_COMMIT_RETRY_TIMEOUT  // Maximum commit retry timeout
	F- Duration MAX_ROLLBACK_RETRY_TIMEOUT  // Maximum rollback retry timeout
	F- boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE // Enable timeout unlock
	F- ScheduledThreadPoolExecutor retryRollbacking 
	F- ScheduledThreadPoolExecutor retryCommitting
	F- ScheduledThreadPoolExecutor asyncCommitting
	F- ScheduledThreadPoolExecutor timeoutCheck
	F- ScheduledThreadPoolExecutor undoLogDelete
	F- RemotingServer remotingServer
	F- DefaultCore core
	F-EventBus eventBus
Copy the code

5.4 What is GlobalSession

When a global transaction is created, a global Session is created to implement the SessionLifecycle interface and provide begin, changeStatus, changeBranchStatus, addBranch, Methods such as removeBranch to operate on session and branchSession

  • GlobalSessionLock: This object holds the ReentrantLock object internally, using the Lock and unlock mechanisms of ReentrantLock
  • BranchSession: BranchSession, which manages branch data. It is managed by globalSession. Its lock and unlock methods are implemented by lockManger
  • DefaultLockManager: DefaultLockManager is the default implementation of LockManager, which gets the lockKey of the branchSession

Talk about what lifecycleListeners do

private Set lifecycleListeners = new HashSet<>()

In each method, the logic is called through the loop Listeners

  • begin() -> lifecycleListener.onBegin(this)
  • changeStatus(GlobalStatus status) -> lifecycleListener.onStatusChange(this, status)
  • changeBranchStatus(BranchSession branchSession, BranchStatus status) -> lifecycleListener.onBranchStatusChange(this, branchSession, status)
  • close() -> lifecycleListener.onClose(this)
  • end() -> lifecycleListener.onEnd(this)

SessionLifecycleListener module

As you can see, there are three main implementations, DataBaseSessionMananger, FileSessionManager, and RedisSessionManager

Each will correspond to its own Store manager: FileSessionManager -> FileTransactionStoreManager RedisSessionManager -> RedisTransactionStoreManager DataBaseSessionManager -> DataBaseTransactionStoreManager

5.5 Main Logic flowchart

conclusion

The content of this article is relatively simple, mainly to complete the map of Seata, which is a sort of comb

Then the undo-log is sorted out and the whole process is almost finished

Seata complete pure source analysis can also come to an end, the follow-up slowly to open the application, wait and see