>>>> 😜😜😜 Making: 👉 github.com/black-ant
A. The preface
Seata: Session initialization for server-side transactions. Seata: Session initialization for server-side transactions
The entire Session will be processed for two operations, one for global_table and the other for branch_table.
Pro 1: Function of global_table Global_table is used to persist global transactions and can be configured using store.db.global.table
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Copy the code
Pro 2: Branch_table function Branch_table is used to identify branch transactions and can be configured using store.db.branch
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Copy the code
Global_table and branch_TABLE are related to primary and branch transactions. When a service is running, a piece of data is created in global_table, and each database operation unit (RM transaction), a piece of branch_TABLE data is created.
2. Session data trend
To look at the data flow of the entire process, first look at the overall processing logic
C1-netty Request Send a Request c2-AbstracttCinboundHandler process the Request A Handler C4 C3 - GlobalSession open affairs - AbstractSessionManager abstract level management Session C5 - DataBaseTransactionStoreManager actually Session management C6- LogStoreDataBaseDAO persistence processingCopy the code
2.1 Session data entry
In the previous document, have learned through DataBaseTransactionStoreManager # writeSession opens the Session handling
Each if has a specific operation type, which is divided into two parts: Global and Branch
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
// Handle BRANCH operations
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:"+ logOperation.name()); }}// You can see that there are multiple types of judgment by if, and look at the overall call logic
Copy the code
2.2 Session Data Management
Session management is implemented through abstractssession Manager and its corresponding classes. The abstractssession Manager provides the following interfaces to process sessions
public interface SessionManager extends SessionLifecycleListener.Disposable {
/ / Global section
void addGlobalSession(GlobalSession session) throws TransactionException;
GlobalSession findGlobalSession(String xid) ;
GlobalSession findGlobalSession(String xid, boolean withBranchSessions);
void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException;
void removeGlobalSession(GlobalSession session) throws TransactionException;
/ / Branch
void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;
void updateBranchSessionStatus(BranchSession session, BranchStatus status) throws TransactionException;
void removeBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException;
Collection<GlobalSession> allSessions(a);
// Timed processing
List<GlobalSession> findGlobalSessions(SessionCondition condition);
// Execute after locking
<T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
throws TransactionException;
/ / time lock
default boolean scheduledLock(String key) {
return true;
}
default boolean unScheduledLock(String key) {
return true; }}In contrast, Seata provides multiple implementation classes for SessionManger
@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize
@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable
@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager implements Initialize
Copy the code
2.3 Session Creation
When the first service is executed, the logical process of addGlobalSession is initiated. The Session is created using the SessionManager
- C- AbstractTCInboundHandler # handle
- C- DefaultCoordinator # doGlobalBegin
- C- DefaultCore # begin
- C- GlobalSession # begin
- C- AbstractSessionManager # onBegin
- C- DataBaseSessionManager # addGlobalSession
// Step temporary: onRequest
AbstractResultMessage onRequest(AbstractMessage request, RpcContext context)
// Step 1: Enable global transactions -> business-service-seata-service-group
C- DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
}
// Step 2 : DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
/ / open GlobalSession
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
session.begin();
// Publish the transaction GlobalTransactionEvent
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
return session.getXid();
}
// Step 3: Add the GlobalSession C-DatabasesessionManager
public void addGlobalSession(GlobalSession session) throws TransactionException {
if (StringUtils.isBlank(taskName)) {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
if(! ret) {throw new StoreException("addGlobalSession failed."); }}else {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
if(! ret) {throw new StoreException("addGlobalSession failed."); }}}/ / Step 4: build convertGlobalTransactionDO
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
{
"applicationId": "business-seata-example"."beginTime": 1626253527508."status": 1."timeout": 300000."transactionId": 4386660905323928286."transactionName": "dubbo-gts-seata-example"."transactionServiceGroup": "business-service-seata-service-group"."xid": "192.168.181.2:8091:4386660905323928286"
}
// Step 5: Insert into SQL with the following parameters
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(1."192.168.181.2:8091:4386660905323928286");
ps.setLong(2.4386660905323928286);
ps.setInt(3.1);
ps.setString(4."business-seata-example");
ps.setString(5."business-service-seata-service-group");
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,
transactionNameColumnSize) : transactionName;
ps.setString(6."dubbo-gts-seata-example");
ps.setInt(7.300000);
ps.setLong(8.1626253527508);
ps.setString(9.null);
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally{ IOUtil.close(ps, conn); }}Copy the code
2.4 Usage of Session data
After looking at the management, look at the whole process process, how to implement the Session read and use processAbstractssession Manager: abstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager: AbstractsSession Manager
// Path 1: RetryRollback Rollback
1. DefaultCoordinator # init
2.DefaultCoordinator # handleRetryRollbacking: Processes the Rollback logic for retry3.DataBaseSessionManager # allSessions: Process sessionHolder. RETRY_ROLLBACKING_SESSION_MANAGER_NAME4. DataBaseSessionManager # findGlobalSessions
// The same process is used for the following:
1. retryCommitting
2. timeoutCheck
3. asyncCommitting
// Approach 2: Session business processWhat happens in normal logic, and we'll see later, in the business logic as a whole, how does the Session functionCopy the code
2.4.1 Obtaining the Session
(DataBaseSessionManager # readSession); (DataBaseSessionManager # readSession)
- GlobalSession findGlobalSession(String xid)
- GlobalSession findGlobalSession(String xid, boolean withBranchSessions)
- List findGlobalSessions(SessionCondition Condition) : Based on scheduled tasks
// Step 1: findSession logic
public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
// This is primarily a call to ReadSession
return transactionStoreManager.readSession(xid, withBranchSessions);
// transactionStoreManager.readSession(condition)
}
// Step 2: readSession operation
public GlobalSession readSession(String xid, boolean withBranchSessions) {
/ / query GlobalTransactionDO
GlobalTransactionDO globalTransactionDO = logStore.queryGlobalTransactionDO(xid);
if (globalTransactionDO == null) {
return null;
}
//branch transactions
List<BranchTransactionDO> branchTransactionDOs = null;
//reduce rpc with db when branchRegister and getGlobalStatus
if (withBranchSessions) {
branchTransactionDOs = logStore.queryBranchTransactionDO(globalTransactionDO.getXid());
}
return getGlobalSession(globalTransactionDO, branchTransactionDOs);
}
// Step 2-1: Call the specific queryBranchTransactionDO
// The LogStore implementation class LogStoreDataBaseDAO is used here
public List<GlobalTransactionDO> queryGlobalTransactionDO(int[] statuses, int limit) {
conn = logStoreDataSource.getConnection();
// Automatically commit transactions
conn.setAutoCommit(true);
String paramsPlaceHolder = org.apache.commons.lang.StringUtils.repeat("?".",", statuses.length);
// select xid, transaction_id, status, application_id, transaction_service_group,
// transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified
// from global_table where status in (? ,? ,? ,?) order by gmt_modified limit ?
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByStatus(globalTable, paramsPlaceHolder);
// Initiate SQL processing
ps = conn.prepareStatement(sql);
}
// Step 2-2: Convert GlobalTransactionDO to GlobalSession
private GlobalSession getGlobalSession(GlobalTransactionDO globalTransactionDO, List
branchTransactionDOs)
{
GlobalSession globalSession = SessionConverter.convertGlobalSession(globalTransactionDO);
//branch transactions
if (CollectionUtils.isNotEmpty(branchTransactionDOs)) {
// Join branch transaction BranchTransactionDO
for(BranchTransactionDO branchTransactionDO : branchTransactionDOs) { globalSession.add(SessionConverter.convertBranchSession(branchTransactionDO)); }}return globalSession;
}
// A simple Session query process is finished, and the corresponding SessionCondition and statuses query
public List<GlobalSession> readSession(GlobalStatus[] statuses) {
int[] states = new int[statuses.length];
for (int i = 0; i < statuses.length; i++) {
states[i] = statuses[i].getCode();
}
// Query all globalTransactionDos corresponding to status
List<GlobalTransactionDO> globalTransactionDOs = logStore.queryGlobalTransactionDO(states, logQueryLimit);
if (CollectionUtils.isEmpty(globalTransactionDOs)) {
return null;
}
List<String> xids = globalTransactionDOs.stream().map(GlobalTransactionDO::getXid).collect(Collectors.toList());
// Get all branch transactions
List<BranchTransactionDO> branchTransactionDOs = logStore.queryBranchTransactionDO(xids);
Map<String, List<BranchTransactionDO>> branchTransactionDOsMap = branchTransactionDOs.stream()
.collect(Collectors.groupingBy(BranchTransactionDO::getXid, LinkedHashMap::new, Collectors.toList()));
// Return the Gloabl collection
return globalTransactionDOs.stream().map(globalTransactionDO ->
getGlobalSession(globalTransactionDO, branchTransactionDOsMap.get(globalTransactionDO.getXid())))
.collect(Collectors.toList());
}
public List<GlobalSession> readSession(SessionCondition sessionCondition) {
// The Xid or TransactionId is provided for the timing case
if (StringUtils.isNotBlank(sessionCondition.getXid())) {
GlobalSession globalSession = readSession(sessionCondition.getXid());
if(globalSession ! =null) {
List<GlobalSession> globalSessions = new ArrayList<>();
globalSessions.add(globalSession);
returnglobalSessions; }}else if(sessionCondition.getTransactionId() ! =null) {
GlobalSession globalSession = readSession(sessionCondition.getTransactionId());
if(globalSession ! =null) {
List<GlobalSession> globalSessions = new ArrayList<>();
globalSessions.add(globalSession);
returnglobalSessions; }}else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
return readSession(sessionCondition.getStatuses());
}
return null;
}
Copy the code
2.4.2 Adding a Session
// After the Session is created, the subsequent BranchSession will join the GlobalSession -> see 3.1.3 Registering the master logic
Copy the code
2.5 Session Data Destruction
The main process is as follows:
- C- SessionHelper # endRollbacked
- C- GlobalSession # end()
- C- GlobalSession # onEnd()
- C- DataBaseSessionManager # removeGlobalSession
// DataBaseSessionManager
// Step 1: Destroy the entry DefaultCore # doGlobalCommit
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), null, globalSession.getStatus()));
/ /... Omit the COMMIT logic
//If success and there is no branch, end the global transaction.
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession);
// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
}
return success;
}
// Step 2: Commit
public static void endCommitted(GlobalSession globalSession) throws TransactionException {
globalSession.changeStatus(GlobalStatus.Committed);
globalSession.end();
}
// Step 3: Close the lifecycle
public void end(a) throws TransactionException {
// Clean locks first
clean();
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEnd(this); }}// Step 4: Remove global transactions
public void removeGlobalSession(GlobalSession session) throws TransactionException {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session);
if(! ret) {throw new StoreException("removeGlobalSession failed."); }}Copy the code
3. Branch_table process
3.1 Branch_TABLE Creation Process
3.1.1 Registration entry
AbstractTCInboundHandler is an important class that processes all Branch logic. This class has an implementation class: DefaultCoordinator
C- AbstractTCInboundHandler # handle
public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
BranchRegisterResponse response = new BranchRegisterResponse();
// Note that an AbstractCallback is created
exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
@Override
public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
throws TransactionException {
try {
// Register Branch
doBranchRegister(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String
.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
}
}
}, request, response);
return response;
}
Copy the code
3.1.2 doBranchRegister Main registration process
Each transaction unit registers a Branch,
C- DefaultCoordinator # doBranchRegister
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setBranchId(
core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
request.getXid(), request.getApplicationData(), request.getLockKey()));
}
Copy the code
3.1.3 Registering the master logic
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// Build the Branch business
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
branchSessionLock(globalSession, branchSession);
try {
// Add the Branch service
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw newBranchTransactionException(........) ; }return branchSession.getBranchId();
});
}
Copy the code
3.1.4 add Branch
public void addBranch(BranchSession branchSession) throws TransactionException {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onAddBranch(this, branchSession);
}
branchSession.setStatus(BranchStatus.Registered);
add(branchSession);
}
public void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {
if (StringUtils.isNotBlank(taskName)) {
return;
}
// At the core node again, call writeSession
boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session);
if(! ret) {throw new StoreException("addBranchSession failed."); }}// Omit the writeSession logic, but that's all
Copy the code
Take a look at the saved data structure
C- LogStoreDataBaseDAO
public boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertBranchTransactionSQL(branchTable);
Connection conn = null;
PreparedStatement ps = null;
try {
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(1."192.168.181.2:8091:7214921829625028621");
ps.setLong(2."7214921829625028621");
ps.setLong(3."7214921829625028637");
ps.setString(4.null);
ps.setString(5."JDBC: mysql: / / 127.0.0.1:3306 / seata." ");
ps.setString(6."AT");
ps.setInt(7.0);
ps.setString(8."The account - seata - example: 192.168.181.2:49676");
ps.setString(9.null);
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally{ IOUtil.close(ps, conn); }}Copy the code
5. Supplement for other links
5.1 Deleting a BranchSession
BranchSession will be deleted in asyncCommit logic. The main process is as follows:
// Step 1: asyncCommit commit globalCommit
protected void handleAsyncCommitting(a) {
SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
core.doGlobalCommit(asyncCommittingSession, true);
});
}
// Commit the Global Commit transaction
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), null, globalSession.getStatus()));
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
/ / to omit...
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Committed:
globalSession.removeBranch(branchSession);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {
return CONTINUE;
} else {
SessionHelper.endCommitFailed(globalSession);
return false;
}
default:
if(! retrying) { globalSession.queueToRetryCommit();return false;
}
if (globalSession.canBeCommittedAsync()) {
return CONTINUE;
} else {
return false; }}}catch (Exception ex) {
}
return CONTINUE;
});
}
//If success and there is no branch, end the global transaction.
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession);
// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
}
return success;
}
// Step 3: Remove the Session
public void removeBranch(BranchSession branchSession) throws TransactionException {
// do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
// because it's already unlocked in 'DefaultCore.commit()'
if(status ! = Committing && status ! = CommitRetrying && status ! = AsyncCommitting) {if(! branchSession.unlock()) {throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = "+ branchSession.getBranchId()); }}for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onRemoveBranch(this, branchSession);
}
remove(branchSession);
}
/ / Step 4: delete DataBaseTransactionStoreManager # writeSession processing Branch
Copy the code
5.2 Comparing Rollback Processes
As with Global, the asynchronous approach is different
TODO: Follow-up improvementsCopy the code
5.3 Description of DefaultCoordinator
So let’s see what the DefaultCoordinator class has. Okay?
// Internal objects
C- DefaultCoordinator
F- int TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS = 5000; // The maximum waiting time for the scheduled task to be closed
F- long COMMITTING_RETRY_PERIOD // Commit retry time, default 1000L
F- long ASYNC_COMMITTING_RETRY_PERIOD // Asynchronous commit retry time
F- long ROLLBACKING_RETRY_PERIOD // Rollback retry time
F- long TIMEOUT_RETRY_PERIOD // Retry after timeout
F- long UNDO_LOG_DELETE_PERIOD // UNDO_LOG Delete period
F- long UNDO_LOG_DELAY_DELETE_PERIOD = 3 * 60 * 1000 // Undo_log Delay deletion period
F- int ALWAYS_RETRY_BOUNDARY = 0 // Always retry
F- Duration MAX_COMMIT_RETRY_TIMEOUT // Maximum commit retry timeout
F- Duration MAX_ROLLBACK_RETRY_TIMEOUT // Maximum rollback retry timeout
F- boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE // Enable timeout unlock
F- ScheduledThreadPoolExecutor retryRollbacking
F- ScheduledThreadPoolExecutor retryCommitting
F- ScheduledThreadPoolExecutor asyncCommitting
F- ScheduledThreadPoolExecutor timeoutCheck
F- ScheduledThreadPoolExecutor undoLogDelete
F- RemotingServer remotingServer
F- DefaultCore core
F-EventBus eventBus
Copy the code
5.4 What is GlobalSession
When a global transaction is created, a global Session is created to implement the SessionLifecycle interface and provide begin, changeStatus, changeBranchStatus, addBranch, Methods such as removeBranch to operate on session and branchSession
- GlobalSessionLock: This object holds the ReentrantLock object internally, using the Lock and unlock mechanisms of ReentrantLock
- BranchSession: BranchSession, which manages branch data. It is managed by globalSession. Its lock and unlock methods are implemented by lockManger
- DefaultLockManager: DefaultLockManager is the default implementation of LockManager, which gets the lockKey of the branchSession
Talk about what lifecycleListeners do
private Set lifecycleListeners = new HashSet<>()
In each method, the logic is called through the loop Listeners
- begin() -> lifecycleListener.onBegin(this)
- changeStatus(GlobalStatus status) -> lifecycleListener.onStatusChange(this, status)
- changeBranchStatus(BranchSession branchSession, BranchStatus status) -> lifecycleListener.onBranchStatusChange(this, branchSession, status)
- close() -> lifecycleListener.onClose(this)
- end() -> lifecycleListener.onEnd(this)
SessionLifecycleListener module
As you can see, there are three main implementations, DataBaseSessionMananger, FileSessionManager, and RedisSessionManager
Each will correspond to its own Store manager: FileSessionManager -> FileTransactionStoreManager RedisSessionManager -> RedisTransactionStoreManager DataBaseSessionManager -> DataBaseTransactionStoreManager
5.5 Main Logic flowchart
conclusion
The content of this article is relatively simple, mainly to complete the map of Seata, which is a sort of comb
Then the undo-log is sorted out and the whole process is almost finished
Seata complete pure source analysis can also come to an end, the follow-up slowly to open the application, wait and see