Abstract:

Background

In today's database ecosystem, most systems support some form of data synchronization between node instances, such as MySQL master/slave replication and Redis AOF-based master/slave synchronization. MongoDB goes further, supporting replica sets of three or more nodes, which provides good built-in support for this mechanism.

Data synchronization across logical units (LUs), and even across cells and data centers, is often critical at the business layer. It makes load balancing, mutual backup between equipment rooms in the same city, and even disaster recovery and active-active deployment across remote data centers possible. Because the master/slave synchronization built into MongoDB replica sets is too limited for such scenarios, we developed MongoShake, which can be applied to replication between instances, between equipment rooms, and across data centers to meet disaster recovery and active-active requirements.

In addition, data backup is the core capability of MongoShake, but not the only one. As a platform service, users can integrate with MongoShake to subscribe to and consume data, covering a variety of business scenarios.

Introduction

MongoShake is a general-purpose platform service written in Golang. It replicates MongoDB data by reading the oplog of a MongoDB cluster and then fulfills specific requirements by replaying or forwarding those operation logs. Since operation logs enable many scenario-based applications, we designed MongoShake as a universal platform service. On top of the oplog it provides a pub/sub model for subscribing to and consuming log data, which can be flexibly connected to different scenarios (such as log subscription, data center synchronization, and asynchronous cache invalidation) through an SDK, Kafka, MetaQ, and so on. Cluster data synchronization is the core application scenario: the oplog is fetched and replayed on the target to achieve synchronization, enabling disaster recovery and active-active deployments.

Example Application Scenarios

1. Data is asynchronously replicated between MongoDB clusters to avoid double-write.

2. Mirror backup of data between MongoDB clusters (support in the current 1.0 open-source version is limited)

3. Offline log analysis

4. Log subscription

5. Data routing. Based on business requirements, combine log subscription with the filtering mechanism to deliver only the desired data to downstream consumers.

6. Cache synchronization. Log analysis reveals which cache entries can be invalidated and which can be preloaded, driving cache updates in reverse.

7. Log-based cluster monitoring

Features

MongoShake fetches oplog data from the source database and sends it through different tunnel channels. The currently available tunnel types are:

1. Direct: writes directly to the destination MongoDB

2. RPC: sends data over a net/rpc connection

3. TCP: sends data over a raw TCP connection

4. File: writes data to a file

5. Kafka: publishes data to a Kafka message queue

6. Mock: used for testing; writes nothing and discards all data

Consumers can obtain the desired data from a tunnel channel: for example, data can be written to the destination MongoDB through the Direct channel, or transmitted to a consumer through the RPC channel. In addition, users can implement their own access points for flexible integration. The following two diagrams illustrate the basic architecture and data flow.
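As a rough illustration of what such an access point could look like, here is a minimal Go sketch of a tunnel writer interface. The names (Oplog, TunnelWriter, MockWriter) are hypothetical and do not reproduce MongoShake's real API.

package main

import "fmt"

// Oplog is a simplified stand-in for a MongoDB oplog entry
// (MongoShake's real types differ; this is illustrative only).
type Oplog struct {
	Namespace string // e.g. "db.collection"
	Op        string // "i", "u", "d", ...
	Raw       []byte // serialized document
}

// TunnelWriter is a hypothetical interface a custom tunnel could satisfy.
type TunnelWriter interface {
	Send(batch []Oplog) error
}

// MockWriter mirrors the "mock" tunnel: it accepts everything and drops it.
type MockWriter struct{}

func (MockWriter) Send(batch []Oplog) error {
	fmt.Printf("discarding %d oplog entries\n", len(batch))
	return nil
}

func main() {
	var w TunnelWriter = MockWriter{}
	_ = w.Send([]Oplog{{Namespace: "test.users", Op: "i"}})
}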

The source database connected to MongoShake can run in three modes: single mongod, replica set, and sharded cluster. The destination can be a mongod or a mongos. If the source is a replica set, we suggest connecting to a secondary to reduce the load on the primary. If the source is a sharded cluster, each shard connects to MongoShake and is fetched from in parallel. On the destination side, multiple mongos instances can be connected, and the data is hashed and written across them.

  • Parallel replication

MongoShake provides parallel replication. The shard_key option can be id, collection, or auto: different documents or collections are hashed into different queues and executed concurrently. id hashes by document, collection hashes by collection (table), and auto chooses automatically: if a collection has a unique index, it degrades to collection; otherwise it behaves like id.
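To make the hashing concrete, here is a minimal Go sketch of dispatching an oplog entry to one of several worker queues by document id or by collection. The function and field names are illustrative assumptions, not MongoShake's internal code.

package main

import (
	"fmt"
	"hash/fnv"
)

// pickQueue hashes an oplog entry to one of n worker queues.
// shardKey is "id" or "collection"; the layout is a simplified assumption.
func pickQueue(shardKey, namespace, docID string, n int) int {
	h := fnv.New32a()
	switch shardKey {
	case "collection":
		h.Write([]byte(namespace)) // all ops on one collection stay ordered
	default: // "id"
		h.Write([]byte(docID)) // all ops on one document stay ordered
	}
	return int(h.Sum32()) % n
}

func main() {
	fmt.Println(pickQueue("id", "benchmark.sbtest10", "_id:1", 8))
	fmt.Println(pickQueue("collection", "benchmark.sbtest10", "_id:1", 8))
}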

  • HA (high availability)

MongoShake periodically persists its synchronization context, either to a third-party API (a registry) or to the source database. The context currently consists of the oplog timestamp that has been successfully synchronized. When the service is switched over or restarted, the new instance reads this context from the API or database and resumes from where it left off.

In addition, MongoShake provides a Hypervisor mechanism to pull services back up if they fail.

  • Filtering

A whitelist and blacklist mechanism allows databases and collections to be synchronized selectively.
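A minimal Go sketch of such namespace filtering, assuming simple prefix-based white/black lists (MongoShake's exact matching rules may differ):

package main

import (
	"fmt"
	"strings"
)

// allowNamespace decides whether an oplog entry for namespace ns
// ("db.collection") should be synchronized, given optional whitelist and
// blacklist prefixes. Illustrative only.
func allowNamespace(ns string, white, black []string) bool {
	for _, b := range black {
		if strings.HasPrefix(ns, b) {
			return false
		}
	}
	if len(white) == 0 {
		return true
	}
	for _, w := range white {
		if strings.HasPrefix(ns, w) {
			return true
		}
	}
	return false
}

func main() {
	white := []string{"app_db."}
	black := []string{"app_db.tmp"}
	fmt.Println(allowNamespace("app_db.orders", white, black))    // true
	fmt.Println(allowNamespace("app_db.tmp_cache", white, black)) // false
	fmt.Println(allowNamespace("other_db.users", white, black))   // false
}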

  • Compression

The oplog can be compressed before sending; the currently supported formats are gzip, zlib, and deflate.
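These three formats map directly onto Go's standard library. The sketch below is illustrative and trims error handling:

package main

import (
	"bytes"
	"compress/flate"
	"compress/gzip"
	"compress/zlib"
	"fmt"
	"io"
)

// compress shows the three formats mentioned above using Go's standard library.
func compress(format string, data []byte) []byte {
	var buf bytes.Buffer
	var w io.WriteCloser
	switch format {
	case "gzip":
		w = gzip.NewWriter(&buf)
	case "zlib":
		w = zlib.NewWriter(&buf)
	default: // "deflate"
		w, _ = flate.NewWriter(&buf, flate.DefaultCompression)
	}
	w.Write(data)
	w.Close()
	return buf.Bytes()
}

func main() {
	payload := bytes.Repeat([]byte("oplog entry "), 100)
	for _, f := range []string{"gzip", "zlib", "deflate"} {
		fmt.Printf("%-7s %d -> %d bytes\n", f, len(payload), len(compress(f, payload)))
	}
}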

  • GID

Data in a database may come from different sources: data generated locally and data replicated from elsewhere. Without proper measures, data can be replicated in a ring: for example, data copied from A to B may be copied back from B to A, causing a traffic storm that can bring the service down, or the write-back from B to A may fail because of unique key constraints, leaving the service unstable.

In the MongoDB edition on Alibaba Cloud, we provide a feature to prevent ring replication. The core idea is a modified MongoDB kernel: a gid is added to each oplog entry to identify the database where it originated, and the gid is carried by the op_command during replication, so every piece of data carries its source information. If only the data generated by the current database is needed, MongoShake fetches only the oplog entries whose gid equals that database's id. So in the ring replication scenario above, MongoShake fetches from database A only the data whose gid equals id_A, and from database B only the data whose gid equals id_B, which breaks the loop.

Note: Since the gid part of the MongoDB kernel has not been open-sourced yet, this function is limited in the open-source version but supported in the Alibaba Cloud MongoDB edition. This is why the "mirror backup of data between MongoDB clusters" scenario mentioned earlier is limited in the current open-source release.

  • Checkpoint

MongoShake uses an ACK mechanism to ensure the oplog is replayed successfully; if replay fails, retransmission is triggered. The retransmission process is similar to TCP's sliding window. It is designed mainly for application-layer reliability, for example when decompression fails. To better illustrate, let's define a few terms:

LSN (Log Sequence Number): the sequence number of the latest oplog entry that has been transmitted.

LSN_ACK (Acked Log Sequence Number): the largest LSN that has been acknowledged, that is, the latest LSN successfully written to the tunnel.

LSN_CKPT (Checkpoint Log Sequence Number): the LSN that has been checkpointed, that is, the persisted LSN.

The values of LSN, LSN_ACK, and LSN_CKPT all come from the ts timestamp field of the oplog, with the implicit constraint LSN_CKPT <= LSN_ACK <= LSN.

As shown in the figure above, LSN=16 means 16 oplog entries have been transmitted; if there is no retransmission, the next one transmitted will be LSN=17. LSN_ACK=13 means the first 13 entries have been acknowledged; if retransmission is needed, it starts from LSN=14 at the earliest. LSN_CKPT=8 means checkpoint 8 has been persisted: if MongoShake crashes at this point and restarts, it reads the source oplog starting from LSN_CKPT rather than from LSN=1. Because DML operations in the oplog are idempotent, transmitting the same data multiple times causes no problems; with DDL, however, retransmission can cause errors.
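The bookkeeping can be sketched in a few lines of Go. The type and field names below are assumptions for illustration, with plain integers standing in for oplog timestamps:

package main

import "fmt"

// lsnState is a minimal sketch of the three counters described above.
type lsnState struct {
	lsn     int64 // latest entry handed to the tunnel
	lsnAck  int64 // latest entry acknowledged by the receiver
	lsnCkpt int64 // latest entry persisted as a checkpoint
}

func (s *lsnState) transmit(n int64) { s.lsn = n }

func (s *lsnState) ack(n int64) {
	if n > s.lsnAck {
		s.lsnAck = n
	}
}

// checkpoint persists lsnAck; the invariant lsnCkpt <= lsnAck <= lsn always holds.
func (s *lsnState) checkpoint() { s.lsnCkpt = s.lsnAck }

// resumeFrom is where oplog reading restarts after a crash.
func (s *lsnState) resumeFrom() int64 { return s.lsnCkpt + 1 }

func main() {
	s := &lsnState{}
	s.transmit(16)
	s.ack(13)
	s.lsnCkpt = 8
	fmt.Println("retransmit from", s.lsnAck+1)          // 14
	fmt.Println("restart reading from", s.resumeFrom()) // 9
}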

  • Troubleshooting and rate limiting

MongoShake exposes a RESTful API that gives a real-time view of each internal queue's synchronization state, making it easy to troubleshoot problems. It also provides a rate-limiting function so users can throttle synchronization in real time and reduce pressure on the database.

  • Conflict detection

MongoShake currently supports collection-level and document-level (id) concurrency. Id-level concurrency requires that the database has no unique index constraints, while collection-level concurrency performs poorly when there are few collections or when data is very unevenly distributed across them. So for collection-level concurrency we need evenly distributed parallelism while still handling unique key conflicts inside a collection. For this purpose, when the tunnel type is direct, we provide pre-write conflict detection.

Currently, only unique indexes are supported. Other indexes such as prefix, sparse, and TTL indexes are not supported.

Conflict detection relies on two precondition constraints:

1. MongoShake assumes that the schemas of the source and destination MongoDB instances are consistent, and it does not watch the oplog for changes to the system.indexes collection.

2. The indexes used for conflict detection are the ones recorded in the oplog, not the ones currently defined in MongoDB.

In addition, index operations performed during synchronization may cause exceptions:

1. An index is being created. If the index is built in the background, it is invisible to write requests during that period but visible internally, which may lead to high memory usage. If it is built in the foreground, all user requests are blocked; if they are blocked for too long, retransmission is triggered.

2. If the destination database has a unique index that does not exist in the source database, data inconsistency may result; we do not handle this case.

3. If a unique index is added to or deleted from the source database after the corresponding oplog entries have been generated, retransmission may cause problems with that index; we do not handle this case either.

To support conflict detection, we modified the MongoDB kernel to include a uk field in the oplog that identifies the unique index information involved, for example:

{
    "ts" : Timestamp(1484805725, 2),
    "t" : NumberLong(3),
    "h" : NumberLong("6270930433887838315"),
    "v" : 2,
    "op" : "u",
    "ns" : "benchmark.sbtest10",
    "o" : { "_id" : 1, "uid" : 1111, "other.sid" : "22222", "mid" : 8907298448, "bid" : 123 },
    "o2" : { "_id" : 1 },
    "uk" : {
        "uid" : "1110",
        "mid^bid" : [8907298448, 123],
        "other.sid_1" : "22221"
    }
}

Each key under uk is the column name of a unique key; column names joined by ^ represent a compound index. The record above involves three unique indexes: uid, mid^bid (a compound index on mid and bid), and other.sid_1. The meaning of the value differs between inserts, deletes, and updates: for an insert, the value is empty; for a delete or update, it records the value before the operation.
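For illustration, here is a minimal Go sketch of pulling the uk information out of an oplog entry. The real oplog is BSON and MongoShake handles it with the mgo driver, so the JSON-based parsing below is only an approximation:

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// extractUniqueKeys pulls the unique-index information out of the uk field
// of a (JSON-encoded, simplified) oplog entry.
func extractUniqueKeys(oplog []byte) (map[string]interface{}, error) {
	var entry struct {
		UK map[string]interface{} `json:"uk"`
	}
	if err := json.Unmarshal(oplog, &entry); err != nil {
		return nil, err
	}
	return entry.UK, nil
}

func main() {
	raw := []byte(`{"op":"u","ns":"benchmark.sbtest10",
		"uk":{"uid":"1110","mid^bid":[8907298448,123],"other.sid_1":"22221"}}`)
	uks, _ := extractUniqueKeys(raw)
	for name, prev := range uks {
		cols := strings.Split(name, "^") // "^" joins the columns of a compound index
		fmt.Printf("unique index on %v, previous value %v\n", cols, prev)
	}
}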

The processing flow is as follows: pack k consecutive oplog entries into a batch, analyze the dependencies within each batch in a streaming fashion, and split the batch into segments. If there are conflicts, the batch is split into multiple segments according to dependency and ordering relationships; if there are none, it stays as a single segment. Writes then happen concurrently within a segment and sequentially between segments. Intra-segment concurrency means multiple threads write the segment's data at the same time, but operations on the same id within a segment must stay in order. Inter-segment ordering is guaranteed: a segment is written only after the previous segment has been completely executed.

If oplog entries with different ids in one batch operate on the same unique key, these entries are considered to have an ordering relationship, also called a dependency. Oplog entries with a dependency must be split into different segments.

MongoShake has two ways to deal with existing dependency relations:

(1) Inserting barriers

The batch is split by inserting barriers, with writes running concurrently within each segment. Consider the following example:

Here id is the document id; op is the operation (i for insert, u for update, d for delete); uk lists all unique keys in the document; and uk={a:3} => uk={a:1} means the value of unique key a changes from 3 to 1.

At the start, the batch contains 9 oplog entries, which are split by analyzing the uk relationships. For example, entries 3 and 4 operate on the same uk={a:3} with different ids, so a barrier must be inserted between them (whether the shared value is the one before or after the modification, it is treated as a conflict). The same applies to entries 5 and 6, and to 6 and 7. Operations with the same id and uk may stay in the same segment, so entries 2 and 3 can be placed together. After splitting, writes within a segment run concurrently by id, while operations on the same id remain ordered: for example, entries 1, 2, and 3 in the first segment can run concurrently, but entries 2 and 3 must execute sequentially.
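The barrier-splitting idea can be sketched as follows. The data structures and the exact conflict rule are simplifications, not MongoShake's actual implementation:

package main

import "fmt"

// op is a simplified oplog entry: a document id plus the unique-key
// values it touches (before and after the change).
type op struct {
	id  int
	uks []string // e.g. "a:3" means unique key a with value 3
}

// splitByBarrier cuts a batch into segments: a new segment starts whenever
// an entry touches a unique-key value already seen in the current segment
// under a different document id.
func splitByBarrier(batch []op) [][]op {
	var segs [][]op
	seen := map[string]int{} // uk value -> id that touched it in this segment
	var cur []op
	for _, o := range batch {
		conflict := false
		for _, uk := range o.uks {
			if id, ok := seen[uk]; ok && id != o.id {
				conflict = true
				break
			}
		}
		if conflict { // insert a barrier: close the current segment
			segs = append(segs, cur)
			cur = nil
			seen = map[string]int{}
		}
		cur = append(cur, o)
		for _, uk := range o.uks {
			seen[uk] = o.id
		}
	}
	if len(cur) > 0 {
		segs = append(segs, cur)
	}
	return segs
}

func main() {
	batch := []op{
		{1, []string{"a:1"}},
		{2, []string{"a:3"}},
		{2, []string{"a:3"}},        // same id, same uk: no barrier
		{3, []string{"a:3", "a:2"}}, // different id, same uk=a:3: barrier
		{4, []string{"b:7"}},
	}
	for i, seg := range splitByBarrier(batch) {
		fmt.Printf("segment %d: %v\n", i+1, seg)
	}
}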

(2) Splitting according to a dependency graph

Each oplog entry has a sequence number N in time order. For any entry N, there may exist an entry M such that:

  • If M and N operate on the same value of the same unique index and M's sequence number is smaller than N's, a directed edge from M to N is constructed.
  • If M and N have the same document id and M's sequence number is smaller than N's, a directed edge from M to N is also constructed.
  • Because the dependencies follow time order, the graph cannot contain a cycle.

The result is therefore a directed acyclic graph. Following a topological ordering, the nodes with in-degree 0 (no incoming edges) can be written concurrently at each step; a node with non-zero in-degree can be written only after its in-degree drops to 0, that is, after all of its predecessor nodes have finished executing.

The figure below shows an example with 10 oplog nodes: a horizontal line means the document ids are the same, and the arrows on the right indicate unique-key conflict dependencies. The graph is therefore executed in four rounds: first 1, 2, 4, 5; then 3, 6, 8; then 7, 10; and finally 9.
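This round-by-round execution is essentially Kahn's topological sort. The Go sketch below uses a hypothetical dependency graph that happens to produce the same four rounds; it is not taken from MongoShake's code:

package main

import "fmt"

// executeDAG runs Kahn's algorithm round by round: every round "writes" all
// nodes whose in-degree is currently 0, then releases their successors.
// Nodes are 1-based; edges[m] lists the nodes that depend on m.
func executeDAG(n int, edges map[int][]int) [][]int {
	indeg := make([]int, n+1)
	for _, succs := range edges {
		for _, s := range succs {
			indeg[s]++
		}
	}
	done := make([]bool, n+1)
	var rounds [][]int
	for remaining := n; remaining > 0; {
		var batch []int
		for v := 1; v <= n; v++ {
			if !done[v] && indeg[v] == 0 {
				batch = append(batch, v)
			}
		}
		for _, v := range batch {
			done[v] = true
			for _, s := range edges[v] {
				indeg[s]--
			}
			remaining--
		}
		rounds = append(rounds, batch)
	}
	return rounds
}

func main() {
	// A hypothetical dependency graph consistent with the rounds described above.
	edges := map[int][]int{1: {3}, 2: {3, 6}, 4: {8}, 5: {6}, 6: {7}, 7: {9}, 8: {10}, 10: {9}}
	for i, r := range executeDAG(10, edges) {
		fmt.Printf("round %d: %v\n", i+1, r)
	}
}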

Note: Because the uk modification for conflict detection has not been open-sourced in MongoDB yet, this function is limited in the open-source version but supported in the Alibaba Cloud MongoDB edition.

Architecture and data flow

MongoShake's internal architecture and data flow are shown above. Overall, MongoShake can be roughly divided into three parts: the Syncer, the Worker, and the Replayer. The Replayer is used only when the tunnel type is direct.

If the source is a mongod or a replica set, there is only one Syncer; if the source is a sharded cluster, there is one Syncer per shard. Inside a Syncer, the fetcher first uses the mgo.v2 driver to fetch data from the source and packs it in batches into the PendingQueue. Deserializer threads then take data from the PendingQueue and deserialize it into the LogsQueue. The Batcher reorganizes the data from the LogsQueue, aggregates entries destined for the same Worker, and hashes them into the corresponding Worker queue.
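As a rough sketch of this pipeline, the following Go program wires the stages together with channels; all stage and variable names are illustrative assumptions, not MongoShake's real identifiers:

package main

import (
	"fmt"
	"sync"
)

func main() {
	pendingQueue := make(chan []byte, 16) // raw batches from the fetcher
	logsQueue := make(chan string, 16)    // deserialized oplog entries
	workerQueues := []chan string{make(chan string, 16), make(chan string, 16)}

	var wg sync.WaitGroup

	// fetcher: pretend to read raw oplog batches from the source database.
	go func() {
		for i := 0; i < 4; i++ {
			pendingQueue <- []byte(fmt.Sprintf("raw-batch-%d", i))
		}
		close(pendingQueue)
	}()

	// deserializer: PendingQueue -> LogsQueue.
	go func() {
		for raw := range pendingQueue {
			logsQueue <- "oplog(" + string(raw) + ")"
		}
		close(logsQueue)
	}()

	// batcher: hash each entry to a worker queue.
	go func() {
		i := 0
		for entry := range logsQueue {
			workerQueues[i%len(workerQueues)] <- entry
			i++
		}
		for _, q := range workerQueues {
			close(q)
		}
	}()

	// workers: drain their queues and "send" the data.
	for id, q := range workerQueues {
		wg.Add(1)
		go func(id int, q chan string) {
			defer wg.Done()
			for entry := range q {
				fmt.Printf("worker %d sends %s\n", id, entry)
			}
		}(id, q)
	}
	wg.Wait()
}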

The Worker's main job is to take data from its WorkerQueue and send it. Because of the ACK mechanism, it maintains two internal queues: an unsent queue holding data not yet sent, and a sent queue holding data that has been sent but not yet acknowledged. After sending, data moves from the unsent queue to the sent queue; when an ACK arrives from the peer, entries in the sent queue whose sequence number is covered by the ACK are removed, which guarantees reliability.
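Here is a minimal Go sketch of the two queues and ACK pruning, with invented names and plain slices standing in for MongoShake's internal structures:

package main

import "fmt"

type packet struct {
	seq  int64
	data string
}

// worker keeps the two queues described above: unsent and sent-but-unacked.
type worker struct {
	unsent []packet
	sent   []packet
}

// send moves everything from the unsent queue to the sent queue
// (actual transmission over the tunnel is omitted).
func (w *worker) send() {
	w.sent = append(w.sent, w.unsent...)
	w.unsent = nil
}

// onAck drops acknowledged packets; anything left in sent may be retransmitted.
func (w *worker) onAck(ack int64) {
	kept := w.sent[:0]
	for _, p := range w.sent {
		if p.seq > ack {
			kept = append(kept, p)
		}
	}
	w.sent = kept
}

func main() {
	w := &worker{unsent: []packet{{14, "a"}, {15, "b"}, {16, "c"}}}
	w.send()
	w.onAck(15)         // packets 14 and 15 confirmed; 16 awaits ack or retransmission
	fmt.Println(w.sent) // [{16 c}]
}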

Workers can connect to different tunnel channels to meet different needs. When the tunnel type is direct, the Worker writes directly to the Replayer, which applies the operations to the destination MongoDB; Workers and Replayers correspond one to one. The Replayer first distributes the received data into different ExecutorQueues according to the conflict detection rules, and executors then take data from the queues and write it concurrently. To make writes efficient, MongoShake also merges adjacent oplog entries with the same operation type and the same namespace before writing.
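The merging step can be sketched like this; the entry type and grouping rule below are simplifications for illustration, not MongoShake's actual implementation:

package main

import "fmt"

// entry is a pared-down oplog record: operation type, namespace, document.
type entry struct {
	op  string
	ns  string
	doc string
}

// mergeAdjacent groups consecutive entries that share the same operation and
// namespace so they can be applied as one bulk write.
func mergeAdjacent(entries []entry) [][]entry {
	var groups [][]entry
	for _, e := range entries {
		n := len(groups)
		if n > 0 && groups[n-1][0].op == e.op && groups[n-1][0].ns == e.ns {
			groups[n-1] = append(groups[n-1], e)
			continue
		}
		groups = append(groups, []entry{e})
	}
	return groups
}

func main() {
	logs := []entry{
		{"i", "app.users", "u1"},
		{"i", "app.users", "u2"}, // merged with the previous insert
		{"u", "app.users", "u1"},
		{"i", "app.orders", "o1"},
	}
	for _, g := range mergeAdjacent(logs) {
		fmt.Printf("bulk %s on %s: %d docs\n", g[0].op, g[0].ns, len(g))
	}
}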

User Case

The Amap (AutoNavi) app is the leading map and navigation application in China. The Alibaba Cloud MongoDB service provides part of its storage, holding data on the order of hundreds of millions of records. Amap adopts a multi-center strategy within China, improving service quality by routing to the nearest center based on geographic location and other information. As shown in the figure below, the business side (Amap) routes users to three city data centers, and there are no cross-data-center computation dependencies between the data.

These three cities span China geographically from north to south, which poses challenges for cross-data-center replication and disaster recovery: if the equipment room or network in one region fails, traffic must switch smoothly to another region so that users barely notice the problem.

Our current strategy is a pairwise interconnection topology: each data center synchronizes its data to the other two. Amap's routing layer directs user requests to the appropriate data center, with reads and writes for a given user going to the same data center to preserve a degree of transactionality. MongoShake then replicates data asynchronously and bidirectionally between the data centers, ensuring that each center holds the full data set (guaranteeing eventual consistency). If any equipment room fails, either of the other two can provide read and write services after a switchover. The figure below shows the synchronization between the data centers in City 1 and City 2.

If a unit becomes unreachable, MongoShake's RESTful management interface can be used to obtain the synchronization offset and timestamp of each data center, and to determine whether asynchronous replication has caught up to a given point in time by comparing the collected and written offsets. Combined with DNS traffic switching on the business side, the failed unit's traffic is cut off and its requests can be read and written in the new unit, as shown in the figure below.

Performance test data

For details about the test data, see the performance test documents.

Future plans

MongoShake will be maintained over the long term, with continuous iteration of major and minor versions. Questions, comments, and contributions to the open-source project are welcome.
