Why TCC mode

Seata provides AT mode, TCC mode, Saga mode, and XA mode. Why did I choose TCC mode? First, XA mode is ruled out: in real development, databases are not the only resources involved, and XA is a "rigid transaction" with worrying performance. Among the rest there is no absolute better or worse. The strengths and weaknesses of a distributed transaction framework are usually judged along the following three dimensions:

  • Low cost of business transformation
  • Low performance loss
  • Guaranteed isolation

But, just like the CAP theorem, you can never satisfy all three at once, as shown below:

Saga mode, based on business compensation, performs well and has a low business transformation cost but does not support isolation; Seata's AT mode has a low transformation cost and satisfies isolation, but its performance is not ideal; TCC performs well and satisfies isolation, but its transformation cost is high: you must define prepare, commit, and rollback methods for every local transaction. My company runs a high-traffic e-commerce business where neither performance nor isolation can be sacrificed, so I chose TCC. A sketch of that per-transaction transformation cost follows.
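As a concrete picture of the transformation cost, here is a minimal sketch of one TCC participant written with Seata's TCC annotations. The interface name, method names, and parameters are hypothetical; the annotations and BusinessActionContext are Seata's actual API (package layout per Seata 1.x).

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface DeductStockTccAction {

    // Phase one: reserve the resource, e.g. freeze the stock to deduct.
    @TwoPhaseBusinessAction(name = "deductStockTccAction",
            commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext ctx,
                    @BusinessActionContextParameter(paramName = "skuId") long skuId,
                    @BusinessActionContextParameter(paramName = "count") int count);

    // Phase two, success path: actually deduct the frozen stock.
    boolean commit(BusinessActionContext ctx);

    // Phase two, failure path: unfreeze the stock.
    boolean rollback(BusinessActionContext ctx);
}

Every participant must provide all three methods and keep them idempotent; that is the cost being traded for performance and isolation.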

The overall architecture of Seata

Seata is best known for its AT mode, but it also supports TCC mode. Let's start from the common starting point for designing distributed transaction frameworks, the DTP (Distributed Transaction Processing) theory:

  • RM is responsible for submitting local transactions, registering branch transactions, determining locks, and playing the role of transaction participant
  • TM is responsible for the commit of the whole transaction and the trigger of rollback instructions, acting as the overall coordinator of the transaction.

Different frameworks make their own adjustments to this theoretical model when implementing it. For example, the TM is typically deployed together with the application as a JAR package, while the TC is split out and deployed separately (Seata puts the coordinator's main functions into a logically separate server called the TC, the Transaction Coordinator). The advantage of this split is dependency decoupling: the same JAR does not have to sit on every application server. The disadvantage is the extra network I/O incurred whenever TMs and RMs interact with the TC.

So the overall architecture of Seata is shown below. This diagram is actually somewhat confusing if you look at it without reading the code: TC is easy enough to understand, but what about RM and TM?

  • RM is short for ResourceManager. What is a resource? Let's draw a topology of RPC calls:

In the figure above, a server A1 providing service A needs to call services B, C, and D to complete a transaction. Each service is provided by several producers; for example, service B is provided by machines B1, B2, and B3, and the interface exposed on each of those machines is an RM (the granularity of an RM is a specific interface of a specific application at a specific IP address and port). A1, as the initiator that strings this call chain together, holds the global transaction and plays the TM role. However, not every RM participates in every global transaction the TM initiates. Take service B: if the call from A1 actually lands on machine B1 this time, then of the three RMs B1, B2, and B3, only B1 participates in this global transaction.

In addition, when the TM on A1 initiates a transaction, it may first call A1's own file system, which is also an RM.

Several key IDs in Seata

The key IDs in Seata are:

  • XID: the global transaction ID. In the example above, the TM on A1 initiates a global transaction and A1 calls B, C, and D; the whole chain shares one XID, which is generated when the TM starts the global transaction and is stored in the GlobalSession. It is similar to the traceId in distributed tracing.
  • BranchId: the branch transaction ID. In the example above, A1 invokes service B; assuming the call actually lands on B1, then when B1 is invoked it applies to the TC (the transaction coordinator) for a branchId over netty and puts it into a BranchSession. On the TC, Seata maintains a 1-to-N mapping from GlobalSession to BranchSession.
  • ResourceId: the identifier of a resource as explained above. In TCC mode the resourceId is the name attribute of @TwoPhaseBusinessAction, so in a distributed cluster a resourceId cannot uniquely identify a single machine. The resourceId is placed in RM_CHANNELS as the first filter when locating the target machine for a method. When the channel through which a branch registered has been destroyed, the TC falls back to another port on the same machine; failing that, to another machine under the same applicationId; but it never relaxes the resourceId itself. That is the bottom line. The channel lookup code is as follows:
    public static Channel getChannel(String resourceId, String clientId) {
        Channel resultChannel = null;

        String[] clientIdInfo = readClientId(clientId);

        if (clientIdInfo == null || clientIdInfo.length != 3) {
            throw new FrameworkException("Invalid Client ID: " + clientId);
        }

        String targetApplicationId = clientIdInfo[0];
        String targetIP = clientIdInfo[1];
        int targetPort = Integer.parseInt(clientIdInfo[2]);

        ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
            RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);

        if (targetApplicationId == null || applicationIdMap == null ||  applicationIdMap.isEmpty()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("No channel is available for resource[{}]", resourceId);
            }
            return null;
        }

        ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);

        if (ipMap != null && !ipMap.isEmpty()) {
            // Firstly, try to find the original channel through which the branch was registered.
            ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
            if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
                RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
                if (exactRpcContext != null) {
                    Channel channel = exactRpcContext.getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Just got exactly the one {} for {}", channel, clientId); }}else {
                        if (portMapOnTargetIP.remove(targetPort, exactRpcContext)) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive {}", channel); }}}}// The original channel was broken, try another one.
                if (resultChannel == null) {
                    for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP
                        .entrySet()) {
                        Channel channel = portMapOnTargetIPEntry.getValue().getChannel();

                        if (channel.isActive()) {
                            resultChannel = channel;
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info(
                                    "Choose {} on the same IP[{}] as alternative of {}", channel, targetIP, clientId);
                            }
                            break;
                        } else {
                            if (portMapOnTargetIP.remove(portMapOnTargetIPEntry.getKey(),
                                portMapOnTargetIPEntry.getValue())) {
                                if (LOGGER.isInfoEnabled()) {
                                    LOGGER.info("Removed inactive {}", channel);
                                }
                            }
                        }
                    }
                }
            }

            // No channel on this app node, try another one.
            if (resultChannel == null) {
                for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap
                    .entrySet()) {
                    if (ipMapEntry.getKey().equals(targetIP)) { continue; }

                    ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                    if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                        continue;
                    }

                    for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                        Channel channel = portMapOnOtherIPEntry.getValue().getChannel();

                        if (channel.isActive()) {
                            resultChannel = channel;
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Choose {} on the same application[{}] as alternative of {}", channel, targetApplicationId, clientId);
                            }
                            break;
                        } else {
                            if (portMapOnOtherIP.remove(portMapOnOtherIPEntry.getKey(),
                                portMapOnOtherIPEntry.getValue())) {
                                if (LOGGER.isInfoEnabled()) {
                                    LOGGER.info("Removed inactive {}", channel); }}}}if(resultChannel ! =null) { break; }}}}if (resultChannel == null) {
            resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);

            if (resultChannel == null) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("No channel is available for resource[{}] as alternative of {}", resourceId, clientId); }}else {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Choose {} on the same resource[{}] as alternative of {}", resultChannel, resourceId, clientId); }}}return resultChannel;

    }
  • ClientId: formatted as ApplicationId:IP:Port; the clientId lives in the RpcContext. The TC's ChannelManager maintains a global Map<Channel, RpcContext>, so while a client is communicating with the TC, the communication Channel can be used to look up the RpcContext and retrieve the clientId. The RpcContext is created when the client starts up and establishes netty communication with the TC to report its local resource information. (A minimal sketch of the clientId format follows.)
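Since a clientId is just the string ApplicationId:IP:Port, the readClientId helper used in the getChannel snippet above presumably does little more than split it. Here is a minimal sketch under that assumption; it is an illustration, not Seata's exact code:

public final class ClientIdSketch {

    // Assumed behavior of ChannelManager.readClientId: split
    // "ApplicationId:IP:Port" into its three components.
    static String[] readClientId(String clientId) {
        return clientId == null ? null : clientId.split(":");
    }

    public static void main(String[] args) {
        // Hypothetical values: ["order-service", "192.168.0.11", "20880"]
        String[] parts = readClientId("order-service:192.168.0.11:20880");
        System.out.println(parts[0] + " / " + parts[1] + " / " + parts[2]);
    }
}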

Key components in the Seata source code

  • TCCResourceCache: as with almost all distributed frameworks, resource information in Seata is stored both locally and in the TC that acts as the central manager. TCCResourceCache is the locally stored resource information.
  • RpcContext and RM_CHANNELS: in contrast to TCCResourceCache, RpcContext and RM_CHANNELS hold the resource information stored on the TC. RM_CHANNELS is a multi-layer map of key-value pairs with the structure resourceId -> applicationId -> IP -> port -> RpcContext; it answers which interface method on which port of which IP should be called (see the sketch after this list).
  • TransactionInfo: Wraps the attributes in the @GlobalTransactional annotation
  • GlobalSession: the session of the global transaction, created by its initiator. The global XID is generated and stored in the GlobalSession, which is maintained on the TC.
  • BranchSession: generated at runtime when a method is invoked on a machine, so at that moment the XID, the resourceId, and the machine being called are all known. This is the basis for later using the XID to find which commit/rollback method on which machine to call. BranchSession is the bridge connecting XID, branchId, resourceId, and clientId. It is maintained on the TC, and the relationship between GlobalSession and BranchSession is one-to-many.
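To make the RM_CHANNELS structure concrete, here is a sketch of the nested map and a lookup along the full path. The resourceId, applicationId, IP, and port values are hypothetical, the imports follow the Seata 1.x package layout, and real code null-checks every level as the getChannel snippet above does:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import io.netty.channel.Channel;
import io.seata.core.rpc.RpcContext;

public class RmChannelsSketch {

    // resourceId -> applicationId -> IP -> port -> RpcContext
    static final ConcurrentMap<String,
        ConcurrentMap<String,
            ConcurrentMap<String,
                ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();

    static Channel lookup() {
        RpcContext ctx = RM_CHANNELS
            .get("deductStockTccAction") // resourceId: the @TwoPhaseBusinessAction name
            .get("order-service")        // applicationId
            .get("192.168.0.11")         // IP
            .get(20880);                 // port
        return ctx.getChannel();
    }
}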

Key annotations in the Seata process

  • @LocalTCC: not mandatory; it is used together with @TwoPhaseBusinessAction and plays the same tactical role as the dubbo, SOFA, and HSF service interfaces, so that local resources can be registered with the TC when the service starts.
  • @TwoPhaseBusinessAction: mandatory. Without this annotation, the method cannot be registered with the TC as a resource.
  • @GlobalTransactional: mandatory. Without it, the method cannot be cut by the GlobalTransactionalInterceptor (a MethodInterceptor), no XID can ever be generated, and there is no distributed transaction at all. A usage sketch combining the three annotations follows.
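Here is a hedged sketch of how these annotations combine on the caller side, reusing the hypothetical DeductStockTccAction interface from earlier; the service class and wiring are illustrative, not prescribed by Seata:

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    private final DeductStockTccAction deductStockTccAction; // hypothetical TCC bean

    public OrderService(DeductStockTccAction deductStockTccAction) {
        this.deductStockTccAction = deductStockTccAction;
    }

    // Cut by GlobalTransactionalInterceptor: a GlobalSession and XID are created here.
    @GlobalTransactional(name = "create-order", timeoutMills = 60000)
    public void createOrder(long skuId, int count) {
        // Cut by TccActionInterceptor: a BranchSession/branchId is registered
        // with the TC, then the one-phase prepare runs. Seata fills in the
        // BusinessActionContext, so the caller passes null.
        deductStockTccAction.prepare(null, skuId, count);
        // ... other participants / business logic ...
        // On success Seata triggers every branch's commit; on exception, rollback.
    }
}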

The life cycle of a complete call

The whole process boils down to the following steps, with a small code sketch after the list:

  1. Start the global transaction: create a new GlobalSession and generate a globally unique XID
  2. Execute the real business logic
  3. If an exception occurs anywhere in the global transaction, call the rollback method of every participant
  4. If the whole global transaction succeeds, call the commit method of every participant
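A minimal sketch of that code shape, modeled loosely on Seata's TransactionalTemplate (the real template's exception and timeout handling is richer; names here are simplified):

import java.util.concurrent.Callable;

import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;

public class GlobalFlowSketch {

    public static Object execute(Callable<Object> business) throws Throwable {
        // 1. Start the global transaction: a new GlobalSession is created
        //    and a globally unique XID is generated.
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        tx.begin();
        try {
            // 2. Execute the real business logic (each participant's prepare
            //    runs and registers its branch along the way).
            Object result = business.call();
            // 4. Everything succeeded: ask the TC to commit every branch.
            tx.commit();
            return result;
        } catch (Throwable ex) {
            // 3. An exception occurred somewhere: roll back every branch.
            tx.rollback();
            throw ex;
        }
    }
}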

So how does the system know on which machine to call the rollback or commit method?

In other words, how are steps 3 and 4 implemented in detail?

Remember that Seata's TCC implementation uses two aspects:

  • One is the GlobalTransactionalInterceptor, which parses the @GlobalTransactional annotation, generates the GlobalSession, and maintains the XID-to-GlobalSession mapping in the SessionManager.
  • The other is the TccActionInterceptor, which parses the @TwoPhaseBusinessAction annotation and reports the local commit method, rollback method, user-defined context parameter values, and machine IP to the TC. The BranchSession is generated on the TC, where the 1-to-N mapping between GlobalSession and BranchSession is maintained.

To give you an overview of the process:

  1. The TM that initiated the GlobalTransaction sends a global-transaction commit request to the TC once the whole flow completes without exceptions
  2. On receiving the global commit request, the TC finds the GlobalSession by XID, and through the GlobalSession finds all BranchSessions
  3. It iterates over all BranchSessions, locates the channel via resourceId and clientId, and through the RpcContext finds the two-phase commit method to execute, establishing communication with the client (sketched after this list)
  4. Once the client knows which commit method to execute, it invokes it via reflection and reports the execution result back to the TC
  5. Depending on each branch's execution result and configuration, the TC decides whether to retry or finish (5 retries by default)
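Steps 2 and 3 can be sketched on the TC side as follows. SessionHolder.findGlobalSession, getSortedBranches, and ChannelManager.getChannel are real Seata entry points (Seata 1.x package layout), but the loop is simplified and the actual branch-commit request sending is elided:

import io.netty.channel.Channel;
import io.seata.core.exception.TransactionException;
import io.seata.core.rpc.netty.ChannelManager;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHolder;

public class TcCommitSketch {

    public static void doGlobalCommit(String xid) throws TransactionException {
        // Step 2: find the GlobalSession by XID, then all of its branches.
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        for (BranchSession branch : globalSession.getSortedBranches()) {
            // Step 3: resourceId + clientId locate a live channel
            // (see the getChannel snippet earlier).
            Channel channel = ChannelManager.getChannel(
                    branch.getResourceId(), branch.getClientId());
            // ... send a branch-commit request over this channel; the client
            // then invokes the registered commit method by reflection ...
        }
    }
}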

Again, what is generated before a transaction and what is generated at transaction time

  • Resources are generated before transactions execute. The ResourceManager of each resource reports its information to the TC when the service starts, and the information is stored both locally and on the TC. This is equivalent to service registration and discovery in an RPC framework: providers register with ZK, consumers dynamically pull the registry information, and the information is kept both locally and in ZK. Distributed frameworks really do have a lot in common.

In fact, the most important benefit of RM registration is fault tolerance: when the TC issues commit or rollback requests and one node's channel has failed, the same resource's method on another node can take over.

  • GlobalSession and XID, BranchSession and branchId, and the input values of each method are all produced in real time by the aspects as the transaction executes. The basic logic: the called method is cut by the TccActionInterceptor, and the machine executing it reports the method information (commit method, rollback method, user-defined context parameter values) and its IP to the TC over netty. The XID-to-GlobalSession mapping is maintained in the SessionManager. A sketch contrasting the two moments follows.
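To contrast the two moments, here is a hedged sketch: registerResource and branchRegister are real Seata ResourceManager calls, but the surrounding values (tccResource, actionName, context data) are illustrative and in real code are assembled by Seata's parsers and interceptors:

import io.seata.core.model.BranchType;
import io.seata.rm.DefaultResourceManager;
import io.seata.rm.tcc.TCCResource;

public class TwoMomentsSketch {

    // Before any transaction, at service startup: cache the TCC resource
    // locally and report it to the TC. The TCCResource is populated from
    // @TwoPhaseBusinessAction; setter details vary by version and are elided.
    static void atStartup(TCCResource tccResource) {
        DefaultResourceManager.get().registerResource(tccResource);
    }

    // At transaction time, inside the TccActionInterceptor cut: register a
    // branch and obtain a branchId from the TC for the current XID.
    static Long atTransactionTime(String actionName, String xid,
                                  String applicationContextData) throws Exception {
        return DefaultResourceManager.get().branchRegister(
                BranchType.TCC,         // branch type
                actionName,             // resourceId = @TwoPhaseBusinessAction name
                null,                   // clientId (filled in on the TC side)
                xid,                    // global transaction id
                applicationContextData, // serialized BusinessActionContext
                null);                  // lockKeys (not used by TCC)
    }
}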

A few final questions

Where are GlobalSession and BranchSession information, and the XID-to-GlobalSession mapping, stored: in a local cache or in a database? I have not verified this, so I dare not say for sure.

For source code analysis, refer to "Seata in Action: TCC-Mode Distributed Transaction Principles and Source Code Analysis".