Problem description

The need for idempotent message consumption arises mainly from the MQ retry mechanism.

We often think of message-oriented middleware as a reliable component. By reliable I mean that once a message has been successfully delivered to the middleware, it will not be lost: it is guaranteed to be successfully consumed by a consumer at least once. This is one of the most basic properties of message-oriented middleware: a message will be successfully consumed AT LEAST ONCE.

For example, a message M is sent to the middleware and delivered to consumer program A. A receives the message and starts consuming it, but the application restarts halfway through. Because the message was never marked as successfully consumed, the middleware keeps delivering it until consumption succeeds. This reliability is exactly why a message can be delivered more than once.

As another example, program A receives message M and completes the consumption logic, but restarts just as it is about to tell the middleware "I have consumed successfully". From the middleware's point of view the message has not been consumed, so it continues to deliver it. From application A's point of view the message was already consumed successfully, yet the middleware delivers it again.

The solution

Deduplication based on the business table

Suppose our logic goes like this:

select * from t_order where order_no = 'order123'

if (order != null) {
    return; // duplicate message, return directly
}

This is problematic under concurrency: if a second request arrives before the first has finished its business logic, it passes the check as well. The check and the write therefore need to be atomic.

The fix, also covered in our last article, is to use one of:

  • Pessimistic locking (SELECT… FOR UPDATE)

  • Optimistic locking
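As a rough illustration of the optimistic variant, here is a minimal in-memory sketch. It is not the author's implementation: the class `OptimisticOrderStore`, the `NEW`/`PAID` statuses and the `version` field are hypothetical, and a `ConcurrentHashMap` stands in for the order table. Against a real database the same idea is usually an `UPDATE ... SET version = version + 1 WHERE order_no = ? AND version = ?` followed by a check that exactly one row was affected.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OptimisticOrderStore {
    /** Immutable snapshot of one order row; version plays the role of a version column. */
    static final class Order {
        final String status;
        final int version;

        Order(String status, int version) {
            this.status = status;
            this.version = version;
        }
    }

    // Stand-in for the order table (assumption: keyed by order number).
    private final ConcurrentMap<String, Order> orders = new ConcurrentHashMap<>();

    public void create(String orderNo) {
        orders.putIfAbsent(orderNo, new Order("NEW", 0));
    }

    /** Returns true only for the single caller that moves the order from NEW to PAID. */
    public boolean markPaid(String orderNo) {
        Order current = orders.get(orderNo);
        if (current == null || !"NEW".equals(current.status)) {
            return false; // unknown order, or already processed: treat as a duplicate
        }
        Order next = new Order("PAID", current.version + 1);
        // Compare-and-set: succeeds only if nobody replaced the snapshot we read.
        return orders.replace(orderNo, current, next);
    }
}
```

If two deliveries of the same message race, both may read the `NEW` snapshot, but only one `replace` call succeeds; the other returns false and the message can be acknowledged without repeating the business logic.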

However, both SELECT… FOR UPDATE and optimistic locking deduplicate on the business table itself, which adds complexity to business development. If every piece of consumption logic has to implement its own deduplication and idempotency on top of its own business data, the workload becomes tedious.

Deduplication based on a message table + local transaction

Taking RocketMQ as the example middleware, the RocketMQ documentation says:

RocketMQ cannot avoid message duplication (that is, it does not provide Exactly-Once), so if the business is sensitive to duplicate consumption, it is important to deduplicate at the business level. You can use a relational database for deduplication. You first need to determine the unique key of the message, either the msgId or a unique identifying field in the message content, such as an order ID. Determine whether the unique key exists in the relational database before consuming. If not, insert and consume; otherwise skip. (In practice, atomicity must be considered: the existence check can be done by attempting the insert, and if a primary key conflict is reported, the insert has failed and the message is skipped.)

A msgId is supposed to be a globally unique identifier, but in practice the same message may end up with two different msgIds (the producer actively resending, duplicates caused by the client's redelivery mechanism, and so on). In those cases deduplication has to be based on a business field.

From this description we get the first solution: a deduplication table, which was described in the previous article “Idempotent Solution Set (1)”. Note in particular:

Determine if the unique key exists in the relational database before consuming. If not, insert and consume, otherwise skip.

Specifically, we can add a message consumption record table to the database, commit the business operation and the message insertion action in the same transaction, and ensure that the message will be consumed only once.

  1. Open the transaction

  2. Insert message table (handle primary key conflicts)

  3. Update order table (original consumption logic)

  4. Commit the transaction
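The four steps above can be sketched with plain JDBC roughly as follows. This is an illustrative sketch under assumptions, not code from the RocketMQ documentation: the table `t_message_record`, its `message_key` column (assumed to carry a primary key or unique index) and the `status` column on `t_order` are hypothetical names, and the caller is assumed to supply a `Connection` to the single database that holds both tables.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

class MessageTableConsumer {

    /** True for integrity-constraint violations (SQLState class 23), e.g. a duplicate key. */
    public static boolean isDuplicateKey(SQLException e) {
        String state = e.getSQLState();
        return e instanceof SQLIntegrityConstraintViolationException
                || (state != null && state.startsWith("23"));
    }

    /** Returns true if the message was consumed now, false if it had already been consumed. */
    public static boolean consumeOnce(Connection conn, String messageKey, String orderNo)
            throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // 1. open the transaction
        try {
            // 2. insert into the message table; the unique key rejects duplicates
            try (PreparedStatement insert = conn.prepareStatement(
                    "INSERT INTO t_message_record (message_key) VALUES (?)")) {
                insert.setString(1, messageKey);
                insert.executeUpdate();
            } catch (SQLException e) {
                if (isDuplicateKey(e)) {
                    conn.rollback();
                    return false; // already consumed: skip the business logic
                }
                throw e;
            }
            // 3. original consumption logic (hypothetical order update)
            try (PreparedStatement update = conn.prepareStatement(
                    "UPDATE t_order SET status = 'PAID' WHERE order_no = ?")) {
                update.setString(1, orderNo);
                update.executeUpdate();
            }
            conn.commit(); // 4. commit: the record and the business change land together
            return true;
        } catch (SQLException | RuntimeException e) {
            conn.rollback(); // neither the record nor the business change is kept
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
    }
}
```

Because the message record and the business update commit or roll back together, a crash in the middle leaves no record behind, so the redelivered message is processed normally rather than being skipped.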

But there are limitations:

  1. The consumption logic must rely entirely on a relational database transaction. If the consumption process also modifies data elsewhere, for example in Redis or another data source without transaction support, those changes cannot be rolled back.

  2. All the data must be in one database; cross-database scenarios are not covered.

Send and receive messages using Exactly-Once delivery semantics

Exactly-Once delivery semantics

The following is quoted from the Aliyun documentation:

Exactly-Once means that a message sent to the messaging system is processed by the consumer exactly once. Even if the producer resends the message, it is consumed only once on the consumer side.

Exactly-Once semantics are the ideal state of message flow in messaging systems and stream computing systems, but few ideal implementations exist in the industry. True Exactly-Once depends on coordinating three states: the messaging system's server, its client, and the user's consumption logic. For example, if your consumer crashes after consuming a message and is then restarted, the message may be consumed again because the consumer offset was not synchronized to the server side of the messaging system.

Exactly-Once delivery semantics are highly controversial in the industry, and many people invoke the “FLP impossibility” result or other consistency theorems to dismiss the idea. In fact, implementing Exactly-Once semantics for a specific scenario is not very complicated; the controversy exists mostly because people do not describe the problem precisely.

If you want the consumption result of a message to take effect only once in a business system, you only need to guarantee idempotent consumption of that message. The Exactly-Once semantics of Message Queue for Apache RocketMQ address the most common case in business: the consumption result of a message (the result of computing and processing the message on the consumer side) takes effect only once in the database system.

Typical Usage Scenarios

In an e-commerce system, the upstream real-time computing module publishes commodity price changes and asynchronously notifies the downstream commodity management module. Consumption of each message must be idempotent here: a repeated price change message takes effect only once, so the price is never modified twice.

Steps

1. Add the dependency

The ExactlyOnceConsumer of Message Queue for Apache RocketMQ is released in the client SDK ons-client-ext-1.8.4.Final, which your application must depend on to use Exactly-Once delivery semantics. In addition, ExactlyOnceConsumer offers a Spring-based way to enable Exactly-Once consumption through the @MQTransaction annotation, so you also need to add Spring 3.0+ dependencies to your application.

The complete dependencies are shown below.

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client-ext</artifactId>
    <version>1.8.4.Final</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring-version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>${spring-version}</version>
</dependency>

2. Create the transaction_record table in the database where the message consumption results are stored (note: the database system storing the message consumption results must support local transactions)

To use the Exactly-Once delivery semantics of Message Queue for Apache RocketMQ, you need to create a transaction_record table in the database where the business processing results are persisted. The table must be in the same database as the business processing results, and that database must support local transactions. Currently, the Exactly-Once delivery semantics support MySQL and SQLServer data sources. The CREATE TABLE statement for MySQL is shown below.

CREATE TABLE `transaction_record` (
  `consumer_group` varchar(128) NOT NULL DEFAULT '',
  `message_id` varchar(255) NOT NULL DEFAULT '',
  `topic_name` varchar(255) NOT NULL DEFAULT '',
  `ctime` bigint(20) NOT NULL,
  `queue_id` int(11) NOT NULL,
  `offset` bigint(20) NOT NULL,
  `broker_name` varchar(255) NOT NULL DEFAULT '',
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`),
  UNIQUE KEY `message_id_unique_key` (`message_id`),
  KEY `ctime_key` (`ctime`),
  KEY `load_key` (`queue_id`,`broker_name`,`topic_name`,`ctime`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;           


3. On the message producer, set the PropertyKeyConst.EXACTLYONCE_DELIVERY property to enable Exactly-Once delivery semantics

import java.util.Date;
import java.util.Properties;
// The ONS client SDK classes used below (Message, Producer, PropertyKeyConst, SendResult,
// ONSClientException, ExactlyOnceONSFactory) must be imported as well.

/**
 * TestExactlyOnceProducer enables Exactly-Once delivery semantics at startup
 * through PropertyKeyConst.EXACTLYONCE_DELIVERY.
 */
public class TestExactlyOnceProducer {
    public static void main(String[] args) {
        Properties producerProperties = new Properties();
        producerProperties.setProperty(PropertyKeyConst.GROUP_ID, "{GROUP_ID}");
        producerProperties.setProperty(PropertyKeyConst.AccessKey, "{accessKey}");
        producerProperties.setProperty(PropertyKeyConst.SecretKey, "{secretKey}");
        producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "{NAMESRV_ADDR}");
        producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY, "true");
        Producer producer = ExactlyOnceONSFactory.createProducer(producerProperties);
        producer.start();
        System.out.println("Producer Started");

        for (int i = 0; i < 10; i++) {
            Message message = new Message("{topic}", "{tag}", "mq send transaction message test".getBytes());
            try {
                SendResult sendResult = producer.send(message);
                assert sendResult != null;
                System.out.println(new Date() + " Send mq message success! msgId is: " + sendResult.getMessageId());
            } catch (ONSClientException e) {
                // An exception means the message failed to send.
                // To avoid losing it, cache the message and retry.
                System.out.println("send failed");
            }
        }
        producer.shutdown();
    }
}

4. Create an ExactlyOnceConsumer on the message consumer and enable Exactly-Once consumption mode

To consume with the Exactly-Once delivery semantics of Message Queue for Apache RocketMQ, the consumer calls the ExactlyOnceONSFactory.createExactlyOnceConsumer interface to create an ExactlyOnceConsumer, and then uses that ExactlyOnceConsumer to consume in Exactly-Once mode.

Note the following when using ExactlyOnceConsumer:

  • When creating an ExactlyOnceConsumer, you can enable or disable Exactly-Once delivery semantics by setting the PropertyKeyConst.EXACTLYONCE_DELIVERY property. ExactlyOnceConsumer enables Exactly-Once delivery semantics by default.

  • When consuming with ExactlyOnceConsumer, the business logic in the consume method of your MessageListener must use MQDataSource to read from and write to the database.

You can enable Exactly-Once delivery semantics on the consumer side in any of the following ways:

  • Enable Exactly-Once delivery semantics in a non-Spring way

  • In MessageListener, use a transaction to make multiple database operations and the message consumption transactional

  • In MessageListener, enable Exactly-Once delivery semantics through Spring Boot annotations

  • In MessageListener, implement Exactly-Once delivery semantics with MyBatis

The following example shows the second way: MessageListener uses a transaction to make multiple database operations and the message consumption transactional. For the other examples, please refer to the Aliyun documentation: help.aliyun.com/document_d…

import java.sql.Connection;
import java.sql.Statement;
// The ONS client SDK classes used below (Action, ConsumeContext, Message, MessageListener,
// MQDataSource) must be imported as well.

/**
 * SimpleTxListener implementation.
 * Updates multiple business tables in one transaction and ensures that
 * the operations within the transaction take effect only once.
 */
public class SimpleTxListener implements MessageListener {
    private MQDataSource dataSource;

    public SimpleTxListener() {
        this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}");
    }

    @Override
    public Action consume(Message message, ConsumeContext context) {
        Connection connection = null;
        Statement statement = null;
        try {
            /**
             * Perform the business computation on the consumed message here and persist
             * the result to the database system through MQDataSource.
             * This example updates multiple tables within one transaction; Exactly-Once
             * delivery semantics guarantee that the transaction takes effect once and only once.
             * The actual business flow is: receive message -> business processing -> persist result.
             */
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            String insertSql = String.format("INSERT INTO app(msg, message_id, ctime) VALUES(\"%s\", \"%s\", %d)",
                new String(message.getBody()), message.getMsgID(), System.currentTimeMillis());
            String updateSql = String.format("UPDATE consume_count SET cnt = count + 1 WHERE consumer_group = \"%s\"",
                "GID_TEST");
            statement = connection.createStatement();
            statement.execute(insertSql);
            statement.execute(updateSql);
            connection.commit();
            System.out.println("consume message :" + message.getMsgID());
            return Action.CommitMessage;
        } catch (Throwable e) {
            try {
                connection.rollback();
            } catch (Exception e1) {
                // ignored: also covers the case where no connection was obtained
            }
            System.out.println("consume message fail");
            return Action.ReconsumeLater;
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (Exception e) {
                    // ignored
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // ignored
                }
            }
        }
    }
}

Conclusion

As you can see, this approach is packaged by Aliyun, but in principle it is the same as the earlier solution based on a message table + local transaction.

Using Redis

The last solution is similar to the one based on a message table + local transaction, except that a Redis distributed lock is used to decide whether a message is a duplicate. The detailed steps are similar to those in our previous article; please refer to it.
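A minimal sketch of the idea, under stated assumptions: the `KeyValueStore` interface, the `mq:dedup:` key prefix and the class names below are hypothetical, and the in-memory store only stands in for Redis so that the example runs on its own. With a real Redis client, `setIfAbsent` would typically map to `SET key value NX EX <seconds>`, with the expiry chosen to outlive the broker's redelivery window.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class RedisStyleDeduplicator {
    /** Hypothetical minimal view of the store; not the API of any real Redis client. */
    interface KeyValueStore {
        boolean setIfAbsent(String key, String value);

        void delete(String key);
    }

    /** In-memory stand-in used only to make the sketch runnable; it has no expiry. */
    static final class InMemoryStore implements KeyValueStore {
        private final ConcurrentMap<String, String> data = new ConcurrentHashMap<>();

        public boolean setIfAbsent(String key, String value) {
            return data.putIfAbsent(key, value) == null;
        }

        public void delete(String key) {
            data.remove(key);
        }
    }

    private final KeyValueStore store;

    public RedisStyleDeduplicator(KeyValueStore store) {
        this.store = store;
    }

    /** Returns true if the handler ran, false if the message key was already claimed. */
    public boolean consume(String messageKey, Runnable handler) {
        String key = "mq:dedup:" + messageKey;
        if (!store.setIfAbsent(key, "1")) {
            return false; // duplicate delivery: skip the business logic
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            store.delete(key); // release the claim so a redelivery can retry
            throw e;
        }
    }
}
```

Unlike the message table, the claim and the business write are not covered by one transaction: if the process dies between claiming the key and finishing the handler, the key stays set, which is one reason an expiry is normally attached to it.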

Finally

When an exception occurs, the consumer can be asked to retry several times. If the exception persists after those retries, data compensation is required: the message is put into the dead-letter queue and eventually written to the database, and a scheduled job then queries that data so the failures can be compensated manually.
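The retry-then-compensate flow can be modelled in a few lines. This is a simplified in-process sketch, not how any particular broker implements it: the class `RetryThenDeadLetter` is hypothetical and a plain list stands in for the dead-letter queue. In RocketMQ itself, redelivery and the dead-letter queue are handled by the broker when the listener asks for the message to be reconsumed later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryThenDeadLetter<T> {
    private final int maxAttempts;
    // Stand-in for a dead-letter queue or table that a scheduled job would scan.
    private final List<T> deadLetters = new ArrayList<>();

    public RetryThenDeadLetter(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the handler eventually succeeded, false if the message was dead-lettered. */
    public boolean consume(T message, Consumer<T> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Swallow and try again; a real consumer would log and back off here.
            }
        }
        deadLetters.add(message); // retries exhausted: park the message for compensation
        return false;
    }

    public List<T> deadLetters() {
        return deadLetters;
    }
}
```

Since the handler may run several times for one message, it still has to be idempotent; retries and deduplication complement each other rather than replace one another.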

References

  • github.com/apache/rock…

  • www.baiyp.ren/%E4%B8%9A%E…

  • mp.weixin.qq.com/s/Ojdh0-POj…

  • help.aliyun.com/document_d…