This article is an entry in the "Java Theme Month: Java Development in Action" event.
Preface
"Lying flat" has become a popular phrase recently.
Everyone from tycoons to internet celebrities is eagerly discussing what lies behind it.
So let me start this article with my own thoughts on the phrase.
First, the concept. According to Wikipedia: frustrated with the oppressive work culture in China, young people are opting for a "lie flat" attitude rather than struggling to conform to social expectations.
Competition exists at every time and in every work environment. Now that it has become so fierce, what attitude should we take toward life?
I think lying flat is an option, but it is a depressing one.
Let me ask: have you achieved your original dream? Perhaps it was to bring happiness to your family, perhaps to buy the things you want, perhaps to see the world. In my opinion, as long as you haven't achieved it, you need to keep working, for no other reason than that original dream. A life lasts a hundred years at most; wouldn't it be a pity to leave nothing behind? We still need to fight for our own future.
Perhaps many people think: with so much involution (cutthroat internal competition), how can there be a future? Yes, everyone thinks this way, but can you change reality? No. Lying flat only leaves you trailing behind everyone else in that involuted reality. So lying flat is not really an option either.
But does that mean we have to compete ourselves to death? No. I think you should be content with what you have while still setting small goals. That is the premise of a wonderful life.
Well, on to today's technical article!
We still want to make progress and keep learning; knowledge is useful no matter when.
Distributed transactions are indispensable in a distributed system architecture. There are several ways to implement them, but today we're going to talk about using RocketMQ, the message queue middleware, to implement distributed transactions.
Distributed transaction definition
A distributed transaction is one in which the transaction participants, the servers supporting the transaction, the resource servers, and the transaction manager are located on different nodes of a distributed system. To put it simply, one large operation consists of several small operations that run on different servers and belong to different applications. A distributed transaction needs to ensure that these small operations either all succeed or all fail. Essentially, distributed transactions exist to ensure data consistency across different databases.
Distributed transaction theory
CAP
The CAP theorem is also known as Brewer's theorem. For architects designing distributed systems (not just distributed transactions), CAP is the introductory theory. The following is adapted from Wikipedia to help you understand C, A, and P.
① Consistency: every read by a client either returns the latest data or fails. In other words, consistency is a promise the distributed system makes to the clients accessing it: either I return an error, or I return fully consistent, up-to-date data. The emphasis is on data correctness.
② Availability: every client request receives a response with data, never an error. In other words, availability is another promise the distributed system makes to its clients: I will always return data and never an error, but I do not guarantee that the data is up to date. The emphasis is on never failing.
③ Partition tolerance: distributed systems communicate over networks, and networks are unreliable. When any number of messages are lost or delayed, the system keeps providing service and does not go down. In other words, partition tolerance is another promise to clients: I will keep running no matter what data synchronization problems I have internally. The emphasis is on not going down.
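To make the trade-off concrete, here is a minimal sketch of a single replica that has been cut off from the primary by a partition. It isn't tied to any real database, and the names (`CapSketch`, `Replica`, `readCp`, `readAp`) are made up purely for illustration. A CP-style read refuses to answer rather than risk stale data, while an AP-style read always answers.

```java
class CapSketch {
    /** A replica that may be cut off from the primary by a network partition. */
    static class Replica {
        private String value = "balance=100"; // last value replicated from the primary
        private boolean partitioned = false;  // true while the primary is unreachable

        void setPartitioned(boolean partitioned) {
            this.partitioned = partitioned;
        }

        /** Replication from the primary; updates are lost while partitioned. */
        void replicate(String newValue) {
            if (!partitioned) {
                value = newValue;
            }
        }

        /** CP-style read: fail rather than return data that may be stale. */
        String readCp() {
            if (partitioned) {
                throw new IllegalStateException("cannot confirm the latest value during a partition");
            }
            return value;
        }

        /** AP-style read: always answer, even if the value may be stale. */
        String readAp() {
            return value;
        }
    }

    public static void main(String[] args) {
        Replica replica = new Replica();
        replica.setPartitioned(true);
        replica.replicate("balance=0"); // this update never reaches the replica
        System.out.println("AP read: " + replica.readAp());
        try {
            replica.readCp();
        } catch (IllegalStateException e) {
            System.out.println("CP read failed: " + e.getMessage());
        }
    }
}
```

Real systems sit somewhere between these two extremes, but the sketch shows why you can't have both promises at once while the network is partitioned.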
BASE
BASE is an acronym for Basically Available, Soft state, and Eventually consistent. It is an extension of AP in CAP.
Basically available: when a distributed system fails, it is allowed to lose some functionality in order to keep its core functions available.
Soft state: the system is allowed to have intermediate states that do not affect its availability. This is the temporary inconsistency referred to in CAP.
Eventually consistent: data on all nodes becomes consistent after a period of time.
BASE addresses the network delay that CAP theory does not take into account, using soft state and eventual consistency to guarantee consistency after the delay. BASE is the opposite of ACID: instead of ACID's strong consistency model, it sacrifices strong consistency for availability and allows data to be inconsistent for a while, as long as it eventually reaches a consistent state.
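Soft state and eventual consistency can be sketched in a few lines. This is only an illustration under simple assumptions: one primary value, one replica value, and a queue of updates that haven't been replicated yet. The class and method names (`BaseSketch`, `write`, `sync`) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BaseSketch {
    private int primary = 0;                                   // value on the node that accepts writes
    private int replica = 0;                                   // value on a node that lags behind
    private final Queue<Integer> pending = new ArrayDeque<>(); // updates not yet replicated

    /** A write succeeds immediately on the primary; replication happens later. */
    void write(int delta) {
        primary += delta;
        pending.add(delta);
    }

    /** Apply all pending updates to the replica (the "after a period of time" part). */
    void sync() {
        while (!pending.isEmpty()) {
            replica += pending.poll();
        }
    }

    int primary() { return primary; }

    int replica() { return replica; }

    boolean consistent() { return primary == replica; }

    public static void main(String[] args) {
        BaseSketch system = new BaseSketch();
        system.write(100);
        System.out.println("soft state, consistent? " + system.consistent());
        system.sync();
        System.out.println("after sync, consistent? " + system.consistent());
    }
}
```

Between `write` and `sync` the two nodes disagree, which is the soft state. Once `sync` has run they agree again, which is all that eventual consistency promises.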
Distributed transaction solutions
At present, there are many solutions for distributed transactions. Specialized open source middleware also exists, such as Seata.
Both RocketMQ transactions and Seata solve distributed transaction problems. The difference is that Seata follows CAP theory, while the RocketMQ approach follows BASE theory, that is, eventual consistency.
So, let’s walk through RocketMQ’s implementation of distributed transactions.
RocketMQ transaction flow
The execution process is as follows (the Producer is the MQ sender):
1. The Producer sends the transaction message to the MQ Server, which marks the message as Prepared. At this point the MQ subscriber cannot consume the message. In this example, the Producer sends a message that encapsulates the business data to the MQ Server.
2. The MQ Server acknowledges the message. Once the MQ Server has received the message from the Producer, it responds that the send succeeded.
3. The Producer executes its business logic under the control of a local database transaction. In this example, the Producer adds a user.
4. Message delivery. If the Producer's local transaction succeeds, the Producer automatically sends a COMMIT message to the MQ Server. After receiving the COMMIT, the MQ Server marks the business message as consumable, and the MQ subscriber (the points service in this example) consumes it normally. If the local transaction fails, the Producer automatically sends a ROLLBACK message; the MQ Server then deletes the business message, and the downstream service never consumes it.
5. The MQ subscriber (the downstream service) consumes the message and responds to MQ with an ACK on success; otherwise the message is delivered again. By default the ACK is sent automatically, that is, whenever the consumer code runs without error.
6. Transaction check-back. If the Producer crashes or times out while executing the local transaction, the MQ Server keeps asking other producers in the same producer group for the execution status of the transaction. The MQ Server decides whether to deliver the message based on the result of this check.
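The steps above can be sketched with a toy in-memory broker. To be clear, this is not how RocketMQ is implemented internally and it doesn't use the RocketMQ API. `ToyBroker`, `TxState`, and `sendInTransaction` are hypothetical names that only model when a message becomes visible to subscribers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class HalfMessageSketch {
    /** Outcomes the producer can report for its local transaction. */
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** Toy in-memory broker: models message visibility only, not RocketMQ internals. */
    static class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>(); // prepared, not consumable
        private final List<String> consumable = new ArrayList<>();        // visible to subscribers

        /** Steps 1-2: store the message as prepared and acknowledge it. */
        void prepare(String txNo, String body) {
            halfMessages.put(txNo, body);
        }

        /** Step 4 (or step 6 after a check-back): apply the reported outcome. */
        void resolve(String txNo, TxState state) {
            if (state == TxState.UNKNOWN) {
                return; // keep the prepared message and check back again later
            }
            String body = halfMessages.remove(txNo);
            if (state == TxState.COMMIT && body != null) {
                consumable.add(body);
            }
        }

        boolean isPending(String txNo) {
            return halfMessages.containsKey(txNo);
        }

        List<String> consumable() {
            return consumable;
        }
    }

    /** Producer side: send the prepared message, run the local transaction, report the result. */
    static void sendInTransaction(ToyBroker broker, String txNo, String body, Supplier<TxState> localTx) {
        broker.prepare(txNo, body);
        TxState state;
        try {
            state = localTx.get();    // step 3: the local database transaction
        } catch (RuntimeException e) {
            state = TxState.ROLLBACK; // the local transaction failed
        }
        broker.resolve(txNo, state);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        sendInTransaction(broker, "tx-1", "debit A 100", () -> TxState.COMMIT);
        sendInTransaction(broker, "tx-2", "debit A 250", () -> { throw new IllegalStateException("local tx failed"); });
        sendInTransaction(broker, "tx-3", "debit A 50", () -> TxState.UNKNOWN);
        System.out.println("consumable before check-back: " + broker.consumable());
        broker.resolve("tx-3", TxState.COMMIT); // the check-back finds the local transaction committed
        System.out.println("consumable after check-back: " + broker.consumable());
    }
}
```

The point to notice is that the subscriber never sees a message whose local transaction rolled back, and a message in the unknown state stays invisible until a later check resolves it.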
RocketMQ installation and deployment
To use RocketMQ, you first need to install the message queue service.
Download and install
- Download and install RocketMQ 4.8.0
Deployment
Upload the installation package to the /usr/local/src directory
Decompress the package and move it to the installation directory
cd /usr/local/src
unzip rocketmq-all-4.8.0-bin-release.zip
mv rocketmq-all-4.8.0-bin-release ../rocketmq-4.8.0
Start the NameServer
cd ../rocketmq-4.8.0
nohup sh bin/mqnamesrv &
Check the startup
tail -f ~/logs/rocketmqlogs/namesrv.log
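Modify the Broker run configuration
# In the Broker startup script (bin/runbroker.sh in the standard distribution),
# comment out the default heap settings and use smaller ones
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"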
Start the Broker
nohup sh bin/mqbroker -n localhost:9876 &
Check the startup
tail -f ~/logs/rocketmqlogs/broker.log
Open the firewall port
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload
If an error occurs, manually create the mapped-file directories
cd /root/store
mkdir commitlog consumequeue
Test messaging
Send messages
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Receive messages
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Shut down
Stop the NameServer
sh bin/mqshutdown namesrv
Stop the Broker
sh bin/mqshutdown broker
RocketMQ console installation
Download address
Download the source code and complete the packaging
mvn clean package -Dmaven.test.skip=true
Upload the file to the /usr/local/src directory
Startup script
nohup java -jar -Dspring.config.location=/app/home/rocketmq-console/application.properties /app/service/rocketmq-console/rocketmq-console-ng-2.0.0.jar > /app/home/rocketmq-console/logs/mq_console.log 2>&1 &
Open the firewall port
firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --reload
Test access
Example service scenario
Two service applications act as the producer and the consumer of the message queue:
Bank1: the bank deduction (debit) service
Bank2: the bank addition (credit) service
Scenario: user A transfers 100 to user B, so A's balance is reduced by 100 and B's balance is increased by 100.
Bank1:
1. Provides an external API
2. Initiates a deduction request
3. Sends the message to MQ
4. After receiving the message, MQ returns an acknowledgement
5. Bank1 performs the local debit transaction and commits it
MQ:
On receiving Bank1's commit confirmation, MQ unlocks the message so that it can be consumed
Bank2:
1. Listens to MQ
2. Consumes the message
3. Performs the local credit operation
Integrating MQ into the microservice applications
- Add the dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
- Configuration file
rocketmq:
  name-server: xxxx:9876
  producer:
    group: base_group_syncMsg
    send-message-timeout: 5000
    retry-times-when-send-failed: 2
    max-message-size: 4194304
Bank1 application implementation
Provide request API
@GetMapping(value = "/rocketmq")
public String transfer(@RequestParam("accountNo") String accountNo, @RequestParam("amount") Double amount) {
    // Create the transaction ID that is sent to MQ as part of the message content
    String tx_no = UUID.randomUUID().toString();
    // Encapsulate the event entity
    AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo, amount, tx_no);
    // Send the message
    accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
    return "Processing successful - Account: {" + accountNo + "} deduction: {" + amount + "}";
}
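The `AccountChangeEvent` class used above isn't shown in this article. A minimal sketch of what it might look like follows, inferred only from how the code here calls it: a three-argument constructor plus `getAccountNo`, `getAmount`, `getTxNo`, and `setAccountNo`. The field layout, the no-argument constructor (added so a JSON library can instantiate the class), and the wrapper class `AccountChangeEventSketch` are all assumptions.

```java
import java.io.Serializable;

class AccountChangeEventSketch {
    /** Event carried in the MQ message; the layout is assumed from the calls in this article. */
    public static class AccountChangeEvent implements Serializable {
        private static final long serialVersionUID = 1L;

        private String accountNo; // account to change
        private Double amount;    // amount to deduct or add
        private String txNo;      // transaction ID, used later for idempotency checks

        /** No-argument constructor so a JSON library can create the object. */
        public AccountChangeEvent() {
        }

        public AccountChangeEvent(String accountNo, Double amount, String txNo) {
            this.accountNo = accountNo;
            this.amount = amount;
            this.txNo = txNo;
        }

        public String getAccountNo() { return accountNo; }

        public void setAccountNo(String accountNo) { this.accountNo = accountNo; }

        public Double getAmount() { return amount; }

        public void setAmount(Double amount) { this.amount = amount; }

        public String getTxNo() { return txNo; }

        public void setTxNo(String txNo) { this.txNo = txNo; }
    }

    public static void main(String[] args) {
        AccountChangeEvent event = new AccountChangeEvent("1", 100.0, "tx-demo");
        event.setAccountNo("2"); // the consumer side overwrites the account, as in the listener code
        System.out.println(event.getAccountNo() + " " + event.getAmount() + " " + event.getTxNo());
    }
}
```

In a real project this would be a top-level public class in its own file; it is nested here only to keep the sketch in one place.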
Deduction request
Send a message to MQ
/**
 * Send the transfer message to MQ.
 * @param accountChangeEvent event entity
 */
@Override
public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
    // Convert accountChangeEvent to JSON
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("accountChange", accountChangeEvent);
    String jsonString = jsonObject.toJSONString();
    // Build the message
    Message<String> message = MessageBuilder.withPayload(jsonString).build();
    /*
     * Send a transaction message. Parameters:
     * String txProducerGroup - producer group
     * String destination - topic
     * Message message - message content
     * Object arg - extra argument
     */
    rocketMQTemplate.sendMessageInTransaction("producer_group_bank1", "bank", message, null);
}
Listen for MQ messages
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Consumer listener
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerListener implements RocketMQListener<String> {
    /** Inject the business implementation */
    @Autowired
    AccountInfoService accountInfoService;

    /** Receive a message */
    @Override
    public void onMessage(String message) {
        log.info("Received consumption message :{}", message);
        // Parse the message
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        // Convert to an AccountChangeEvent object
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        // Set the account
        accountChangeEvent.setAccountNo("2");
        // Perform the business operation -- increase the amount
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}
Implement local business logic
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Account service implementation
 */
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired
    AccountInfoDao accountInfoDao;

    // Update account -- increase amount
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("Bank2 update local account, account: {}, amount: {}", accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount());
        // Check the local transaction record to prevent repeated consumption
        if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo()) > 0) {
            return;
        }
        // Update the data -- increase the amount
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount());
        // Add a transaction record for idempotency
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        // Deliberate failure kept for the error demo
        if (accountChangeEvent.getAmount() == 250) {
            throw new RuntimeException("Message processing exception");
        }
    }
}
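The idempotency check in `addAccountInfoBalance` matters because MQ may deliver the same message more than once. Here is a minimal in-memory sketch of the same idea, assuming a set of processed transaction IDs stands in for the transaction record table and a map stands in for the account table. The names (`IdempotentConsumerSketch`, `credit`, `balanceOf`) are hypothetical and no database is involved.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class IdempotentConsumerSketch {
    private final Set<String> processedTx = new HashSet<>();      // stands in for the transaction record table
    private final Map<String, Double> balances = new HashMap<>(); // stands in for the account table

    /**
     * Credits the account at most once per txNo.
     * Returns false when the message is a duplicate and was ignored.
     * The method is synchronized to mimic the atomicity of a local transaction.
     */
    synchronized boolean credit(String txNo, String accountNo, double amount) {
        if (processedTx.contains(txNo)) {
            return false; // already consumed, skip the update
        }
        balances.merge(accountNo, amount, Double::sum); // increase the balance
        processedTx.add(txNo);                          // record the transaction for later checks
        return true;
    }

    double balanceOf(String accountNo) {
        return balances.getOrDefault(accountNo, 0.0);
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch bank2 = new IdempotentConsumerSketch();
        System.out.println("first delivery applied: " + bank2.credit("tx-1", "2", 100.0));
        System.out.println("second delivery applied: " + bank2.credit("tx-1", "2", 100.0));
        System.out.println("balance: " + bank2.balanceOf("2"));
    }
}
```

In the real service the balance update and the transaction record insert share one database transaction, so either both are stored or neither is.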
Bank1 transaction callback listener
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Producer transaction callback listener
 */
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_bank1")
public class ProducerCallbackListener implements RocketMQLocalTransactionListener {
    @Autowired
    AccountInfoService accountInfoService;
    @Autowired
    AccountInfoDao accountInfoDao;

    /**
     * Callback invoked after the transaction message has been sent successfully to MQ.
     * @param message the message
     * @return the state of the local transaction
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // Parse the message
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            // Extract the accountChange JSON string
            String accountChangeString = jsonObject.getString("accountChange");
            // Convert accountChange (JSON) to AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            // Perform the local transaction and deduct the amount
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            // Returning RocketMQLocalTransactionState.COMMIT automatically sends a commit to MQ,
            // and MQ makes the message consumable
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            // Send ROLLBACK to MQ, so the message never becomes consumable
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Transaction check-back: check whether the deduction has been made.
     * @param message the message
     * @return the state of the local transaction
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // Parse the message into an AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        // Convert accountChange (JSON) to AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        // Transaction ID
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if (existTx > 0) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}
Bank2 application implementation
Bank2 listens to MQ
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author Little hidden Lele
 * @date 2021/06/3
 * @description Consumer listener
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group")
public class ConsumerListener implements RocketMQListener<String> {
    /** Inject the business implementation */
    @Autowired
    AccountInfoService accountInfoService;

    /** Receive a message */
    @Override
    public void onMessage(String message) {
        log.info("Received consumption message :{}", message);
        // Parse the message
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        // Convert to an AccountChangeEvent object
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        // Set the account
        accountChangeEvent.setAccountNo("2");
        // Perform the business operation -- increase the amount
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}
Message consumption
log.info("Received consumption message :{}", message);
// Parse the message
JSONObject jsonObject = JSONObject.parseObject(message);
String accountChangeString = jsonObject.getString("accountChange");
// Convert to an AccountChangeEvent object
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
Perform the local credit transaction
accountInfoService.addAccountInfoBalance(accountChangeEvent);
Conclusion
I'm finally done. Writing the demo was a lot of work.
There are many distributed transaction solutions, and it is up to us as engineers to decide whether we need distributed transactions at all. If you do, I hope this article serves as a quick start to implementing distributed transactions with RocketMQ message queues. If people find it useful, I may turn it into a column, hahaha.
Lying flat is not a good choice when you are pursuing a dream. Technology always moves forward, so keep working hard, everyone!