Continuing from the previous article on the TCC transaction model, today Xiao Ge walks through how the core design of the Seata TCC mode is implemented in code. This is useful both for tuning and for interview preparation.
Seata TCC model implementation architecture diagram
The core code behind the TCC transaction model architecture in the figure above is described below (each piece of code corresponds to a component in the figure):
TM core architecture code
TMClient client implementation
public final class TmRpcClient extends AbstractRpcRemotingClient {

    /** Get the transaction manager client instance. */
    public static TmRpcClient getInstance(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = getInstance();
        // Set the application ID
        tmRpcClient.setApplicationId(applicationId);
        // Set the TC cluster key
        tmRpcClient.setTransactionServiceGroup(transactionServiceGroup);
        return tmRpcClient;
    }

    /** Get the transaction manager client instance (lazily created singleton). */
    public static TmRpcClient getInstance() {
        if (null == instance) {
            synchronized (TmRpcClient.class) {
                if (null == instance) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()),
                        RejectedPolicies.runsOldestTaskPolicy());
                    instance = new TmRpcClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }
}
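The no-argument getInstance() above uses double-checked locking. The excerpt does not show how the instance field is declared; under the Java Memory Model this idiom is only safe when that field is volatile. The following is a minimal sketch of the pattern, not Seata's code: LazyClient and its fields are hypothetical stand-ins for TmRpcClient.

```java
// Hypothetical stand-in for TmRpcClient; only the singleton pattern is shown.
final class LazyClient {
    // volatile makes the double-checked locking idiom safe
    private static volatile LazyClient instance;

    private String applicationId;
    private String transactionServiceGroup;

    private LazyClient() { }

    /** Returns the lazily created singleton. */
    static LazyClient getInstance() {
        LazyClient local = instance;           // one volatile read on the fast path
        if (local == null) {
            synchronized (LazyClient.class) {
                local = instance;              // re-check while holding the lock
                if (local == null) {
                    local = new LazyClient();
                    instance = local;
                }
            }
        }
        return local;
    }

    /** Returns the singleton after setting the application ID and service group. */
    static LazyClient getInstance(String applicationId, String transactionServiceGroup) {
        LazyClient client = getInstance();
        client.applicationId = applicationId;
        client.transactionServiceGroup = transactionServiceGroup;
        return client;
    }

    String getApplicationId() { return applicationId; }

    String getTransactionServiceGroup() { return transactionServiceGroup; }
}
```

Calling LazyClient.getInstance("app", "group") and then LazyClient.getInstance() returns the same object, which is why the two-argument overload can simply configure the shared instance.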
TransactionManager contract
public interface TransactionManager {

    /** Begin a new global transaction. */
    String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException;

    /** Commit the global transaction. */
    GlobalStatus commit(String xid) throws TransactionException;

    /** Roll back the global transaction. */
    GlobalStatus rollback(String xid) throws TransactionException;

    /** Get the status of the specified global transaction. */
    GlobalStatus getStatus(String xid) throws TransactionException;

    /** Report the global transaction status. */
    GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException;
}
DefaultTransactionManager transaction manager implementation
The pseudocode below shows only the core implementation points.
public class DefaultTransactionManager implements TransactionManager {

    // Begin a global transaction
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // Build the global transaction begin request
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        // Synchronously send the begin request to the TC
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            // Send the protocol request to the TC through the TM client
            return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
}
GlobalTransaction contract
public interface GlobalTransaction {

    /** Begin the global transaction. */
    void begin() throws TransactionException;

    /** Begin the global transaction with the given timeout. */
    void begin(int timeout) throws TransactionException;

    /** Begin the global transaction with the given timeout and transaction name. */
    void begin(int timeout, String name) throws TransactionException;

    /** Commit the global transaction. */
    void commit() throws TransactionException;

    /** Roll back the global transaction. */
    void rollback() throws TransactionException;

    /** Get the global transaction status. */
    GlobalStatus getStatus() throws TransactionException;

    /** Get the global transaction ID (XID). */
    String getXid();

    /** Report the global transaction status. */
    void globalReport(GlobalStatus globalStatus) throws TransactionException;
}
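To make the lifecycle behind this contract concrete, here is a minimal sketch of how a caller might drive begin, commit and rollback around a piece of business logic. It is an illustration under stated assumptions, not Seata's template class: DemoTx, RecordingTx and TxTemplate are hypothetical names, and the contract is trimmed to three methods without checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, trimmed-down stand-in for the GlobalTransaction contract
interface DemoTx {
    void begin();
    void commit();
    void rollback();
}

// Records which lifecycle methods were called, in order
final class RecordingTx implements DemoTx {
    final List<String> calls = new ArrayList<>();

    @Override public void begin() { calls.add("begin"); }
    @Override public void commit() { calls.add("commit"); }
    @Override public void rollback() { calls.add("rollback"); }
}

final class TxTemplate {
    private TxTemplate() { }

    /** Runs the business logic between begin and commit, rolling back on failure. */
    static <T> T execute(DemoTx tx, Supplier<T> business) {
        tx.begin();
        T result;
        try {
            result = business.get();
        } catch (RuntimeException ex) {
            tx.rollback();   // business logic failed: undo the global transaction
            throw ex;
        }
        tx.commit();         // business logic succeeded: confirm the global transaction
        return result;
    }
}
```

With a RecordingTx, a successful call records begin followed by commit, while a failing one records begin followed by rollback and rethrows the original exception.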
DefaultGlobalTransaction Global transaction implementation
public class DefaultGlobalTransaction implements GlobalTransaction {

    // Begin the global transaction with the default timeout
    public void begin() throws TransactionException {
        begin(DEFAULT_GLOBAL_TX_TIMEOUT);
    }

    // Begin the global transaction with the default name
    public void begin(int timeout) throws TransactionException {
        begin(timeout, DEFAULT_GLOBAL_TX_NAME);
    }

    // Begin the global transaction
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            check();
            return;
        }
        if (xid != null) {
            throw new IllegalStateException();
        }
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        // Begin a global transaction through the transaction manager
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        // Bind the global transaction ID to the global context
        RootContext.bind(xid);
    }

    // Commit the global transaction
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            return;
        }
        if (xid == null) {
            throw new IllegalStateException();
        }
        int retry = COMMIT_RETRY_COUNT;
        try {
            // Retry on failure, up to the maximum number of attempts
            while (retry > 0) {
                try {
                    // Commit the transaction
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
    }

    // ... The other methods follow the same pattern; see the source code.
}
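The commit() method above retries a failed commit a bounded number of times and only surfaces an error once the attempts are used up. A minimal sketch of that bounded-retry idea follows. BoundedRetry and maxAttempts are hypothetical names (maxAttempts plays the role of COMMIT_RETRY_COUNT), and unlike the excerpt the sketch uses unchecked exceptions to stay self-contained.

```java
import java.util.function.Supplier;

final class BoundedRetry {
    private BoundedRetry() { }

    /**
     * Calls the action up to maxAttempts times and returns the first successful
     * result. If every attempt fails, the last failure is wrapped and rethrown.
     */
    static <T> T call(Supplier<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();       // success ends the loop immediately
            } catch (RuntimeException ex) {
                last = ex;                 // remember the failure and try again
            }
        }
        throw new IllegalStateException("Failed after " + maxAttempts + " attempts", last);
    }
}
```

An action that fails twice and then succeeds returns normally when maxAttempts is 3 or more; an action that always fails ends in an IllegalStateException whose cause is the last failure.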
GlobalTransactionalInterceptor global transaction method interceptor
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {

    // Intercepts methods annotated with @GlobalTransactional (or @GlobalLock)
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // Get the class object of the target
        Class<?> targetClass = methodInvocation.getThis() != null
            ? AopUtils.getTargetClass(methodInvocation.getThis())
            : null;
        // Get the most specific method on the target class
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        // Resolve the bridged method
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Get the GlobalTransactional annotation
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        // Get the GlobalLock annotation
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (!disable && globalTransactionalAnnotation != null) {
            // Global transactions are enabled and the method carries the global transaction annotation:
            // run the global transaction flow
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (!disable && globalLockAnnotation != null) {
            // Global transactions are enabled and the method carries the global lock annotation:
            // run the global lock flow
            return handleGlobalLock(methodInvocation);
        } else {
            // Plain method call
            return methodInvocation.proceed();
        }
    }
}
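The interceptor above picks one of three paths depending on which annotation the method carries and whether global transactions are disabled. The following sketch reproduces only that routing decision using plain reflection. It does not use Spring AOP or Seata: the annotations DemoGlobalTransactional and DemoGlobalLock, the DispatchDemo class and DemoOrderService are all hypothetical stand-ins.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical marker annotations standing in for @GlobalTransactional and @GlobalLock
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTransactional { }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalLock { }

final class DispatchDemo {
    enum Route { GLOBAL_TRANSACTION, GLOBAL_LOCK, PLAIN }

    private DispatchDemo() { }

    /** Mirrors the if / else-if / else chain of the interceptor. */
    static Route route(Method method, boolean disabled) {
        if (!disabled && method.isAnnotationPresent(DemoGlobalTransactional.class)) {
            return Route.GLOBAL_TRANSACTION;
        } else if (!disabled && method.isAnnotationPresent(DemoGlobalLock.class)) {
            return Route.GLOBAL_LOCK;
        }
        return Route.PLAIN;
    }

    /** Convenience lookup for a public no-argument method by name. */
    static Route routeByName(Class<?> type, String name, boolean disabled) {
        try {
            return route(type.getMethod(name), disabled);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + name, e);
        }
    }

    // Hypothetical service used only to exercise the routing
    static class DemoOrderService {
        @DemoGlobalTransactional public void placeOrder() { }
        @DemoGlobalLock public void readStock() { }
        public void ping() { }
    }
}
```

Note that when the disabled flag is set, even an annotated method is routed to the plain path, which matches the role of the disable check in the excerpt.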
RM Core architecture code
RMClient client core code
public class RMClient {

    /** Initialize the resource manager client. */
    public static void init(String applicationId, String transactionServiceGroup) {
        // Create the resource manager client
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        // Set the resource manager
        rmRpcClient.setResourceManager(DefaultResourceManager.get());
        // Set the client message listener
        rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get(), rmRpcClient));
        // Initialize the resource manager client
        rmRpcClient.init();
    }
}
ResourceManagerInbound contract for handling resource management messages sent by the TC
public interface ResourceManagerInbound {

    /** Handle a branch (local) transaction commit request sent by the TC. */
    BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
        String applicationData) throws TransactionException;

    /** Handle a branch (local) transaction rollback request sent by the TC. */
    BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
        String applicationData) throws TransactionException;
}
ResourceManager Resource management contract
public interface ResourceManager extends ResourceManagerInbound, ResourceManagerOutbound {

    /** Register a resource. */
    void registerResource(Resource resource);

    /** Unregister a resource. */
    void unregisterResource(Resource resource);

    /** Get all managed resources. */
    Map<String, Resource> getManagedResources();

    /** Get the transaction model (branch) type. */
    BranchType getBranchType();
}
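The register, unregister and lookup operations of this contract map naturally onto a concurrent map keyed by resource ID. Here is a minimal sketch of such a registry. It is an assumption about one possible shape, not Seata's implementation: DemoResource and DemoResourceRegistry are hypothetical, and the real Resource interface has more methods than the single getResourceId() used here.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal resource, identified only by its ID
interface DemoResource {
    String getResourceId();
}

final class DemoResourceRegistry {
    // Thread-safe map from resource ID to resource
    private final Map<String, DemoResource> resources = new ConcurrentHashMap<>();

    /** Registers (or replaces) the resource under its ID. */
    void registerResource(DemoResource resource) {
        resources.put(resource.getResourceId(), resource);
    }

    /** Removes the resource registered under the same ID, if any. */
    void unregisterResource(DemoResource resource) {
        resources.remove(resource.getResourceId());
    }

    /** Returns a read-only view of all managed resources. */
    Map<String, DemoResource> getManagedResources() {
        return Collections.unmodifiableMap(resources);
    }
}
```

A lookup by resource ID on this map is the same kind of operation that branchCommit performs when it fetches the TCC resource for an incoming request.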
TCCResourceManager TCC resource manager
public class TCCResourceManager extends AbstractResourceManager {

    // Branch commit
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
        String applicationData) throws TransactionException {
        // Get the TCC resource entity
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        // Get the target object
        Object targetTCCBean = tccResource.getTargetBean();
        // Get the commit method
        Method commitMethod = tccResource.getCommitMethod();
        try {
            boolean result = false;
            // Build the BusinessActionContext object
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            // Invoke the commit method reflectively
            Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
            // Interpret the result of the commit method
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            }
            // Map the result to the phase-two branch status
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            // On failure, wrap the cause and throw
            String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            throw new FrameworkException(t, msg);
        }
    }

    // TCC branch type
    public BranchType getBranchType() {
        return BranchType.TCC;
    }
}
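The heart of branchCommit is a reflective call to the user's commit method followed by a mapping of its return value to a branch status. The sketch below isolates that step. It is a simplified illustration, not Seata's code: TwoPhaseInvoker, its Status enum and DemoAction are hypothetical, the context is a plain String rather than a BusinessActionContext, and only Boolean return values are interpreted.

```java
import java.lang.reflect.Method;

final class TwoPhaseInvoker {
    // Hypothetical stand-ins for the two phase-two commit outcomes
    enum Status { COMMITTED, COMMIT_FAILED_RETRYABLE }

    private TwoPhaseInvoker() { }

    /** Invokes the commit method reflectively and maps its result to a status. */
    static Status invokeCommit(Object target, Method commitMethod, Object context) {
        try {
            Object ret = commitMethod.invoke(target, context);
            // A null or non-Boolean return value is treated as "not committed"
            boolean ok = (ret instanceof Boolean) && (Boolean) ret;
            return ok ? Status.COMMITTED : Status.COMMIT_FAILED_RETRYABLE;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("commit invocation failed", e);
        }
    }

    /** Looks up a public method, converting the checked exception. */
    static Method find(Class<?> type, String name, Class<?>... parameterTypes) {
        try {
            return type.getMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + name, e);
        }
    }

    // Hypothetical TCC action whose commit succeeds only when a context is supplied
    static class DemoAction {
        public boolean commit(String context) {
            return context != null;
        }
    }
}
```

Returning a retryable status instead of throwing when the commit method reports failure lets the coordinator decide whether to send the commit request again.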
TC core architecture code
RpcServer Transaction Coordinator (server side)
public class RpcServer extends AbstractRpcRemotingServer {

    /** Send a response back to the client. */
    @Override
    public void sendResponse(RpcMessage request, Channel channel, Object msg) {
        Channel clientChannel = channel;
        // For non-heartbeat messages, look up the channel of the same client
        if (!(msg instanceof HeartbeatMessage)) {
            clientChannel = ChannelManager.getSameClientChannel(channel);
        }
        if (clientChannel != null) {
            // The client channel is available: send the response
            super.defaultSendResponse(request, clientChannel, msg);
        } else {
            throw new RuntimeException("channel is error. channel:" + clientChannel);
        }
    }

    /** Synchronously send a request to the given resource and client, and return the response. */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message,
        long timeout) throws TimeoutException {
        Channel clientChannel = ChannelManager.getChannel(resourceId, clientId);
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected. dbkey:" + resourceId
                + ",clientId:" + clientId);
        }
        return sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /** Synchronously send a request on the given channel with the default timeout. */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message) throws TimeoutException {
        return sendSyncRequest(clientChannel, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /** Synchronously send a request on the given channel with the given timeout. */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message, long timeout) throws TimeoutException {
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected");
        }
        return sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /** Synchronously send a request to the given resource and client with the default timeout. */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message)
        throws TimeoutException {
        return sendSyncRequest(resourceId, clientId, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /** Asynchronously send a request without waiting for a response. */
    @Override
    public Object sendASyncRequest(Channel channel, Object message) throws TimeoutException {
        return sendAsyncRequestWithoutResponse(channel, message);
    }
}
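The sendSyncRequest methods above present a blocking call with a timeout on top of an asynchronous transport. One common way to build that is to register a future per request, complete it when the response arrives, and block on it with a timeout. The sketch below shows that idea with java.util.concurrent only. It is an assumption about the general technique, not a description of how Seata's sendAsyncRequestWithResponse is implemented, and PendingCalls is a hypothetical class.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

final class PendingCalls {
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();

    /** Registers a pending request and returns its ID. */
    long register() {
        long id = ids.incrementAndGet();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /** Called by the receiving side when the response for a request arrives. */
    boolean complete(long id, Object response) {
        CompletableFuture<Object> future = pending.get(id);
        return future != null && future.complete(response);
    }

    /** Blocks until the response arrives or the timeout elapses. */
    Object await(long id, long timeoutMillis) throws TimeoutException {
        CompletableFuture<Object> future = pending.get(id);
        if (future == null) {
            throw new IllegalArgumentException("unknown request id " + id);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        } finally {
            pending.remove(id);   // never leak entries, whether answered or timed out
        }
    }
}
```

A sender would call register() before writing the request, and the network handler would call complete() with the matching ID. If no response arrives in time, await() throws TimeoutException, the same exception type declared by the methods in the excerpt.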
TransactionMessageHandler contract for messages received by the transaction coordinator
public interface TransactionMessageHandler {

    /** Handle a received request message. */
    AbstractResultMessage onRequest(AbstractMessage request, RpcContext context);

    /** Handle a received response message. */
    void onResponse(AbstractResultMessage response, RpcContext context);
}
Your “like” is the best support and motivation for me.
Follow Xiao Ge for more on architecture; more high-quality content is on the way.