>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE backup: 👉 gitee.com/antblack/ca…

1. Preface

In this article, we will look at the Seata Client’s undo log operation.

Undo-log is the core of AT mode. It is handled in the RM: every time a branch operation changes the database, one undo-log record is generated for it.

2. Undo-log table

Take a look at the table structure of undo-log

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=utf8;

// Field notes:
// - branch_id:     branch transaction ID
// - xid:           global transaction ID
// - context:       serialization context of rollback_info (e.g. serializer=jackson)
// - rollback_info: the serialized before/after images
// - log_status:    0 = normal, 1 = defensive GlobalFinished record

A sample INSERT statement:

INSERT INTO `seata`.`undo_log`(
    `id`, `branch_id`, `xid`, 
    `context`, `rollback_info`, 
    `log_status`, `log_created`, `log_modified`, 
    `ext`
) VALUES (
    1, 5116237355214458898, '192.168.181.2:8091:5116237355214458897', 'serializer=jackson',
    0x7B7D, 1, '2021-06-25 23:26:06', '2021-06-25 23:26:06', NULL
);


This record is written by the following method:

// Take a look at the details of the currently inserted undoLog
private void insertUndoLog(String xid, long branchId, String rollbackCtx,
                               byte[] undoLogContent, State state, Connection conn) throws SQLException {
    try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
        pst.setLong(1, branchId);        // e.g. 4386660905323926071
        pst.setString(2, xid);           // e.g. "192.168.181.2:8091:4386660905323926065"
        pst.setString(3, rollbackCtx);   // e.g. "serializer=jackson"
        pst.setBlob(4, BlobUtils.bytes2Blob(undoLogContent));
        pst.setInt(5, state.getValue()); // State.Normal = 0
        pst.executeUpdate();
    } catch (Exception e) {
        if (!(e instanceof SQLException)) {
            e = new SQLException(e);
        }
        throw (SQLException) e;
    }
}

// The undoLogContent parameter (the rollback_info blob, deserialized):
{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
    "xid": "192.168.181.2:8091:4386660905323926065",
    "branchId": 4386660905323926071,
    "sqlUndoLogs": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
        "sqlType": "INSERT",
        "tableName": "t_order",
        "beforeImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
            "tableName": "t_order",
            "rows": ["java.util.ArrayList", []]
        },
        "afterImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "t_order",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 4, "value": 31},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "order_no", "keyType": "NULL", "type": 12, "value": "63098e74e93b49bba77f1957e8fdab39"},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "user_id", "keyType": "NULL", "type": 12, "value": "1"},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "commodity_code", "keyType": "NULL", "type": 12, "value": "C201901140001"},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "count", "keyType": "NULL", "type": 4, "value": 50},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "amount", "keyType": "NULL", "type": 8, "value": 100.0}
                ]]
            }]]
        }
    }]]
}

undoLogContent is a byte array converted to a BLOB by BlobUtils.bytes2Blob. xid and branchId store the global transaction ID and the branch transaction ID, while each entry in sqlUndoLogs records the table name (tableName) and the operation type (sqlType).

beforeImage and afterImage hold the data snapshots before and after the SQL is executed.


3. Processing flow of Client undo-log

The client provides three implementations for storing undo-logs. All of them persist the log to the database; the concrete implementation is chosen by the database type (for MySQL it is MySQLUndoLogManager, which we will see below).

3.1 AbstractUndoLogManager analysis

AbstractUndoLogManager implements the UndoLogManager interface and is the main entry point for undo-log management. The interface defines the following methods:

public interface UndoLogManager {

    void flushUndoLogs(ConnectionProxy cp) throws SQLException;

    void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException;

    void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException;

    void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn) throws SQLException;

    int deleteUndoLogByLogCreated(Date logCreated, int limitRows, Connection conn) throws SQLException;

}

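As a quick orientation, these methods map onto the transaction phases roughly as follows. This is a usage sketch assembled from the flows described in the next sections, not a verbatim excerpt from Seata; the variables (dataSourceProxy, connectionProxy, xid, branchId, ...) are placeholders.

// Usage sketch (assumed from the flows in Sections 3-5); the manager is picked by database type
UndoLogManager manager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());

// Phase one (local commit): persist the before/after images in the same local transaction as the business SQL
manager.flushUndoLogs(connectionProxy);

// Phase two, global rollback: replay the images and then remove the log (Section 4)
manager.undo(dataSourceProxy, xid, branchId);

// Phase two, global commit: the log is no longer needed and is cleaned up in batches asynchronously
manager.batchDeleteUndoLog(xids, branchIds, connection);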

There are three RMs in the sample case: Order, Account and Storage.

3.2 Undo-log write process (Order)

  1. ConnectionProxy # doCommit: initiates the whole commit process
  2. ConnectionProxy # processGlobalTransactionCommit: global transaction commit operation (sketched after this list)
  3. UndoLogManagerFactory # getUndoLogManager: gets the undo-log manager
  4. AbstractUndoLogManager # flushUndoLogs
  5. MySQLUndoLogManager # insertUndoLogWithNormal
  6. MySQLUndoLogManager # insertUndoLog: inserts the undo-log record
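Step 2 above ties the pieces together. A simplified sketch of what processGlobalTransactionCommit does (reduced to the essentials; details vary between Seata versions): register the branch with the TC, flush the undo-logs over the same connection, then commit the local transaction and report the result.

// Simplified sketch of ConnectionProxy#processGlobalTransactionCommit (not verbatim source)
private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();                              // register the branch (and its lock keys) with the TC
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        // write the undo-log and the business data in the same local transaction
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        report(false);                           // report phase-one failure to the TC
        throw new SQLException(ex);
    }
    report(true);                                // report phase-one success
    context.reset();
}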

3.2.1 Main process of undo-log

flushUndoLogs is the core of the process: it assembles a BranchUndoLog from the connection context, serializes it, and inserts it.

public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
    ConnectionContext connectionContext = cp.getContext();
    if (!connectionContext.hasUndoLog()) {
        return;
    }

    String xid = connectionContext.getXid();
    long branchId = connectionContext.getBranchId();
    
    // Create the undo-log object -> 3.2.2 image query
    BranchUndoLog branchUndoLog = new BranchUndoLog();
    branchUndoLog.setXid(xid);
    branchUndoLog.setBranchId(branchId);
    branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

    UndoLogParser parser = UndoLogParserFactory.getInstance();
    byte[] undoLogContent = parser.encode(branchUndoLog);
    
    // Insert data -> 3.2.3 Insert final data
    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
        cp.getTargetConnection());
}

// branchUndoLog carries the before/after images collected during SQL execution
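buildContext(parser.getName()) is what produces the serializer=jackson value seen in the context column in Section 2. A minimal sketch, assuming the single-argument form used above (newer Seata versions also encode a compressor type):

// Sketch of AbstractUndoLogManager#buildContext, assuming the single-argument form used above
protected String buildContext(String serializer) {
    Map<String, String> map = new HashMap<>();
    map.put(UndoLogConstants.SERIALIZER_KEY, serializer);   // key "serializer", e.g. value "jackson"
    return CollectionUtils.encodeMap(map);                  // -> "serializer=jackson"
}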

3.2.2 Querying and obtaining the images

The images are the beforeImage and afterImage snapshots. Let's see where they are queried.

// The starting point of the Image is obtained from the Context
public class ConnectionProxy extends AbstractConnectionProxy {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxy.class);

    private ConnectionContext context = new ConnectionContext();
    
}

// Let's look at the structure of ConnectionContext:
public class ConnectionContext {
    private String xid;
    private Long branchId;
    private boolean isGlobalLockRequire;

    /**
     * Table and primary key should not be duplicated.
     */
    private Set<String> lockKeysBuffer = new HashSet<>();
    private List<SQLUndoLog> sqlUndoItemsBuffer = new ArrayList<>();
    
}


// Query process:
C- ExecuteTemplate # execute
C- AbstractDMLBaseExecutor # executeAutoCommitFalse    
C- BaseTransactionalExecutor # prepareUndoLog
C- ConnectionContext # appendUndoItem

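The last step of that chain, appendUndoItem, is trivial: it buffers the SQLUndoLog until flushUndoLogs reads it back. A sketch of the buffering methods, assuming they simply operate on the two fields shown above:

// Sketch of the ConnectionContext buffering methods (assumed from the fields shown above)
void appendUndoItem(SQLUndoLog sqlUndoLog) {
    sqlUndoItemsBuffer.add(sqlUndoLog);
}

public boolean hasUndoLog() {
    return !sqlUndoItemsBuffer.isEmpty();       // checked at the start of flushUndoLogs
}

public List<SQLUndoLog> getUndoItems() {
    return sqlUndoItemsBuffer;                  // read by flushUndoLogs to build the BranchUndoLog
}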

Step Start: the main logic, in which the before and after images are queried

// In this process, most of the data operations are done
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    // Query the before image
    TableRecords beforeImage = beforeImage();
    // Execute the SQL method
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    // Query the after image
    TableRecords afterImage = afterImage(beforeImage);
    // Save the undo-log
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

// Added: TableRecords object
public class TableRecords implements java.io.Serializable {
    // Support serialization capability
    private static final long serialVersionUID = 4441667803166771721L;

    private transient TableMeta tableMeta;
    private String tableName;
    private List<Row> rows = new ArrayList<Row>();
    

Step 1: Obtain beforeImage

AbstractDMLBaseExecutor has multiple implementation classes, depending on the SQL type being processed.

The handling differs by statement type: an INSERT has no rows before execution, so its before image is empty, while an UPDATE must query the current rows first.

// C- BaseInsertExecutor: how the before image is handled for an INSERT
protected TableRecords beforeImage() throws SQLException {
    return TableRecords.empty(getTableMeta());
}

// C- UpdateExecutor: how the before image is queried for an UPDATE
protected TableRecords beforeImage() throws SQLException {
    ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
    TableMeta tmeta = getTableMeta();
    String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
    return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}

// Step 1-1 : 
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
    SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
    List<String> updateColumns = recognizer.getUpdateColumns();
    assertContainsPKColumnName(updateColumns);
    StringBuilder prefix = new StringBuilder("SELECT ");
    StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
    String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
    if (StringUtils.isNotBlank(whereCondition)) {
        suffix.append(WHERE).append(whereCondition);
    }
    String orderBy = recognizer.getOrderBy();
    if (StringUtils.isNotBlank(orderBy)) {
        suffix.append(orderBy);
    }
    ParametersHolder parametersHolder = statementProxy instanceof ParametersHolder ? (ParametersHolder)statementProxy : null;
    String limit = recognizer.getLimit(parametersHolder, paramAppenderList);
    if (StringUtils.isNotBlank(limit)) {
        suffix.append(limit);
    }
    suffix.append(" FOR UPDATE");
    StringJoiner selectSQLJoin = new StringJoiner(",", prefix.toString(), suffix.toString());
    // Whether to select only the updated columns
    if (ONLY_CARE_UPDATE_COLUMNS) {
        if (!containsPK(updateColumns)) {
            selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
        }
        // Query the updated columns
        for (String columnName : updateColumns) {
            selectSQLJoin.add(columnName);
        }
    } else {
        for (String columnName : tableMeta.getAllColumns().keySet()) {
            selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
        }
    }
    // e.g. SELECT id, count FROM t_storage WHERE commodity_code = ? FOR UPDATE
    return selectSQLJoin.toString();
}

TableMeta holds the table metadata (columns and primary key information).


Step 2: Query the afterImage

protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        TableMeta tmeta = getTableMeta();
        if (beforeImage == null || beforeImage.size() == 0) {
            return TableRecords.empty(getTableMeta());
        }
        String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
        ResultSet rs = null;
        try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
            SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
            rs = pst.executeQuery();
            return TableRecords.buildRecords(tmeta, rs);
        } finally {
            IOUtil.close(rs);
        }
}
// I won't go into detail here
  
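In other words, the after image is re-queried by the primary keys captured in the before image. For the Storage example this effectively runs something like the following (an illustrative sketch, not the SQL Seata generates verbatim):

// Illustrative sketch of what Step 2 effectively executes for the t_storage example
static void queryAfterImageExample(java.sql.Connection conn) throws java.sql.SQLException {
    String selectSQL = "SELECT id, count FROM t_storage WHERE id = ?";
    try (java.sql.PreparedStatement pst = conn.prepareStatement(selectSQL)) {
        pst.setLong(1, 1L);                      // primary key value taken from beforeImage.pkRows()
        try (java.sql.ResultSet rs = pst.executeQuery()) {
            // TableRecords.buildRecords(tmeta, rs) turns this result set into the afterImage
        }
    }
}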

Step 3: Append the undo-log item to the ConnectionContext

C- BaseTransactionalExecutor
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    // Undo-log is created only when the image is changed
    if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
        return;
    }
    // Get the proxy connector
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    // Choose the records used to build the lock key: the before image for DELETE, otherwise the after image
    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    String lockKeys = buildLockKey(lockKeyRecords);
    connectionProxy.appendLockKey(lockKeys);

    SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
    connectionProxy.appendUndoLog(sqlUndoLog);
}    
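The lock key appended here is what the TC uses for global locking; it is essentially the table name plus the primary keys of the affected rows (for the Storage update: t_storage:1). A hypothetical simplified version of buildLockKey for a single-PK table, shown only to illustrate the format:

// Hypothetical simplified buildLockKey (single primary key); the real code also handles
// composite keys, escaping and multiple rows/tables
protected String buildLockKey(TableRecords records) {
    String pks = records.getRows().stream()
            .flatMap(row -> row.getFields().stream())
            .filter(field -> field.getKeyType() == KeyType.PRIMARY_KEY)
            .map(field -> String.valueOf(field.getValue()))
            .collect(java.util.stream.Collectors.joining(","));
    return records.getTableName() + ":" + pks;   // e.g. "t_storage:1"
}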

After the images have been queried, let's look at how the undo-log record is finally inserted.

3.2.3 Insertion of final data

protected void insertUndoLogWithGlobalFinished(String xid, long branchId, UndoLogParser parser, Connection conn) throws SQLException {
	insertUndoLog(xid, branchId, buildContext(parser.getName()),
		parser.getDefaultContent(), State.GlobalFinished, conn);
}

// The data insertion logic is shown in Section 2
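The write flow in 3.2 actually goes through insertUndoLogWithNormal (step 5). A sketch of it, under the assumption that it reuses the same insertUndoLog helper shown in Section 2, just with State.Normal:

// Sketch of MySQLUndoLogManager#insertUndoLogWithNormal (assumed to delegate to insertUndoLog with State.Normal)
protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx,
                                       byte[] undoLogContent, Connection conn) throws SQLException {
    insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
}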

3.3 Undo-log for an update (Storage)

The main process is the same as for Order. Looking at the undo-log data written during the update, you can see that no compensating SQL is generated at this point; instead, the affected fields and their values are recorded as images.

Note that the images here only contain the changed column (count) plus the primary key.

{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
    "xid": "192.168.181.2:8091:4386660905323926147",
    "branchId": 4386660905323926150,
    "sqlUndoLogs": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
        "sqlType": "UPDATE",
        "tableName": "t_storage",
        "beforeImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "t_storage",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 4, "value": 1},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "count", "keyType": "NULL", "type": 4, "value": -800}
                ]]
            }]]
        },
        "afterImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "t_storage",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 4, "value": 1},
                    {"@class": "io.seata.rm.datasource.sql.struct.Field", "name": "count", "keyType": "NULL", "type": 4, "value": -850}
                ]]
            }]]
        }
    }]]
}

The Account service works the same way, so it is not expanded here.

4. Client undo-log rollback process

The previous section showed how the undo-log is created; this section looks at how it is used for rollback.

It is important to note that the undo-log is created in each RM, but the rollback is driven by the server (TC), which sends a branch rollback request to the RM.

4.1 Undo log Rollback Process

The main rollback flow:

  1. RmBranchRollbackProcessor # process: receives the branch rollback request
  2. RmBranchRollbackProcessor # handleBranchRollback
  3. AbstractRMHandler # onRequest
  4. AbstractRMHandler # handle
  5. AbstractExceptionHandler # exceptionHandleTemplate
  6. AbstractRMHandler # handle
  7. AbstractRMHandler # doBranchRollback: branch rollback
  8. DataSourceManager # branchRollback (sketched after this list)
  9. AbstractUndoLogManager # undo: executes the undo logic
  10. AbstractUndoLogManager # deleteUndoLog: deletes the undo-log of the branch
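Step 8, DataSourceManager # branchRollback, is the bridge between the RPC handler and the undo logic: it looks up the DataSourceProxy by resourceId, delegates to the UndoLogManager, and reports the branch status back. A simplified sketch (error handling reduced to the essentials, not verbatim source):

// Simplified sketch of DataSourceManager#branchRollback
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId,
                                   String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);        // look up the registered resource
    try {
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType())
                .undo(dataSourceProxy, xid, branchId);        // -> Section 4.2
    } catch (TransactionException te) {
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}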

The core logic is in undo. Since the method is fairly long, it is split here into two parts: the rollback logic and the undo-log deletion logic.

4.2 Rollback main logic

C- AbstractUndoLogManager
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        conn = dataSourceProxy.getPlainConnection();

        // The entire undo process should run in a local transaction.
        if (originalAutoCommit = conn.getAutoCommit()) {
            conn.setAutoCommit(false);
        }

        // Query undo-log by branchId and xID
        selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
        selectPST.setLong(1, branchId);
        selectPST.setString(2, xid);
        rs = selectPST.executeQuery();

        boolean exists = false;
        // Loop the undo-log
        while (rs.next()) {
            exists = true;

            // The server may send duplicate rollback requests for the same branch transaction,
            // so only undo_log records in the normal state are processed here
            int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
            if (!canUndo(state)) {
                return;
            }

            String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
            Map<String, String> context = parseContext(contextString);
            byte[] rollbackInfo = getRollbackInfo(rs);

            String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
            UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
            // Deserialize to BranchUndoLog
            BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

            try {
                // put serializer name to local
                setCurrentSerializer(parser.getName());
                List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                if (sqlUndoLogs.size() > 1) {
                    // Reverse the order
                    Collections.reverse(sqlUndoLogs);
                }
                
                // Run the undo-log command to perform the rollback
                for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType())
                            .getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                    sqlUndoLog.setTableMeta(tableMeta);
                    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                    undoExecutor.executeOn(conn);
                }
            } finally {
                // remove serializer name
                removeCurrentSerializer();
            }
        }
        // ... The remaining delete/commit logic is omitted here (see Section 5)
    }
}

AbstractUndoExecutor performs the actual rollback in executeOn:

public void executeOn(Connection conn) throws SQLException {
    if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
        return;
    }
    try {
        // UPDATE t_storage SET count = ? WHERE id = ?  
        String undoSQL = buildUndoSQL();
        PreparedStatement undoPST = conn.prepareStatement(undoSQL);
        TableRecords undoRows = getUndoRows();
        // Get the affected columns
        for (Row undoRow : undoRows.getRows()) {
            ArrayList<Field> undoValues = new ArrayList<>();
            List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
            for (Field field : undoRow.getFields()) {
                if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                    undoValues.add(field);
                }
            }
            // Parse the value of the field to be rolled back (that is, the original value)
            undoPrepare(undoPST, undoValues, pkValueList);
            // Execute the undo SQL to restore the original values
            undoPST.executeUpdate();
        }
    } catch (Exception ex) {
        // Exception handling omitted in this excerpt
    }
}
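buildUndoSQL is provided by the concrete executor for each SQL type; for an UPDATE it writes the before-image values back by primary key. A hypothetical simplified version for a single-row, single-PK update (the real MySQLUndoUpdateExecutor also handles escaping, composite keys and multiple rows):

// Hypothetical simplified undo-SQL builder for the UPDATE case (illustration only)
protected String buildUndoSQL() {
    Row before = sqlUndoLog.getBeforeImage().getRows().get(0);
    String setClause = before.getFields().stream()
            .filter(field -> field.getKeyType() != KeyType.PRIMARY_KEY)
            .map(field -> field.getName() + " = ?")
            .collect(java.util.stream.Collectors.joining(", "));
    // e.g. "UPDATE t_storage SET count = ? WHERE id = ?"
    return "UPDATE " + sqlUndoLog.getTableName() + " SET " + setClause + " WHERE id = ?";
}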

5. Client undo-log delete process

After the rollback completes, let's look at how the undo-log is deleted. The delete logic runs right after the rollback logic, inside the same undo method.

5.1 Main deletion logic in undo

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            // ... Omit the rollback logic (Section 4.2)
 
            // If the undo_log exists, it means that the branch transaction has completed the first phase, we can simply roll back and clean up the undo_log
            // Otherwise, it indicates that there is an exception in the branch transaction, causing undo_log not to be written to the database.

            // For example, the global transaction is rolled back by the initiator when the business processing times out.
            // To ensure data consistency, insert an undo_log with GlobalFinished status so that the branch's
            // first-phase local transaction cannot commit later (it would violate the unique key on xid + branch_id).

            if (exists) {
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
            } else {
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
            }

            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // The undo_log may have been inserted by another process in the meantime; retry the rollback
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                    branchId, e.getMessage()), e);

        } finally {
            // ...
        }
    }
}

5.2 Deleting the undo-log

    public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException {
        try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL)) {
            deletePST.setLong(1, branchId);
            deletePST.setString(2, xid);
            deletePST.executeUpdate();
        } catch (Exception e) {
            if (!(e instanceof SQLException)) {
                e = new SQLException(e);
            }
            throw (SQLException) e;
        }
    }
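DELETE_UNDO_LOG_SQL deletes by branch_id and xid, which matches the unique key ux_undo_log on the table. Its assumed shape (parameter order matches the setLong/setString calls above):

// Assumed shape of the delete statement used above
private static final String DELETE_UNDO_LOG_SQL =
        "DELETE FROM undo_log WHERE branch_id = ? AND xid = ?";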

Conclusion

This article only summarizes the undo-log logic: the data before and after each branch operation is saved through beforeImage and afterImage and then used for rollback.

But that's not all. The global lock mechanism and the remote invocation mechanism are also needed to complete the process, and the TCC logic still has to be sorted out as well.