1 XA mode example
Example reference: the seata-samples project on GitHub. At the business code level, XA mode is used the same way as AT mode; only the data source proxy needs to be replaced with DataSourceProxyXA. For more information, please refer to the example.
2 Architecture
Note: This figure is from the Seata website.
3 Source Code Analysis
3.1 TM starts the global transaction
This process is the same as in AT mode, using @GlobalTransactional.
3.2 RM executes branch transactions
Because the DataSource is proxied, all DB operations go through DataSourceProxyXA, and each SQL request is executed by ExecuteTemplateXA.
3.2.1 Executing SQL: ExecuteTemplateXA#execute
The main logic is shown in the following code:
- Record the transaction commit mode: auto-commit or non-auto-commit.
- If the connection is in auto-commit mode, switch it to non-auto-commit; see connectionProxyXA.setAutoCommit. This step registers the branch transaction with the TC.
- Perform the DB operation.
- If it fails, roll back; see connectionProxyXA.rollback().
- If the transaction started in auto-commit mode, commit; see connectionProxyXA.commit().
public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA, StatementCallback<T, S> statementCallback, S targetStatement, Object... args) throws SQLException {
    boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
    if (autoCommitStatus) {
        // XA Start
        connectionProxyXA.setAutoCommit(false);
    }
    try {
        T res = null;
        try {
            // execute SQL
            res = statementCallback.execute(targetStatement, args);
        } catch (Throwable ex) {
            if (autoCommitStatus) {
                // XA End & Rollback
                try {
                    connectionProxyXA.rollback();
                } catch (SQLException sqle) {
                    // log and ignore the rollback failure.
                    LOGGER.warn(
                        "Failed to rollback xa branch of " + connectionProxyXA.xid +
                            "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                        sqle);
                }
            }
            if (ex instanceof SQLException) {
                throw ex;
            } else {
                throw new SQLException(ex);
            }
        }
        if (autoCommitStatus) {
            try {
                // XA End & Prepare
                connectionProxyXA.commit();
            } catch (Throwable ex) {
                LOGGER.warn(
                    "Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(),
                    ex);
                // XA End & Rollback
                if (!(ex instanceof SQLException) || !AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END.equalsIgnoreCase(((SQLException) ex).getSQLState())) {
                    try {
                        connectionProxyXA.rollback();
                    } catch (SQLException sqle) {
                        // log and ignore the rollback failure.
                        LOGGER.warn(
                            "Failed to rollback xa branch of " + connectionProxyXA.xid +
                                "(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                            sqle);
                    }
                }
                if (ex instanceof SQLException) {
                    throw ex;
                } else {
                    throw new SQLException(ex);
                }
            }
        }
        return res;
    } finally {
        if (autoCommitStatus) {
            connectionProxyXA.setAutoCommit(true);
        }
    }
}
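The control flow of the template can be tried out without Seata or a database. The following is a minimal sketch, assuming a hypothetical RecordingConnection that only records the calls it receives and a hypothetical XaTemplateSketch.execute that mirrors the order of operations. Neither class is part of Seata, and the error handling is deliberately simpler than in ExecuteTemplateXA.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a connection proxy: it only records the calls it receives.
class RecordingConnection {
    final List<String> calls = new ArrayList<>();
    private boolean autoCommit = true;

    boolean getAutoCommit() {
        return autoCommit;
    }

    void setAutoCommit(boolean value) {
        autoCommit = value;
        calls.add("setAutoCommit(" + value + ")");
    }

    void commit() {
        calls.add("commit");
    }

    void rollback() {
        calls.add("rollback");
    }
}

// Hypothetical template that follows the same order of steps as the listing above.
class XaTemplateSketch {
    static <T> T execute(RecordingConnection conn, Supplier<T> statement) {
        // Remember the original mode so it can be restored at the end.
        boolean wasAutoCommit = conn.getAutoCommit();
        if (wasAutoCommit) {
            conn.setAutoCommit(false); // in Seata this is where the branch starts
        }
        try {
            T result;
            try {
                result = statement.get(); // the SQL would run here
            } catch (RuntimeException ex) {
                if (wasAutoCommit) {
                    conn.rollback();
                }
                throw ex;
            }
            if (wasAutoCommit) {
                conn.commit();
            }
            return result;
        } finally {
            if (wasAutoCommit) {
                conn.setAutoCommit(true);
            }
        }
    }
}
```

Calling XaTemplateSketch.execute with a supplier that returns normally records setAutoCommit(false), commit, setAutoCommit(true); a supplier that throws records rollback in place of commit.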
3.2.2 Setting the auto-commit mode: ConnectionProxyXA#setAutoCommit
- Register the branch transaction with the TC.
- Bind the current branch transaction ID to xaResource: xaResource.start
public void setAutoCommit(boolean autoCommit) throws SQLException {
    if (currentAutoCommitStatus == autoCommit) {
        return;
    }
    if (autoCommit) {
        // According to JDBC spec:
        // If this method is called during a transaction and the
        // auto-commit mode is changed, the transaction is committed.
        if (xaActive) {
            commit();
        }
    } else {
        if (xaActive) {
            throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
        }
        // Start a XA branch
        long branchId = 0L;
        try {
            // 1. register branch to TC then get the branchId
            branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null);
        } catch (TransactionException te) {
            cleanXABranchContext();
            throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
        }
        // 2. build XA-Xid with xid and branchId
        this.xaBranchXid = XAXidBuilder.build(xid, branchId);
        try {
            // 3. XA Start
            xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
        } catch (XAException e) {
            cleanXABranchContext();
            throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
        }
        // 4. XA is active
        this.xaActive = true;
    }
    currentAutoCommitStatus = autoCommit;
}
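Step 2 turns the global xid and the branchId into a javax.transaction.xa.Xid. To make that idea concrete, here is a minimal sketch of an Xid implementation. The class name SimpleXid, the format ID and the byte encoding are all assumptions chosen for illustration; this is not the format produced by Seata's XAXidBuilder.

```java
import java.nio.charset.StandardCharsets;
import javax.transaction.xa.Xid;

// Hypothetical Xid: the encoding is illustrative and is NOT Seata's XAXidBuilder format.
final class SimpleXid implements Xid {
    private static final int FORMAT_ID = 1; // arbitrary value chosen for this sketch

    private final byte[] globalId;
    private final byte[] branchQualifier;

    SimpleXid(String globalXid, long branchId) {
        this.globalId = globalXid.getBytes(StandardCharsets.UTF_8);
        this.branchQualifier = Long.toString(branchId).getBytes(StandardCharsets.UTF_8);
        // The XA interface caps both parts at 64 bytes.
        if (globalId.length > Xid.MAXGTRIDSIZE || branchQualifier.length > Xid.MAXBQUALSIZE) {
            throw new IllegalArgumentException("xid parts exceed the XA size limits");
        }
    }

    @Override
    public int getFormatId() {
        return FORMAT_ID;
    }

    @Override
    public byte[] getGlobalTransactionId() {
        return globalId.clone(); // defensive copy
    }

    @Override
    public byte[] getBranchQualifier() {
        return branchQualifier.clone(); // defensive copy
    }
}
```

Because the Xid is derived only from the two inputs, building it again from the same xid and branchId yields the same bytes, so a branch can be addressed later without keeping the original object around.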
3.2.3 Committing the branch transaction: ConnectionProxyXA#commit
- End the work performed on the xaBranchXid branch: xaResource.end
- Ask the resource manager to prepare the xaBranchXid transaction for commit: xaResource.prepare
public void commit() throws SQLException {
    if (currentAutoCommitStatus) {
        // Ignore the committing on an autocommit session.
        return;
    }
    if (!xaActive || this.xaBranchXid == null) {
        throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
    }
    try {
        // XA End: Success
        xaResource.end(xaBranchXid, XAResource.TMSUCCESS);
        // XA Prepare
        xaResource.prepare(xaBranchXid);
        // Keep the Connection if necessary
        keepIfNecessary();
    } catch (XAException xe) {
        try {
            // Branch Report to TC: Failed
            DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
                BranchStatus.PhaseOne_Failed, null);
        } catch (TransactionException te) {
            LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId()
                + " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage());
        }
        throw new SQLException(
            "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                .getMessage(), xe);
    } finally {
        cleanXABranchContext();
    }
}
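Note that the commit path above only ends and prepares the branch; the branch is not finally committed until the second phase. A minimal sketch of that lifecycle as a state machine follows. The class BranchLifecycle and its state names are hypothetical and exist only for illustration; they are modelled on the XA call order (start, end, prepare, then commit or rollback), not on Seata's internal classes.

```java
// Hypothetical model of one XA branch; the names are illustrative, not Seata's.
class BranchLifecycle {
    enum State { ACTIVE, IDLE, PREPARED, COMMITTED, ROLLED_BACK }

    // The branch is modelled as if XA start had already been called.
    private State state = State.ACTIVE;

    State state() {
        return state;
    }

    void end() {
        require(State.ACTIVE);
        state = State.IDLE;
    }

    void prepare() {
        require(State.IDLE);
        state = State.PREPARED;
    }

    void commit() {
        require(State.PREPARED);
        state = State.COMMITTED;
    }

    void rollback() {
        // In this sketch a branch can be rolled back once it has been ended.
        if (state != State.IDLE && state != State.PREPARED) {
            throw new IllegalStateException("cannot roll back from " + state);
        }
        state = State.ROLLED_BACK;
    }

    // Phase one as in the commit path above: end the branch, then prepare it.
    void phaseOne() {
        end();
        prepare();
    }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```

After phaseOne() the branch sits in PREPARED and waits for the coordinator's decision; calling prepare() before end() is rejected, which is the kind of ordering error the SQLSTATE_XA_NOT_END check guards against.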
3.2.4 Rolling back the branch transaction: ConnectionProxyXA#rollback
- Report the transaction execution failure to the TC.
- End the work performed on the xaBranchXid branch with the failure flag: xaResource.end
public void rollback() throws SQLException {
    if (currentAutoCommitStatus) {
        // Ignore the committing on an autocommit session.
        return;
    }
    if (!xaActive || this.xaBranchXid == null) {
        throw new SQLException("should NOT rollback on an inactive session");
    }
    try {
        // Branch Report to TC
        DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null);
    } catch (TransactionException te) {
        // log and ignore the report failure
        LOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId()
            + " since " + te.getCode() + ":" + te.getMessage());
    }
    try {
        // XA End: Fail
        xaResource.end(xaBranchXid, XAResource.TMFAIL);
    } catch (XAException xe) {
        throw new SQLException(
            "Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                .getMessage(), xe);
    } finally {
        cleanXABranchContext();
    }
}
3.3 TM sends a request to TC to perform Global Commit/Rollback
This process is exactly the same as in AT mode.
3.4 TC sends Branch Commit/Rollback requests to the branches
This process is exactly the same as in AT mode.
3.5 RM performs Branch Commit/Rollback
AbstractRMHandler receives the request and hands it to ResourceManagerXA for processing. Commit and rollback end up in xaCommit and xaRollback, which directly call the commit and rollback methods of xaResource.
public void xaCommit(String xid, long branchId, String applicationData) throws XAException {
    XAXid xaXid = XAXidBuilder.build(xid, branchId);
    xaResource.commit(xaXid, false);
    releaseIfNecessary();
}
public void xaRollback(String xid, long branchId, String applicationData) throws XAException {
    XAXid xaXid = XAXidBuilder.build(xid, branchId);
    xaResource.rollback(xaXid);
    releaseIfNecessary();
}
3.6 Exception Compensation
The exception compensation process is exactly the same as in AT mode.
4 XAResource
Seata’s XA mode is implemented based on the XAResource interface of the JDK. Below is a description of this interface from the JDK documentation.
4.1 XAResource overview
The XAResource interface is a Java mapping of the industry standard XA interface based on the X/Open CAE specification (Distributed Transaction Processing: XA specification).
In a distributed transaction processing (DTP) environment, the XA interface defines the contract between a resource manager and a transaction manager. A JDBC driver or JMS provider implements this interface to support the association between a global transaction and a database or message service connection.
The XAResource interface can be supported by any transactional resource that is intended to be used by applications in an environment where transactions are controlled by an external transaction manager. A database management system is one such resource. An application may access data through multiple database connections. Each database connection is enlisted with the transaction manager as a transactional resource. The transaction manager obtains an XAResource for each connection participating in a global transaction. It uses the start method to associate the global transaction with the resource, and the end method to disassociate the transaction from the resource. The resource manager is responsible for associating the global transaction with all work performed on its data between the start and end method calls.
At transaction commit time, the transaction manager notifies the resource manager to prepare, commit, or roll back the transaction under the two-phase commit protocol.
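The protocol described above can be sketched with the JDK interface alone. The following is a minimal sketch, assuming a hypothetical InMemoryResource that tracks a single branch and can be told to fail its prepare call, and a hypothetical TwoPhaseSketch that plays the transaction manager. For brevity the same Xid is passed to every resource and a commit failure after prepare is simply rethrown; a real transaction manager would use one branch Xid per resource and retry the second phase.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

// Hypothetical in-memory resource: tracks one branch and can be told to fail prepare.
class InMemoryResource implements XAResource {
    private final boolean failPrepare;
    String outcome = "none"; // "none", "prepared", "committed" or "rolled back"

    InMemoryResource(boolean failPrepare) { this.failPrepare = failPrepare; }

    @Override public void start(Xid xid, int flags) { outcome = "none"; }
    @Override public void end(Xid xid, int flags) { }
    @Override public int prepare(Xid xid) throws XAException {
        if (failPrepare) {
            throw new XAException(XAException.XA_RBROLLBACK);
        }
        outcome = "prepared";
        return XAResource.XA_OK;
    }
    @Override public void commit(Xid xid, boolean onePhase) { outcome = "committed"; }
    @Override public void rollback(Xid xid) { outcome = "rolled back"; }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

// Hypothetical coordinator playing the role of the transaction manager.
class TwoPhaseSketch {
    // Builds a throwaway Xid; the format ID and encoding are arbitrary.
    static Xid xid(String gtrid, String bqual) {
        return new Xid() {
            @Override public int getFormatId() { return 1; }
            @Override public byte[] getGlobalTransactionId() { return gtrid.getBytes(StandardCharsets.UTF_8); }
            @Override public byte[] getBranchQualifier() { return bqual.getBytes(StandardCharsets.UTF_8); }
        };
    }

    // Returns true if every resource committed, false if the transaction was rolled back.
    static boolean run(Xid xid, List<? extends XAResource> resources) {
        try {
            for (XAResource r : resources) {
                r.start(xid, XAResource.TMNOFLAGS); // associate the work with the transaction
                // ... the application's SQL would run here ...
                r.end(xid, XAResource.TMSUCCESS);   // disassociate
            }
            for (XAResource r : resources) {
                r.prepare(xid);                     // phase one: every resource votes
            }
        } catch (XAException e) {
            for (XAResource r : resources) {
                try {
                    r.rollback(xid);                // any failure rolls back all resources
                } catch (XAException ignored) {
                    // a real transaction manager would log and retry
                }
            }
            return false;
        }
        for (XAResource r : resources) {
            try {
                r.commit(xid, false);               // phase two: commit the prepared work
            } catch (XAException e) {
                throw new IllegalStateException("commit failed after prepare", e);
            }
        }
        return true;
    }
}
```

With two healthy resources, run returns true and both end in the committed state; if any resource fails to prepare, run returns false and every resource is rolled back.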
4.2 Method Summary
Return type | Method signature | Description |
---|---|---|
void | commit(Xid xid, boolean onePhase) | Commits the global transaction specified by xid. |
void | end(Xid xid, int flags) | Ends the work performed on behalf of a transaction branch. |
void | forget(Xid xid) | Tells the resource manager to forget about a heuristically completed transaction branch. |
int | getTransactionTimeout() | Gets the current transaction timeout value set for this XAResource instance. |
boolean | isSameRM(XAResource xares) | Determines whether the resource manager instance represented by the target object is the same as the one represented by the parameter xares. |
int | prepare(Xid xid) | Asks the resource manager to prepare for a commit of the transaction specified by xid. |
Xid[] | recover(int flag) | Gets a list of prepared transaction branches from the resource manager. |
void | rollback(Xid xid) | Tells the resource manager to roll back work performed on behalf of a transaction branch. |
boolean | setTransactionTimeout(int seconds) | Sets the current transaction timeout value for this XAResource instance. |
void | start(Xid xid, int flags) | Starts work on behalf of the transaction branch specified by xid. |
4.3 Field Summary
Type | Field | Description |
---|---|---|
static int | TMENDRSCAN | Ends a recovery scan. |
static int | TMFAIL | Disassociates the caller and marks the transaction branch rollback-only. |
static int | TMJOIN | The caller is joining an existing transaction branch. |
static int | TMNOFLAGS | Indicates that no flag value is selected. |
static int | TMONEPHASE | The caller is using one-phase optimization. |
static int | TMRESUME | The caller is resuming its association with a suspended transaction branch. |
static int | TMSTARTRSCAN | Starts a recovery scan. |
static int | TMSUCCESS | Disassociates the caller from the transaction branch. |
static int | TMSUSPEND | The caller is suspending (not ending) its association with the transaction branch. |
static int | XA_OK | The transaction work has been prepared normally. |
static int | XA_RDONLY | The transaction branch is read-only and has been committed. |
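A few of these constants are easier to understand in use. The following is a minimal sketch, assuming a hypothetical helper class XaFlagsSketch; the three methods are illustrations of how the flags are typically chosen, not part of the JDK or Seata.

```java
import javax.transaction.xa.XAResource;

// Hypothetical helpers showing how some XAResource constants are typically used.
class XaFlagsSketch {
    // Picks the flag passed to end(): success disassociates the caller normally,
    // failure additionally marks the branch rollback-only.
    static int endFlag(boolean workSucceeded) {
        return workSucceeded ? XAResource.TMSUCCESS : XAResource.TMFAIL;
    }

    // Interprets the vote returned by prepare(): a read-only branch has already
    // been committed, so only XA_OK branches need a phase-two commit.
    static boolean needsPhaseTwo(int prepareVote) {
        if (prepareVote == XAResource.XA_RDONLY) {
            return false;
        }
        if (prepareVote == XAResource.XA_OK) {
            return true;
        }
        throw new IllegalArgumentException("unexpected prepare vote: " + prepareVote);
    }

    // Flags for a recovery scan that is started and ended by a single recover() call.
    static int singleScanFlags() {
        return XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN;
    }
}
```

For example, the rollback path shown earlier corresponds to endFlag(false), and the commit path to endFlag(true).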