This article describes how to design a stable, high-concurrency IM system that preserves message order, and how to simplify the system architecture by using an advanced feature of the storage tier.
When building social IM and moments (feed) applications, a basic requirement is to deliver the messages and updates that users send to their friends promptly and accurately. To do this, each message or update usually needs a sequence number or ID, so that the recipient processes all messages completely and in the correct order. When total message concurrency is very high, NoSQL storage products are usually chosen to store the messages, but common NoSQL products do not provide an auto-increment column. External components are therefore often needed to generate increasing sequence numbers and IDs, which makes the overall architecture more complex and adds latency to the entire link.
Feature introduction
Table Store has newly launched the primary key column auto-increment feature, which effectively addresses the requirements of the scenario described above. When creating a table, you declare one primary key column as an auto-increment column. When writing a new row, the application does not fill in a real value for that column; it only fills in a placeholder. After receiving the data, Table Store automatically generates a value for the column and guarantees that, within the same partition key, each generated value is larger than the values generated before it.
The primary key column auto-increment feature has the following characteristics:
- Table Store's system architecture and its implementation of the auto-increment primary key column ensure that the generated values are unique and strictly increasing.
- Multiple primary key columns are currently supported. The first primary key column is the partition key. To keep data evenly distributed, the partition key cannot be set as an auto-increment column.
- Because the partition key cannot be an auto-increment column, values increase at the partition key level, that is, within one partition key.
- Any primary key column other than the partition key can be set as an auto-increment column.
- Each table can currently have only one auto-increment primary key column.
- Attribute columns cannot be set as auto-increment columns.
- The automatically generated value is a 64-bit signed long integer.
- The feature is table-level: the same instance can contain tables with an auto-increment column and tables without one.
- An auto-increment column can be set only when creating a table. An existing table cannot be changed to use one.
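The rules in this list can be written down as a small validation routine. The following is a minimal sketch in plain Java, not part of the Table Store SDK: `PkColumn`, `AutoIncrementRules`, and `validate` are hypothetical names introduced only for illustration, and the INTEGER requirement is taken from the table creation step later in this article.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one primary key column; this is not an SDK type.
final class PkColumn {
    final String name;
    final String type;           // for example "STRING" or "INTEGER"
    final boolean autoIncrement;

    PkColumn(String name, String type, boolean autoIncrement) {
        this.name = name;
        this.type = type;
        this.autoIncrement = autoIncrement;
    }
}

final class AutoIncrementRules {
    // Returns null when the primary key schema obeys the rules listed above,
    // otherwise a short description of the first violated rule.
    static String validate(List<PkColumn> primaryKey) {
        int autoIncrementColumns = 0;
        for (int i = 0; i < primaryKey.size(); i++) {
            PkColumn column = primaryKey.get(i);
            if (!column.autoIncrement) {
                continue;
            }
            if (i == 0) {
                return "the partition key cannot be an auto-increment column";
            }
            if (!"INTEGER".equals(column.type)) {
                return "an auto-increment column must hold a 64-bit integer";
            }
            autoIncrementColumns++;
        }
        if (autoIncrementColumns > 1) {
            return "only one auto-increment column is allowed per table";
        }
        return null;
    }

    public static void main(String[] args) {
        List<PkColumn> schema = Arrays.asList(
                new PkColumn("partition_key", "STRING", false),
                new PkColumn("receive_id", "STRING", false),
                new PkColumn("message_id", "INTEGER", true));
        String problem = validate(schema);
        System.out.println(problem == null ? "schema is valid" : problem);
    }
}
```

The check runs entirely in memory; the real service enforces these constraints itself when the table is created.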
The following sections use an example to show how to use the primary key column auto-increment feature of Table Store.
Scenario
Let's continue with the example from the beginning of this article and build an IM chat tool to demonstrate what the primary key column auto-increment feature does and how to use it.
Features
Our IM chat software needs to support the following features:
- One-to-one chat between users
- Group chat
- Message synchronization across multiple terminals of the same user
The existing architecture
Step 1: Determine the message model
- The figure above shows the message model.
- After the sender sends a message, the client pushes it to the backend system.
- The backend system stores the message first.
- After the message is stored successfully, it is pushed to the receiver's client.
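The store-first, push-second ordering in this message model can be sketched as follows. This is a minimal in-memory illustration under stated assumptions: `StoreThenPush`, its two lists, and the `storageDown` switch are hypothetical stand-ins for the storage layer and the push channel, not real components of the system.

```java
import java.util.ArrayList;
import java.util.List;

final class StoreThenPush {
    // Stand-in for the storage layer: messages that were persisted.
    final List<String> stored = new ArrayList<>();
    // Stand-in for the push channel: messages delivered to the receiver's client.
    final List<String> pushed = new ArrayList<>();
    // Set to true to simulate a storage failure.
    boolean storageDown = false;

    // Persists the message; returns false when the simulated storage is down.
    boolean store(String message) {
        if (storageDown) {
            return false;
        }
        stored.add(message);
        return true;
    }

    // Handles one incoming message: persist first, push only after storage succeeded.
    boolean handle(String message) {
        if (!store(message)) {
            return false; // nothing is pushed, so the receiver never sees an unsaved message
        }
        pushed.add(message);
        return true;
    }

    public static void main(String[] args) {
        StoreThenPush backend = new StoreThenPush();
        System.out.println("delivered: " + backend.handle("hello"));
        backend.storageDown = true;
        System.out.println("delivered: " + backend.handle("this one is not pushed"));
    }
}
```

Because the push happens only after a successful write, every message a client has seen can be read back from storage later, which is what multi-terminal synchronization relies on.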
Step 2: Determine the backend architecture
- The backend architecture has two main parts: the logic layer and the storage layer.
- The logic layer includes the application servers, the queue service, and the auto-increment ID generator. It is the core of the whole backend architecture and handles core business logic such as receiving, pushing, and notifying messages, and writing and replicating group messages.
- The storage layer persists message data and any other data that needs to be persisted.
- For one-to-one chat, the sender sends a message to an application server. The application server saves the message to the table, using the receiver as the primary key, and at the same time notifies the message push service on the application server that there is a new message. The push service takes the ID of the last message it pushed to the receiver as the starting primary key, reads all newer messages from the storage system, and then pushes them to the receiver.
- For group chat, the logic is more complicated: messages need to be diffused through asynchronous queues (write diffusion). That is, a message sent to a group is saved once for every member of the group.
- The figure above shows the group message sending process, with the storage layer omitted.
There are two main reasons for using write diffusion rather than read diffusion:
- Groups generally do not have many members, so the storage cost is not high, and with compression the cost is lower still.
- Once a message has been diffused to everyone's storage (inbox), pushing to a receiver only requires checking that receiver's own inbox. The processing logic of group chat is then the same as that of one-to-one chat, which is easy to implement.
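Write diffusion can be illustrated with a few lines of Java. This is a minimal in-memory sketch, not the article's implementation: `WriteDiffusion`, `deliver`, `sendToGroup`, and `inbox` are hypothetical names, and a `HashMap` of lists stands in for the per-receiver storage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WriteDiffusion {
    // receiver ID -> that receiver's inbox, in arrival order
    private final Map<String, List<String>> inboxes = new HashMap<>();

    // One-to-one chat: a single write into the receiver's inbox.
    void deliver(String receiverId, String message) {
        inboxes.computeIfAbsent(receiverId, k -> new ArrayList<>()).add(message);
    }

    // Group chat: the message is saved once for every member of the group.
    void sendToGroup(List<String> members, String message) {
        for (String member : members) {
            deliver(member, message);
        }
    }

    // Reading is identical for one-to-one and group messages: look at one inbox.
    List<String> inbox(String receiverId) {
        return inboxes.getOrDefault(receiverId, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        WriteDiffusion chat = new WriteDiffusion();
        chat.sendToGroup(Arrays.asList("alice", "bob", "carol"), "hi all");
        chat.deliver("bob", "hi bob");
        System.out.println("bob's inbox: " + chat.inbox("bob"));
    }
}
```

The write path pays one copy per group member, while the read path stays a single-inbox lookup, which is the trade-off the two reasons above describe.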
After the sender sends a message, the client pushes it to an application server. Based on the receiver's ID, the application server dispatches the message to one of the message queues, so that messages for the same receiver are in the same queue. Each queue processes its messages in order: it first obtains a new message ID from the auto-increment ID generator and then writes the message to Table Store. The next message is written only after the current write succeeds.
Messages for the same receiver are kept in the same queue as far as possible. One queue may hold messages for multiple receivers.
In a group chat, two users may send messages at the same moment. The two messages may reach different application servers, but the application servers send messages for the same receiver to the same queue service, so for a given receiver both messages end up in the same queue, as shown in the figure below.
The data in each queue is processed serially, and each write is assigned a new ID that is larger than all earlier IDs. To guarantee that messages are strictly increasing even when a write fails, a user-level lock is held while data is written to the storage system. Until the write succeeds, other messages for the same user cannot be written; otherwise, if the current message failed to be written, the order would be broken. When the write succeeds, the lock is released and the next message proceeds.
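The dispatch rule described above, where every message for one receiver lands in the same queue, can be sketched like this. The article does not say how a queue is chosen from the receiver's ID, so hashing the ID modulo the queue count is an assumption made for this illustration, and `ReceiverQueues` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ReceiverQueues {
    private final List<Queue<String>> queues = new ArrayList<>();

    ReceiverQueues(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<String>());
        }
    }

    // Assumption: the queue is picked by hashing the receiver ID.
    // floorMod keeps the index non-negative even for negative hash codes.
    int queueFor(String receiverId) {
        return Math.floorMod(receiverId.hashCode(), queues.size());
    }

    // Any application server can call this; the same receiver always maps to the same queue.
    void enqueue(String receiverId, String message) {
        queues.get(queueFor(receiverId)).add(receiverId + ":" + message);
    }

    Queue<String> queue(int index) {
        return queues.get(index);
    }

    public static void main(String[] args) {
        ReceiverQueues queues = new ReceiverQueues(4);
        queues.enqueue("bob", "sent via server A");
        queues.enqueue("bob", "sent via server B");
        System.out.println(queues.queue(queues.queueFor("bob")));
    }
}
```

Since each queue is drained serially, two messages for the same receiver are written one after the other, but a busy receiver also delays every other receiver that hashes to the same queue.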
If a queue goes down in the previous step, its messages still have to be processed. The original messages then enter a new queue, and the new queue must assign each of them a new message ID that is larger than every existing message ID. The new queue, however, does not know what the previous maximum ID was, so the queues cannot generate IDs themselves. A global auto-increment ID generator is needed instead.
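A global generator of this kind can be modeled with a single shared counter. The sketch below is an in-process stand-in built on `AtomicLong`; in the architecture described here the generator would be a separate shared service, and the class name `GlobalIdGenerator` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class GlobalIdGenerator {
    // The last ID handed out, shared by every queue.
    private final AtomicLong last = new AtomicLong(0);

    // Returns an ID larger than every ID returned before, no matter which queue asks.
    long nextId() {
        return last.incrementAndGet();
    }

    public static void main(String[] args) {
        GlobalIdGenerator ids = new GlobalIdGenerator();
        long fromQueueA = ids.nextId();
        // Queue A fails here; queue B takes over the same receiver's messages.
        long fromQueueB = ids.nextId();
        System.out.println(fromQueueA < fromQueueB);
    }
}
```

Because the counter lives outside the queues, a replacement queue does not need to know the previous maximum ID; the cost is an extra component and an extra round trip on every write.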
To support multiple terminals, the application server holds a session for each terminal, and each session holds the ID of the latest message that terminal has synchronized. When notified of a new message, the server reads from the storage system all messages after that ID. This guarantees that when several terminals are online at the same time, each one synchronizes messages independently without affecting the others, as shown in the figure below.
If some of a user's terminals go from online to offline, the application server saves the sessions of those terminals to another table in the storage system. When a terminal comes back online after a while, its earlier session can be restored from the storage system, and the messages that terminal has not yet read continue to be pushed.
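The per-terminal session can be pictured as a cursor over the receiver's inbox. The following is a minimal sketch under stated assumptions: `TerminalSession` and `sync` are hypothetical names, and a `TreeMap` keyed by message ID stands in for the stored messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class TerminalSession {
    // ID of the last message this terminal has already received.
    long lastSyncedId;

    TerminalSession(long lastSyncedId) {
        this.lastSyncedId = lastSyncedId;
    }

    // Reads every message newer than the cursor and advances the cursor.
    List<String> sync(NavigableMap<Long, String> inbox) {
        List<String> fresh = new ArrayList<>(inbox.tailMap(lastSyncedId, false).values());
        if (!fresh.isEmpty()) {
            lastSyncedId = inbox.lastKey();
        }
        return fresh;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> inbox = new TreeMap<>();
        inbox.put(1L, "a");
        inbox.put(2L, "b");
        inbox.put(3L, "c");

        TerminalSession phone = new TerminalSession(0);  // has seen nothing yet
        TerminalSession laptop = new TerminalSession(2); // session restored after being offline
        System.out.println("phone gets " + phone.sync(inbox));
        System.out.println("laptop gets " + laptop.sync(inbox));
    }
}
```

Saving `lastSyncedId` when a terminal goes offline and constructing a new session from it later corresponds to the session restore described above. The cursor only works if message IDs for one receiver never decrease, which is why strict ordering matters.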
Step 3: Determine the storage system
For the storage system we chose Alibaba Cloud Table Store, mainly for the following reasons:
- Writes support both single-row writes and batch writes of multiple rows, which meets the requirements of highly concurrent data writing.
- Range reads are supported, so large numbers of messages can be read page by page.
- Data lifecycle management is supported: expired data is cleaned up automatically, which saves storage costs.
- Table Store is a commercialized Alibaba Cloud service, and it is stable and reliable.
- Table Store is inexpensive, and users with large amounts of data can also buy plans at a better price.
- Read/write performance is excellent. For chat messages, latency is generally at the millisecond or even microsecond level.
Step 4: Determine the table structure
The table structure in Table Store is as follows:
Primary key order | Primary key name | Primary key value | Description |
---|---|---|---|
1 | partition_key | First four characters of md5(receive_id) | Partition key, ensures even data distribution |
2 | receive_id | receive_id | User ID of the receiver |
3 | message_id | message_id | Message ID |
- The table structure has two parts: the primary key columns and the attribute columns. At most four primary key columns are supported, and the first primary key column is the partition key.
- The structure of the primary key columns must be determined before use and cannot be modified afterwards. The attribute columns are schema-free and can be defined freely, and the attribute columns of each row can differ, so only the primary key columns need to be designed.
- The first primary key column is the partition key, whose purpose is to distribute data and requests evenly and avoid hot spots. Since messages are read by receiver, the receiver ID could serve as the partition key. For a better balance, part of the MD5 value of the receiver ID, such as the first four characters, can be used instead, so that data is distributed evenly.
- The first primary key column contains only part of the hash of the receiver ID. To locate a receiver's messages, the complete receiver ID must also be saved, so the receiver ID is used as the second primary key column.
- The third primary key column can be the message ID. It must increase monotonically, because the latest messages need to be queried.
- Attribute columns can store the message content, metadata, and so on.
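Computing the partition key from the receiver ID might look like the following. This is a minimal sketch using only the JDK: the class name `PartitionKeys` is hypothetical, and the choices of UTF-8 input encoding and lowercase hexadecimal output are assumptions, since the article only says to take the first four characters of the MD5 value.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class PartitionKeys {
    // Returns the first four hex characters of MD5(receiveId).
    static String partitionKey(String receiveId) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(receiveId.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            // Two bytes of the digest give four hexadecimal characters.
            for (int i = 0; i < 2; i++) {
                hex.append(String.format("%02x", digest[i] & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionKey("abc")); // prints 9001
    }
}
```

Four hex characters give 65,536 possible prefixes, so receivers spread across many partition key values while all rows of one receiver still share the same prefix.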
At this point we have designed a complete chat system. The system is operational, handles high concurrency, and performs reasonably well, but some challenges remain.
Challenges
- Each queue serves multiple users and executes serially, holding a lock during processing to keep messages strictly increasing. This carries a risk: if a very large number of messages are sent to one user, that user's messages pile up in the queue and can block the messages of other users in the same queue, delaying them.
- During major events or special holidays, when chat volume is large, the queue layer needs to be scaled out. Otherwise it may not withstand the pressure, and the latency of the whole system increases or the system crashes.
Of these two problems, problem 2 can be solved by adding machines. For problem 1, adding machines only alleviates the problem and cannot solve it completely. Is there a way to solve both problems?
The new architecture
The complexity behind both problems comes mainly from the need for strictly increasing message IDs. If the primary key column auto-increment feature of Table Store is used, the application layer becomes much simpler.
The new architecture, which uses the Table Store primary key column auto-increment feature, is as follows:
- The most obvious difference is that the architecture is simpler: the queue service and the auto-increment ID generator are gone.
- After receiving a message, the application server writes it directly to Table Store. For message_id, it fills in a specific placeholder instead of a definite value. The value is generated automatically inside Table Store.
- In the new architecture, the increment is handled inside Table Store. Even if multiple application servers write data for the same receiver at the same time, Table Store ensures that the messages are processed in sequence and that each message gets its own, strictly increasing message ID. The earlier queue service is no longer needed, which completely solves problem 1 above.
- Table Store is a cloud service, so users do not need to plan the capacity of the system, and it supports pay-as-you-go billing, which completely solves problem 2 above.
- Previously, messages for the same user could be processed by only one queue. Now they can be processed in parallel, so a sudden increase in the message volume of some users no longer immediately blocks other users; the pressure is spread evenly instead.
- With the primary key column auto-increment feature, the application server writes data directly to Table Store without going through a queue or fetching a message ID, so performance is better.
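The behavior the new architecture relies on, several writers and no queue with IDs still unique and increasing per receiver, can be modeled in memory. This is only a model built on stated assumptions, not how Table Store is implemented: `AutoIncrementInbox`, `put`, `ids`, and `runWriters` are hypothetical names, and the model happens to produce consecutive values, whereas the text above only promises unique and strictly increasing ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

final class AutoIncrementInbox {
    // receiver ID -> last generated message ID for that receiver
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    // receiver ID -> rows ordered by generated message ID
    private final Map<String, ConcurrentSkipListMap<Long, String>> rows = new ConcurrentHashMap<>();

    // The caller supplies no message ID; the store generates it and returns it.
    long put(String receiverId, String content) {
        long id = counters.computeIfAbsent(receiverId, k -> new AtomicLong()).incrementAndGet();
        rows.computeIfAbsent(receiverId, k -> new ConcurrentSkipListMap<Long, String>()).put(id, content);
        return id;
    }

    // Message IDs stored for one receiver, in ascending order.
    List<Long> ids(String receiverId) {
        ConcurrentSkipListMap<Long, String> inbox = rows.get(receiverId);
        return inbox == null ? new ArrayList<Long>() : new ArrayList<Long>(inbox.keySet());
    }

    // Simulates several application servers writing to the same receiver at once.
    static AutoIncrementInbox runWriters(int writers, int messagesPerWriter) {
        AutoIncrementInbox store = new AutoIncrementInbox();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            final int server = w;
            Thread t = new Thread(() -> {
                for (int i = 0; i < messagesPerWriter; i++) {
                    store.put("bob", "server" + server + "-msg" + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for writers", e);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        AutoIncrementInbox store = runWriters(4, 250);
        System.out.println("rows written for bob: " + store.ids("bob").size());
    }
}
```

No writer coordinates with any other writer here; ordering is decided at the single point where the ID is generated, which is the role the storage system takes over from the queue service and the ID generator.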
Implementation
With the architecture above in place, implementation can begin. This example uses the Java SDK; version 4.2.0 already supports the primary key column auto-increment feature. Refer to the documentation and download page of Java SDK 4.2.0.
Step 1: Create the table
According to the earlier design, the table structure is as follows:
Primary key order | Primary key name | Primary key value | Description |
---|---|---|---|
1 | partition_key | First four characters of hash(receive_id) | Partition key, ensures even data distribution. MD5 can be used as the hash function |
2 | receive_id | receive_id | User ID of the receiver |
3 | message_id | message_id | Message ID |
The third primary key column is message_id, which is the auto-increment column. When creating the table, set the option of message_id to AUTO_INCREMENT and its type to INTEGER.

    // Packages of the Table Store Java SDK 4.x.
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.*;

    private static void createTable(SyncClient client) {
        TableMeta tableMeta = new TableMeta("message_table");

        // First primary key column: the partition key.
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING));

        // Second primary key column: the receiver ID.
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("receive_id", PrimaryKeyType.STRING));

        // Third primary key column: the message ID. It is the auto-increment column,
        // so its type is INTEGER and its option is AUTO_INCREMENT.
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("message_id", PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT));

        int timeToLive = -1;  // -1 means the data never expires
        int maxVersions = 1;  // keep a single version of each column

        TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
        CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
        client.createTable(request);
    }

This creates a table whose third primary key column is auto-incremented.
Step 2: Write the data
Data can be written with PutRow or BatchWriteRow, both of which support the primary key column auto-increment feature. When writing data, the third column, message_id, is the auto-increment column, so only a placeholder needs to be filled in.

    // content is passed in as a parameter because the original snippet used it without defining it.
    private static void putRow(SyncClient client, String receive_id, String content) {
        // Build the primary key.
        PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();

        // First column: the first four characters of hash(receive_id).
        // hash() is a helper that returns a hex digest string (for example MD5); it is not shown here.
        primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(0, 4)));

        // Second column: the receiver ID.
        primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));

        // Third column: the message ID, the auto-increment column. Table Store generates the value,
        // so only the AUTO_INCREMENT placeholder is filled in here, not a real value.
        primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.AUTO_INCREMENT);
        PrimaryKey primaryKey = primaryKeyBuilder.build();

        RowPutChange rowPutChange = new RowPutChange("message_table", primaryKey);

        // Set the return type to RT_PK so that the response contains the primary key values.
        // If ReturnType is not set, nothing is returned by default.
        rowPutChange.setReturnType(ReturnType.RT_PK);
        rowPutChange.addColumn(new Column("content", ColumnValue.fromString(content)));

        // Write the row to Table Store.
        PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange));

        // Print the returned primary key, including the generated message_id.
        Row returnRow = response.getRow();
        if (returnRow != null) {
            System.out.println("PrimaryKey:" + returnRow.getPrimaryKey().toString());
        }

        // Print the consumed capacity units.
        CapacityUnit cu = response.getConsumedCapacity().getCapacityUnit();
        System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit());
        System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit());
    }

Step 3: Read the data
To read messages, use the GetRange interface to read the latest messages. In the start primary key, the message_id column is set to the message_id of the previous message plus 1; in the end primary key, it is set to INF_MAX. In this way the latest messages are read each time and sent to the client.

    // lastMessageId is a long so that lastMessageId + 1 can be passed to fromLong().
    private static void getRange(SyncClient client, String receive_id, long lastMessageId) {
        RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("message_table");

        // Set the start primary key.
        PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
        // First column: the first four characters of hash(receive_id).
        primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(0, 4)));
        // Second column: the receiver ID.
        primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));
        // Third column: the message ID, starting right after the previous message.
        primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.fromLong(lastMessageId + 1));
        rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

        // Set the end primary key.
        primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
        // First column: the first four characters of hash(receive_id).
        primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(0, 4)));
        // Second column: the receiver ID.
        primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));
        // Third column: the message ID, with no upper bound.
        primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.INF_MAX);
        rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

        rangeRowQueryCriteria.setMaxVersions(1);

        System.out.println("The results of GetRange are:");
        while (true) {
            GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
            for (Row row : getRangeResponse.getRows()) {
                System.out.println(row);
            }
            // If nextStartPrimaryKey is not null, continue reading from it.
            if (getRangeResponse.getNextStartPrimaryKey() != null) {
                rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
            } else {
                break;
            }
        }
    }

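The continuation pattern in this read loop, where each response either returns a key to resume from or signals that the range is finished, can be tried without a Table Store instance. The sketch below is a plain-Java model under stated assumptions: `PagedRangeRead`, `Page`, `readPage`, and `readAllAfter` are hypothetical names, a `TreeMap` stands in for one receiver's rows, and `limit` stands in for whatever bounds the size of a single response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PagedRangeRead {
    // One page of rows plus the key to resume from (null when the range is exhausted).
    static final class Page {
        final List<String> rows = new ArrayList<>();
        Long nextStart;
    }

    // Stand-in for one range read: returns at most `limit` rows with ID >= start.
    static Page readPage(NavigableMap<Long, String> inbox, long start, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        Page page = new Page();
        for (Map.Entry<Long, String> entry : inbox.tailMap(start, true).entrySet()) {
            if (page.rows.size() == limit) {
                page.nextStart = entry.getKey(); // more rows remain; resume here
                break;
            }
            page.rows.add(entry.getValue());
        }
        return page;
    }

    // Reads everything after lastMessageId by following nextStart until it is null.
    static List<String> readAllAfter(NavigableMap<Long, String> inbox, long lastMessageId, int limit) {
        List<String> all = new ArrayList<>();
        Long start = lastMessageId + 1;
        while (start != null) {
            Page page = readPage(inbox, start, limit);
            all.addAll(page.rows);
            start = page.nextStart;
        }
        return all;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> inbox = new TreeMap<>();
        for (long id = 1; id <= 5; id++) {
            inbox.put(id, "m" + id);
        }
        System.out.println(readAllAfter(inbox, 2, 2));
    }
}
```

Starting at `lastMessageId + 1` with an inclusive start key reads exactly the messages the client has not seen, provided IDs for one receiver only ever increase.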
The above demonstrates how Table Store and its primary key column auto-increment feature can be applied in a chat system. The feature is also valuable in other scenarios, which we look forward to exploring together.