Consider A common scenario in A distributed scenario: Service A sends A message to A message queue after A successful database operation, and now expects to send the message only if the database operation succeeds. Here are some common practices:

1. Perform database operations before sending messages

 

public void purchaseOrder() {
    orderDao.save(order);
    messageQueue.send(message);
}
Copy the code

It is possible that adding order succeeds, but sending messages fails. Finally, an inconsistent state is formed.

2. Send the message first and then perform database operations

 

public void purchaseOrder() {
    messageQueue.send(message);
    orderDao.save(order);
}
Copy the code

It is possible that the message was sent successfully but the order addition failed, resulting in an inconsistent state.

3. In database transactions, send messages before performing database operations

 

@Transactional public void purchaseOrder() {
    messageQueue.send(message);
    orderDao.save(order);
}
Copy the code

Again, there is no guarantee of consistency. If the database operation succeeds, however, the message has already been sent and cannot be rolled back.

4. In a database transaction, perform database operations before sending messages

 

@Transactional public void purchaseOrder() {
    orderDao.save(order);
    messageQueue.send(message);
}
Copy the code

The success of this scheme depends on whether the message queue has a reply mechanism and a transaction mechanism.

The reply mechanism means that after producer sends a message, the message queue can return a response to verify whether the message was inserted successfully.

If the message queue has a reply mechanism, rewrite the above code as:

 

@Transactional public void purchaseOrder() {
    orderDao.save(order); try{
        kafkaProducer.send(message).get();
    } catch(Exception e) throw new RuntimeException("Fail to send message");
    }
Copy the code

This code means that if you send a response that received a message queue error, a RuntimeException will be thrown. Failure to send a message can cause a rollback of the database operation. The scheme seems feasible. However, there is a case, if the message is sent successfully, and no immediate returns the response message queue due to network reasons, the message sender because there is no timely receipt of reply to that message sending failed, so the message sender database transaction rollback, the message is inserted into the success, however, This leads to the final inconsistency.

The above inconsistencies can be resolved through the transaction mechanism of the message.

The transaction mechanism indicates whether a message in a message queue has state, which determines whether a consumer consumes the message.

RocketMQ, an open source message queue owned by Alibaba known for its high availability, was one of the first to support transactional messages. Kafka has also supported transactions since version 0.11.

RoketMQ’s transaction mechanism marks messages as Prepared or Confirmed. Prepared messages are not visible to consumers.

Kafka marks messages as Uncommited or Commited through Transaction markers. Consumer determines which type of message is visible by setting isolation-level to READ_COMMITTED or READ_uncommitted.

5. Message queues do not support transaction messages

If the message queue does not support transaction messages, the solution is to create a new message table, start a scheduled task to scan the message table, send all prepared messages to the message queue, and set the message status to Confirmed.

The code is as follows:

 

@Transactional public void purchaseOrder() {
    orderDao.save(order);
    messageService.save(message);
}
Copy the code

At this point, the logic that inserts order and message is in the same database transaction, and the message table is constantly scanned by the background timer, so the message must be delivered successfully to the message consumer.

One problem with this scenario is that it is possible for a background task to fail after successfully sending a message and not have time to set the status of the sent message to Confirmed. So the next time the message table is scanned, the message is repeated. This is at least once delivery.

Due to the feature of at least once delivery, the consumer may receive duplicate data. At this point, a Message_consume table can be established on the consumer side to determine whether the message has been consumed, and if so, the message is discarded.