>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE backup: 👉 gitee.com/antblack/ca…
A. The preface
In this article, we will look at the Seata Client’s undo log operation.
Undo-log is the core part of AT mode. It is done in RM. When each database unit is processed, one undoLog data is generated.
2. Undo – log tables
Take a look at the table structure of undo-log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=utf8;
// Take a look at the parameters:- branch_id: branch ID - Context: mirror data - rollback_info: - log_status:Copy the code
The SQL statement
INSERT INTO `seata`.`undo_log`(
`id`, `branch_id`, `xid`,
`context`, `rollback_info`,
`log_status`, `log_created`, `log_modified`,
`ext`
) VALUES (
1.5116237355214458898.'192.168.181.2:8091:5116237355214458897'.'serializer=jackson'.0x7B7D.1.'the 2021-06-25 23:26:06'.'the 2021-06-25 23:26:06'.NULL
);
Copy the code
This method can be used to roll back the SQL statement:
// Take a look at the details of the currently inserted undoLog
private void insertUndoLog(String xid, long branchId, String rollbackCtx,
byte[] undoLogContent, State state, Connection conn) throws SQLException {
try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
pst.setLong(1.4386660905323926071);
pst.setString(2."192.168.181.2:8091:4386660905323926065");
pst.setString(3."serializer=jackson");
pst.setBlob(4, BlobUtils.bytes2Blob(undoLogContent));
pst.setInt(5, State.Normal(0));
pst.executeUpdate();
} catch (Exception e) {
if(! (einstanceof SQLException)) {
e = new SQLException(e);
}
throw(SQLException) e; }}/ / undoLogContent parameters
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog"."xid": "192.168.181.2:8091:4386660905323926065"."branchId": 4386660905323926071."sqlUndoLogs": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.undo.SQLUndoLog"."sqlType": "INSERT"."tableName": "t_order"."beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords"."tableName": "t_order"."rows": ["java.util.ArrayList", [the]]},"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords"."tableName": "t_order"."rows": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.sql.struct.Row"."fields": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "id"."keyType": "PRIMARY_KEY"."type": 4."value": 31
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "order_no"."keyType": "NULL"."type": 12."value": "63098e74e93b49bba77f1957e8fdab39"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "user_id"."keyType": "NULL"."type": 12."value": "1"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "commodity_code"."keyType": "NULL"."type": 12."value": "C201901140001"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "count"."keyType": "NULL"."type": 4."value": 50
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "amount"."keyType": "NULL"."type": 8."value": 100.0}]}]}Copy the code
UndoLogContent is an array of bytes converted by blobutils. bytes2Blob, Xid and BranchId store global transaction ID(xid) and branch transaction ID(BranchId), and record tableName (tableName) and operation type (sqlType) in sqlUndoLogs attribute.
BeforeImage and afterImage are used for beforeand afterImage data.
3. Processing flow of Client undo-log
The Client provides three implementations for storing undo-logs. As you can see, all of them are persisted to libraries, depending on the specific library type
3.1 AbstractUndoLogManager parsing
AbstractUndoLogManager implements UndoLogManager, which is a major management tool to implement undo-log management. This class mainly implements the following methods
public interface UndoLogManager {
void flushUndoLogs(ConnectionProxy cp) throws SQLException;
void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException;
void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException;
void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn) throws SQLException;
int deleteUndoLogByLogCreated(Date logCreated, int limitRows, Connection conn) throws SQLException;
}
Copy the code
There are three RM’s (Order, Account, Storage) in the whole case
3.2 Undo-log Initiated Process (Order)
- ConnectionProxy # doCommit: Initiates the entire COMMIT process
- ConnectionProxy # processGlobalTransactionCommit: global transaction commit operation
- UndoLogManagerFactory # getUndoLogManager: Gets the undoLog manager
- AbstractUndoLogManager # flushUndoLogs
- MySQLUndoLogManager # insertUndoLogWithNormal
- MySQLUndoLogManager # insertUndoLog: insertUndoLog
3.2.1 Main process of undo-log
FlushUndoLogs is the core process where the query is created for BranchUndoLog
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if(! connectionContext.hasUndoLog()) {return;
}
String xid = connectionContext.getXid();
long branchId = connectionContext.getBranchId();
// Create undo-log object -> 3.2.2 mirror query and obtain
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);
// Insert data -> 3.2.3 Insert final data
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
cp.getTargetConnection());
}
// branchUndoLog is stored in the image before and after
Copy the code
3.2.2 Querying and Obtaining a Mirror
The image is processed beforeImage and AfterImage. Let’s see where the image is queried
// The starting point of the Image is obtained from the Context
public class ConnectionProxy extends AbstractConnectionProxy {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxy.class);
private ConnectionContext context = new ConnectionContext();
}
// Let's look at the structure of ConnectionContext:
public class ConnectionContext {
private String xid;
private Long branchId;
private boolean isGlobalLockRequire;
/** * Table and primary key should not be duplicated. */
private Set<String> lockKeysBuffer = new HashSet<>();
private List<SQLUndoLog> sqlUndoItemsBuffer = new ArrayList<>();
}
// Query process:
C- ExecuteTemplate # execute
C- AbstractDMLBaseExecutor # executeAutoCommitFalse
C- BaseTransactionalExecutor # prepareUndoLog
C- ConnectionContext # appendUndoItem
Copy the code
Step Start: Master logic, in which before and after images are queried
// In this process, most of the data operations are done
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if(! JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");
}
// Query the front image
TableRecords beforeImage = beforeImage();
// Execute the SQL method
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
// Mirror after query
TableRecords afterImage = afterImage(beforeImage);
/ / save Undo - log
prepareUndoLog(beforeImage, afterImage);
return result;
}
// Added: TableRecords object
public class TableRecords implements java.io.Serializable {
// Support serialization capability
private static final long serialVersionUID = 4441667803166771721L;
private transient TableMeta tableMeta;
private String tableName;
private List<Row> rows = new ArrayList<Row>();
Copy the code
Step 1: Obtain beforeImage
AbstractDMLBaseExecutor has multiple implementation classes depending on the processing
If you want to do an Update, you don’t want to do an INSERT.
// c-baseinsertexecutor: how to handle Insert cases
protected TableRecords beforeImage(a) throws SQLException {
return TableRecords.empty(getTableMeta());
}
// c-updateExecutor: specifies how to query an Image when an Image is updated
protected TableRecords beforeImage(a) throws SQLException {
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
TableMeta tmeta = getTableMeta();
String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}
// Step 1-1 :
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
assertContainsPKColumnName(updateColumns);
StringBuilder prefix = new StringBuilder("SELECT ");
StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(WHERE).append(whereCondition);
}
String orderBy = recognizer.getOrderBy();
if (StringUtils.isNotBlank(orderBy)) {
suffix.append(orderBy);
}
ParametersHolder parametersHolder = statementProxy instanceof ParametersHolder ? (ParametersHolder)statementProxy : null;
String limit = recognizer.getLimit(parametersHolder, paramAppenderList);
if (StringUtils.isNotBlank(limit)) {
suffix.append(limit);
}
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(",", prefix.toString(), suffix.toString());
// Whether to update only columns
if (ONLY_CARE_UPDATE_COLUMNS) {
if(! containsPK(updateColumns)) { selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType()))); }// Query the updated columns
for(String columnName : updateColumns) { selectSQLJoin.add(columnName); }}else {
for(String columnName : tableMeta.getAllColumns().keySet()) { selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType())); }}// SELECT id, count FROM t_storage WHERE commodity_code = ? FOR UPDATE
return selectSQLJoin.toString();
}
Copy the code
TableMeta Table metadata
Step 2: Query AfterImages
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
TableMeta tmeta = getTableMeta();
if (beforeImage == null || beforeImage.size() == 0) {
return TableRecords.empty(getTableMeta());
}
String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
ResultSet rs = null;
try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
rs = pst.executeQuery();
return TableRecords.buildRecords(tmeta, rs);
} finally{ IOUtil.close(rs); }}// I won't go into detail here
Copy the code
Step 3: Add undo-log to build Context ()
C- BaseTransactionalExecutor
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
// Undo-log is created only when the image is changed
if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
return;
}
// Get the proxy connector
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
// Insert entity -> see figure below
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
Copy the code
After querying the Image, let’s take a look at the insert process of the Image again
3.2.3 Insertion of final data
protected void insertUndoLogWithGlobalFinished(String xid, long branchId, UndoLogParser parser, Connection conn) throws SQLException {
insertUndoLog(xid, branchId, buildContext(parser.getName()),
parser.getDefaultContent(), State.GlobalFinished, conn);
}
// The data insertion logic is shown in Section 2
Copy the code
Update mirror undo-log (Storage)
The main process is the same as the Order. If you look at the undo-log data during the insert, you can see that instead of generating a SQL, the fields and data are mirrored
And the image here is dealing with the nodes that are moving
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog"."xid": "192.168.181.2:8091:4386660905323926147"."branchId": 4386660905323926150."sqlUndoLogs": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.undo.SQLUndoLog"."sqlType": "UPDATE"."tableName": "t_storage"."beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords"."tableName": "t_storage"."rows": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.sql.struct.Row"."fields": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "id"."keyType": "PRIMARY_KEY"."type": 4."value": 1
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "count"."keyType": "NULL"."type": 4."value": -800}}}]]]]."afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords"."tableName": "t_storage"."rows": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.sql.struct.Row"."fields": ["java.util.ArrayList"[{"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "id"."keyType": "PRIMARY_KEY"."type": 4."value": 1
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field"."name": "count"."keyType": "NULL"."type": 4."value": -850}]}]}Copy the code
The same thing with Account, but I won’t leave it there
4. Client undo-log rollback process
The process of creating an undo-log is shown in the preceding section. The process of undo-log is shown in the following section
It is important to note that the undo-log is created in each RM, but is rolled back in
4.1 Undo log Rollback Process
Rollback Main process:
- RmBranchRollbackProcessor # process: to receive back processing the request
- RmBranchRollbackProcessor # handleBranchRollback
- AbstractRMHandler # onRequest
- AbstractRMHandler # handle
- AbstractExceptionHandler # exceptionHandleTemplate
- AbstractRMHandler # handle
- AbstractRMHandler # doBranchRollback: Branch rollback
- DataSourceManager # branchRollback
- AbstractUndoLogManager # undo: Execute the undo logic
- AbstractUndoLogManager # deleteUndoLog: Delete branch
We can see here that the core logic is undo, the code of this logic is relatively long, I here divided into callback and delete undo-log 2 logic to see:
4.2 Calling back the main logic
C- AbstractUndoLogManager
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for(; ;) { conn = dataSourceProxy.getPlainConnection();// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Query undo-log by branchId and xID
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
// Loop the undo-log
while (rs.next()) {
exists = true;
// The server may repeatedly send rollback requests to roll back the same branch transaction to multiple processes, thus ensuring that only the undo_log is processed in the normal state
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if(! canUndo(state)) {return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
// Deserialize to BranchUndoLog
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
// Reverse the order
Collections.reverse(sqlUndoLogs);
}
// Run the undo-log command to perform the rollback
for(SQLUndoLog sqlUndoLog : sqlUndoLogs) { TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta( conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId()); sqlUndoLog.setTableMeta(tableMeta); AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor( dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); }}finally {
// remove serializer nameremoveCurrentSerializer(); }}/ /... Omit the rollback logic}}Copy the code
AbstractUndoExecutor rolls back the executeOn
public void executeOn(Connection conn) throws SQLException {
if(IS_UNDO_DATA_VALIDATION_ENABLE && ! dataValidationAndGoOn(conn)) {return;
}
try {
// UPDATE t_storage SET count = ? WHERE id = ?
String undoSQL = buildUndoSQL();
PreparedStatement undoPST = conn.prepareStatement(undoSQL);
TableRecords undoRows = getUndoRows();
// Get the affected columns
for (Row undoRow : undoRows.getRows()) {
ArrayList<Field> undoValues = new ArrayList<>();
List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
for (Field field : undoRow.getFields()) {
if (field.getKeyType() != KeyType.PRIMARY_KEY) {
undoValues.add(field);
}
}
// Parse the value of the field to be rolled back (that is, the original value)
undoPrepare(undoPST, undoValues, pkValueList);
// Run the undo-log command to roll back the valueundoPST.executeUpdate(); }}catch (Exception ex) {
}
}
Copy the code
5. Client undo-log delete process
After the rollback is complete, take a look at the undo-log delete processing. The delete logic is processed after the rollback logic
5.1 undo-log Main logic
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for(; ;) {try {
/ /... Omit rollback logic
// If the undo_log exists, it means that the branch transaction has completed the first phase, we can simply roll back and clean up the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction, causing undo_log not to be written to the database.
// For example, the global transaction is rolled back by the initiator when the business processing times out.
// To ensure data consistency, we can insert an undo_log with GlobalFinished status to prevent the first phase local transactions of other programs from being committed correctly.
if (exists) {
deleteUndoLog(xid, branchId, conn);
conn.commit();
} else {
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
} catch (Throwable e) {
if(conn ! =null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx); }}throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);
} finally {
/ /...}}}Copy the code
5.2 delete undo – log
public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException {
try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL)) {
deletePST.setLong(1, branchId);
deletePST.setString(2, xid);
deletePST.executeUpdate();
} catch (Exception e) {
if(! (einstanceof SQLException)) {
e = new SQLException(e);
}
throw(SQLException) e; }}Copy the code
conclusion
This article just summarizes the undo-log logic, mainly through BeforeImage and AfterImage to save before and after logic, used for rollback processing
But that’s not all. There are lock mechanisms and remote invocation mechanisms to complete the process, while sorting out the TCC logic