11 Distributed transaction DTM
11.1 DTM
introduce
DTM is a distributed transaction manager developed by Golang, which solves the consistency problem of updating data across databases, services and language stacks.
The vast majority of ordering systems have transactions that cross services, so there is a need to update data consistency, and DTM can simplify the architecture dramatically to form an elegant solution.
And DTM has deep cooperation, native support for distributed transactions in GO-Zero, the following details to explain how to use DTM to help our order system to solve the consistency problem
11.2 go-zero
useDTM
Distributed transaction
First, let’s review the Create interface processing logic for the ORDER RPC service in Chapter 6. Finally, a new order is created through OrderModel and the product RPC service Update interface is called to Update the product inventory.
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
// Check whether the user exists
_, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
Id: in.Uid,
})
iferr ! =nil {
return nil, err
}
// Check whether the product exists
productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{
Id: in.Pid,
})
iferr ! =nil {
return nil, err
}
// Determine whether the product inventory is sufficient
if productRes.Stock <= 0 {
return nil, status.Error(500."Insufficient product inventory")
}
newOrder := model.Order{
Uid: in.Uid,
Pid: in.Pid,
Amount: in.Amount,
Status: 0,
}
res, err := l.svcCtx.OrderModel.Insert(&newOrder)
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
newOrder.Id, err = res.LastInsertId()
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
_, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{
Id: productRes.Id,
Name: productRes.Name,
Desc: productRes.Desc,
Stock: productRes.Stock - 1,
Amount: productRes.Amount,
Status: productRes.Status,
})
iferr ! =nil {
return nil, err
}
return &order.CreateResponse{
Id: newOrder.Id,
}, nil
}
Copy the code
As we said before, there is a data consistency problem in the processing logic here. It is possible that the order is created successfully, but there may be a failure when updating the product inventory. At this time, there will be a situation that the order is created successfully and the product inventory is not reduced.
Because the product inventory update here operates across services and there is no way to handle it using a local transaction, we need to use a distributed transaction to handle it. Here we need to use DTM SAGA protocol to realize the cross-service distributed transaction operation of order creation and product inventory update.
You can go to the DTM documentation and start with the SAGA transaction mode.
11.2.1 addDTM
Service configuration
Modify the DTM ->config.yml configuration file by referring to Chapter 2 Environment Construction. We only need to modify the Target and EndPoint configuration in MicroService to register DTM in ETCD.
#...
# micro service
MicroService:
Driver: 'dtm-driver-gozero' The name of the driver to handle registration/discovery
Target: 'etcd://etcd:2379/dtmservice' Register the ETCD address of the DTM service
EndPoint: 'dtm:36790'
#...
Copy the code
11.2.2 adddtm_barrier
The data table
Microservices is a distributed system, so various exceptions may occur, such as network jitter resulting in repeated requests, which can make service processing extremely complicated. DTM pioneered the sub-transaction barrier technology, which can solve the exception problem very conveniently and greatly reduce the threshold of the use of distributed transactions.
Using the subtransaction barrier technology provided by DTM, you need to create subtransaction barrier related tables in the business database with the following table construction sentences:
create database if not exists dtm_barrier
/ *! 40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
id bigint(22) PRIMARY KEY AUTO_INCREMENT,
trans_type varchar(45) default ' ',
gid varchar(128) default ' ',
branch_id varchar(128) default ' ',
op varchar(45) default ' ',
barrier_id varchar(45) default ' ',
reason varchar(45) default ' ' comment 'the branch type who insert this record',
create_time datetime DEFAULT now(),
update_time datetime DEFAULT now(),
key(create_time),
key(update_time),
UNIQUE key(gid, branch_id, op, barrier_id)
);
Copy the code
!!!!! ! Note: the library name and the name of the table do not change, if you customize the table name, before use, please call dtmcli. SetBarrierTableName.
11.2.3 modifyOrderModel
和 ProductModel
In each subtransaction, a lot of operation logic is required to use local transactions, so we add some Model methods that are DTM-compatible subtransaction barriers
$ vim mall/service/order/model/ordermodel.go
Copy the code
package model
......
type (
OrderModel interface {
TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)
TxUpdate(tx *sql.Tx, data *Order) error
}
)
......
func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (? ,? ,? ,?) ", m.table, orderRowsExpectAutoSet)
ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status)
return ret, err
}
func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {
productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)
_, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)
return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id)
}, productIdKey)
return err
}
func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {
var resp Order
query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)
err := m.QueryRowNoCache(&resp, query, uid)
switch err {
case nil:
return &resp, nil
case sqlc.ErrNotFound:
return nil, ErrNotFound
default:
return nil, err
}
}
Copy the code
$ vim mall/service/product/model/productmodel.go
Copy the code
package model
......
type (
ProductModel interface {
TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)
}
)
......
func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {
productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)
return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)
return tx.Exec(query, delta, delta, id)
}, productIdKey)
}
Copy the code
11.2.4 to modifyproduct rpc
service
-
Add DecrStock, DecrStockRevert interface methods
We need to add DecrStock and DecrStockRevert interface methods to the Product RPC service for product inventory updates and compensation for product inventory updates, respectively.
$ vim mall/service/product/rpc/product.proto
Copy the code
syntax = "proto3";
package productclient;
option go_package = "product"; .// Reduce product inventory
message DecrStockRequest {
int64 id = 1;
int64 num = 2;
}
message DecrStockResponse {}// Reduce product inventory
service Product {...rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);
rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);
}
Copy the code
! Tip: Use the goctl tool to generate the following code after modification.
-
Implement DecrStock interface methods
In this case, only when the inventory is low, we don’t need to retry, just roll back.
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
Copy the code
package logic
import (
"context"
"database/sql"
"mall/service/product/rpc/internal/svc"
"mall/service/product/rpc/product"
"github.com/dtm-labs/dtmcli"
"github.com/dtm-labs/dtmgrpc"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/sqlx"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type DecrStockLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {
return &DecrStockLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
/ / get RawDB
db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Get the child transaction barrier object
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Enable the subtransaction barrier
err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
// Update product inventory
result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, - 1)
iferr ! =nil {
return err
}
affected, err := result.RowsAffected()
// Lack of inventory, return subtransaction failure
if err == nil && affected == 0 {
return dtmcli.ErrFailure
}
return err
})
// The inventory is insufficient, no longer retry, rollback
if err == dtmcli.ErrFailure {
return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
}
iferr ! =nil {
return nil, err
}
return &product.DecrStockResponse{}, nil
}
Copy the code
-
Implement the DecrStockRevert interface method
In the DecrStock interface method, the product inventory is subtracted from the specified quantity, and here we add it back. The product inventory is then returned to the quantity before subtracting it from the DecrStock interface method.
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
Copy the code
package logic
import (
"context"
"database/sql"
"mall/service/product/rpc/internal/svc"
"mall/service/product/rpc/product"
"github.com/dtm-labs/dtmcli"
"github.com/dtm-labs/dtmgrpc"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/sqlx"
"google.golang.org/grpc/status"
)
type DecrStockRevertLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {
return &DecrStockRevertLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
/ / get RawDB
db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Get the child transaction barrier object
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Enable the subtransaction barrier
err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
// Update product inventory
_, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)
return err
})
iferr ! =nil {
return nil, err
}
return &product.DecrStockResponse{}, nil
}
Copy the code
11.2.5 modifyorder rpc
service
-
Add the CreateRevert interface method
The Order RPC service already has the Create interface method, and we need to Create its compensation interface method, CreateRevert.
$ vim mall/service/order/rpc/order.proto
Copy the code
syntax = "proto3";
package orderclient;
option go_package = "order"; .service Order {
rpc Create(CreateRequest) returns(CreateResponse);
rpc CreateRevert(CreateRequest) returns(CreateResponse); . }Copy the code
! Tip: Use the goctl tool to generate the following code after modification.
-
Modify the Create interface method
The product inventory judgment and update operations in the original Create interface method have been implemented in the Product RPC DecrStock interface method, so we only need to Create an order operation here.
$ vim mall/service/order/rpc/internal/logic/createlogic.go
Copy the code
package logic
import (
"context"
"database/sql"
"fmt"
"mall/service/order/model"
"mall/service/order/rpc/internal/svc"
"mall/service/order/rpc/order"
"mall/service/user/rpc/user"
"github.com/dtm-labs/dtmgrpc"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/sqlx"
"google.golang.org/grpc/status"
)
type CreateLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {
return &CreateLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
/ / get RawDB
db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Get the child transaction barrier object
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Enable the subtransaction barrier
if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
// Check whether the user exists
_, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
Id: in.Uid,
})
iferr ! =nil {
return fmt.Errorf("User does not exist")
}
newOrder := model.Order{
Uid: in.Uid,
Pid: in.Pid,
Amount: in.Amount,
Status: 0,}// Create an order
_, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder)
iferr ! =nil {
return fmt.Errorf("Order creation failed")}return nil}); err ! =nil {
return nil, status.Error(500, err.Error())
}
return &order.CreateResponse{}, nil
}
Copy the code
-
Implement the CreateRevert interface method
In this interface we query the order that the user just created and change the status of the order to 9 (invalid state).
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
Copy the code
package logic
import (
"context"
"database/sql"
"fmt"
"mall/service/order/rpc/internal/svc"
"mall/service/order/rpc/order"
"mall/service/user/rpc/user"
"github.com/dtm-labs/dtmgrpc"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/sqlx"
"google.golang.org/grpc/status"
)
type CreateRevertLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {
return &CreateRevertLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {
/ / get RawDB
db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Get the child transaction barrier object
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
// Enable the subtransaction barrier
if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
// Check whether the user exists
_, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
Id: in.Uid,
})
iferr ! =nil {
return fmt.Errorf("User does not exist")}// Query the latest order created by the user
resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)
iferr ! =nil {
return fmt.Errorf("Order does not exist")}// Change the order status 9 to indicate that the order is invalid and update the order
resOrder.Status = 9
err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder)
iferr ! =nil {
return fmt.Errorf("Order update failed")}return nil}); err ! =nil {
return nil, status.Error(500, err.Error())
}
return &order.CreateResponse{}, nil
}
Copy the code
11.2.6 modifyorder api
service
We combine the order RPC service Create and CreateRevert interface methods, the Product RPC service DecrStock and DecrStockRevert interface methods, A distributed transaction operation in SAGA transaction mode is mentioned in the ORDER API service.
- add
pproduct rpc
Depend on the configuration
$ vim mall/service/order/api/etc/order.yaml
Copy the code
Name: Order
Host: 0.0. 0. 0
Port: 8002
.
OrderRpc:
Etcd:
Hosts:
- etcd:2379
Key: order.rpc
ProductRpc:
Etcd:
Hosts:
- etcd:2379
Key: product.rpc
Copy the code
- add
pproduct rpc
Instantiation of the service configuration
$ vim mall/service/order/api/internal/config/config.go
Copy the code
package config
import (
"github.com/tal-tech/go-zero/rest"
"github.com/tal-tech/go-zero/zrpc"
)
type Config struct {
rest.RestConf
Auth struct {
AccessSecret string
AccessExpire int64
}
OrderRpc zrpc.RpcClientConf
ProductRpc zrpc.RpcClientConf
}
Copy the code
- Register the service Context
pproduct rpc
The dependence of
$ vim mall/service/order/api/internal/svc/servicecontext.go
Copy the code
package svc
import (
"mall/service/order/api/internal/config"
"mall/service/order/rpc/orderclient"
"mall/service/product/rpc/productclient"
"github.com/tal-tech/go-zero/zrpc"
)
type ServiceContext struct {
Config config.Config
OrderRpc orderclient.Order
ProductRpc productclient.Product
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
OrderRpc: orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)),
ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)),
}
}
Copy the code
- Add the import
gozero
的dtm
drive
$ vim mall/service/order/api/order.go
Copy the code
package main
import(...... _"github.com/dtm-labs/driver-gozero" // Add the 'DTM' driver that imports' goZero '
)
var configFile = flag.String("f"."etc/order.yaml"."the config file")
func main(a){... }Copy the code
- Modify the
order api
Create
Interface methods
$ vim mall/service/order/api/internal/logic/createlogic.go
Copy the code
package logic
import (
"context"
"mall/service/order/api/internal/svc"
"mall/service/order/api/internal/types"
"mall/service/order/rpc/order"
"mall/service/product/rpc/product"
"github.com/dtm-labs/dtmgrpc"
"github.com/tal-tech/go-zero/core/logx"
"google.golang.org/grpc/status"
)
type CreateLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {
return CreateLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {
// Get OrderRpc BuildTarget
orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget()
iferr ! =nil {
return nil, status.Error(100."Order creation exception")}// Get ProductRpc BuildTarget
productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget()
iferr ! =nil {
return nil, status.Error(100."Order creation exception")}// EtCD registered address of DTM service
var dtmServer = "etcd://etcd:2379/dtmservice"
// Create a GID
gid := dtmgrpc.MustGenGid(dtmServer)
// Create a saga protocol transaction
saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{
Uid: req.Uid,
Pid: req.Pid,
Amount: req.Amount,
Status: 0,
}).
Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{
Id: req.Pid,
Num: 1,})// Transaction commit
err = saga.Submit()
iferr ! =nil {
return nil, status.Error(500, err.Error())
}
return &types.CreateResponse{}, nil
}
Copy the code
! Tip: The first parameter to the sagagrpc. Add method, action, is the method path to access the microservice GRPC. This method path needs to be found in the following files. Mall/service/order/RPC/order/order pb. Go mall/service/product/RPC/product/product. The pb. Go to Invoke search keywords can be found.
11.3 testgo-zero
DTM
Distributed transaction
11.3.1 Testing the normal flow of distributed transactions
- use
postman
call/api/product/create
Interface to create a product inventorystock
为1
.
- use
postman
call/api/order/create
Interface, create an order, product IDpid
为1
.
- We can see that the product inventory from the original
1
Has become0
.
- Let’s look at the transaction barrier table
barrier
We can see that the operations on both services have completed.
11.3.2 Testing distributed transaction Failure Flow 1
- Following the above test results, the product ID at this time is
1
The inventory is already0
The use,postman
call/api/order/create
Interface to create an order.
- Let’s take a look at the order data table and there’s an entry with an ID
2
Product ID for1
And its order data status is9
.
- Let’s look at the transaction barrier table
barrier
And we can see that(gid = fqYS8CbYbK8GkL8SCuTRUF)
First service(branch_id = 01)
The sub-transaction barrier operation is normal for the second service(branch_id = 02)
The subtransaction barrier operation failed. Compensation is required. Both services then record compensating operations.
-
The operational flow of this distributed transaction
- First of all,
DTM
Service will be adjustableorder rpc
Create
Interface for creating order processing. - After the order creation is complete
DTM
Service to theproduct rpc
DecrStock
Interface, the inside of this interface throughpid
Update product inventory, throw transaction failed due to insufficient product inventory. DTM
Service initiated compensation mechanism, callorder rpc
CreateRevert
Interface for order compensation processing.DTM
Service initiated compensation mechanism, callproduct rpc
DecrStockRevert
Interface for product inventory update compensation processing. But because of theproduct rpc
DecrStock
The business process was not successful within the sub-transaction barrier of the interface. So in theDecrStockRevert
The business logic within the sub-transaction barrier is not executed in the interface.
- First of all,
11.3.3 Testing distributed Transaction Failure Flow 2
- We manually set the product ID in the database to
1
Inventory is changed to 100, and then inproduct rpc
DecrStock
Interface method failed to create an artificial exception outside the neutron transaction barrier.
- use
postman
call/api/order/create
Interface, and create an order, product IDpid
为1
.
- Let’s look at the order data table and product data table respectively. The order data table ID is
3
, whose order data status is9
. The product data table ID is1
Its inventory is still100
And the data update time has also changed.
- Let’s look at the transaction barrier table
barrier
And we can see that(GID = ZbjYHv2jNra7RMwyWjB5Lc)
First service(branch_id = 01)
The sub-transaction barrier operation is normal for the second service(branch_id = 02)
The sub-transaction barrier operation is also normal. Because in theproduct rpc
DecrStock
Outside the neutron transaction barrier of the interface method, we artificially failed to create an exception, so the two services occurred to compensate for the operation records.
You can compare the difference between testing distributed transaction failure flow 1 and testing distributed transaction failure flow 2, and see if you can appreciate the power of DTM’s sub-transaction barrier technology.
The subtransaction barrier automatically identifies whether the forward operation has been performed. The failed process 1 does not perform any service operation, so compensating service operation will not be performed when compensating. Failed process 2 performed a business operation, so compensation also performed the compensated business operation.
Project address: Github
Previous post go-Zero: Making Microservices Go — 10 Link Tracking Jaeger