1. Business scenarios

In 2017, Mobike began using TiDB in production. As the business grew, the TiDB version was iterated rapidly, and we gradually divided Mobike's TiDB application scenarios into three levels:

P0 core services: core online services. Each service must have its own dedicated cluster and must not share cluster resources with other services. These clusters must be deployed across AZs and have remote disaster recovery (DR) capability.

P1 online services: online services that are allowed to share a TiDB cluster with other services, as long as the main traffic is not affected.

Offline service cluster: non-online services that have low requirements on real-time performance and can tolerate minute-level data delay.

This article picks three scenarios and briefly introduces how TiDB is used at Mobike, the problems we ran into, and how we solved them.

2. Order Cluster (P0 business)

The order business is the company's P0 core business. The previous Sharding scheme could no longer support Mobike's rapidly growing order volume, and the problems of a single database's capacity ceiling and uneven data distribution were becoming more and more obvious. TiDB not only solves the above problems, but also provides multi-dimensional queries for the business.

2.1 Order TiDB cluster deployment architecture

  


Figure 1 Architecture diagram of a two-site, three-center deployment

The entire cluster is deployed across three data centers: data centers A and B in the same city, and remote data center C. Because network latency to the remote data center is high, the design principle is to keep the PD leader and the TiKV Region leaders in the same-city data centers as far as possible (in the Raft protocol, only the leader node serves requests). Our solution is as follows (a pd-ctl sketch follows the list):

Use Leader Priority to set the leader priorities of the three PD servers to 5, 5, and 3 respectively.

Use labels to assign the TiKV instances in the different data centers to different AZs, ensuring that the three replicas of a Region never fall into the same AZ.

Use label-property reject-leader to keep Region leaders out of the remote data center, so that in most cases Region leaders are elected in the same-city data centers A and B.
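A minimal pd-ctl sketch of the three rules above, assuming illustrative PD member names (pd-a, pd-b, pd-c) and an illustrative AZ label key/value (zone, az-c); the real names are not given in this article, and the AZ labels themselves are configured on each TiKV instance (for example in the ansible inventory) rather than through pd-ctl.

# Run inside an interactive pd-ctl session, e.g.: pd-ctl -u http://<pd-host>:2379
# Prefer the two same-city PD members when electing the PD leader.
member leader_priority pd-a 5
member leader_priority pd-b 5
member leader_priority pd-c 3
# Refuse to place Region leaders on stores carrying the remote AZ label.
label-property set reject-leader zone az-c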

2.2 Order cluster migration process and service access topology

  


Figure 2 Migration process and service access topology of order cluster

For ease of description, the Sharding-JDBC part of the figure is called the old Sharding cluster and the DBProxy part is called the new Sharding cluster.

The new Sharding cluster is sharded by taking order_id modulo the number of sub-tables and is written through DBProxy, which solves problems such as uneven data distribution and write hotspots.

The data of the old Sharding cluster is synchronized to the new Sharding cluster with DRC (Gravity, an open-source heterogeneous data synchronization tool developed in-house at Mobike), and the incremental data is tagged; the reverse synchronization link ignores tagged traffic to avoid circular replication.

To support rolling the business back to the old Sharding cluster during the cutover, incremental data on the new Sharding cluster must be synchronized back to the old one. Since writing back to the old Sharding cluster is coupled with business logic, DRC (Gravity) only subscribes to the incremental data of the DBProxy Sharding cluster and publishes it to Kafka, and the business side developed a service that consumes Kafka and writes the data to the old Sharding cluster.

The new TiDB cluster serves as the aggregated order database, and DRC (Gravity) synchronizes data from the new Sharding cluster into TiDB.

In the new scheme, the DBProxy cluster handles the read and write traffic keyed by order_id, while the TiDB cluster, as a read-only copy, serves queries on the other dimensions.
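For example, a query on a non-sharding dimension can go straight to the read-only TiDB copy without touching the DBProxy shards; the table and column names below are illustrative, not Mobike's actual schema:

SELECT order_no, user_id, bike_id, start_time
FROM orders
WHERE user_id = 123456789
  AND start_time >= '2018-11-01'
ORDER BY start_time DESC
LIMIT 20;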

2.3 Some problems in using TiDB

2.3.1 When the traffic grayscale on the new cluster reached 20% shortly after going online, the load on the TiDB Coprocessor became very high and a large number of "Server is Busy" errors appeared in the logs.

Problem analysis:

The order table has more than 10 billion rows, and the data involved in each query is scattered across 1,000+ Regions. When the table data is read through the handles constructed from the index, a large number of distSQL requests have to be sent to these Regions, so the gRPC QPS on the Coprocessor rises sharply.

TiDB's execution engine runs on the Volcano model: all physical executors form a tree, and each layer obtains its results by calling the Next/NextChunk() method of the layer below. A Chunk is an in-memory data structure for internal data; it reduces memory allocation overhead, lowers memory usage, and makes memory usage easy to track and control. The execution framework in TiDB 2.0 keeps calling the child's NextChunk function, and each call returns one batch of data whose size is controlled by the session variable tidb_max_chunk_size, which defaults to 1024 rows. Because the order data is so scattered, only a small amount of data needs to be read from any single Region, so the default Chunk size of 1024 is clearly unsuitable for this scenario.

Solution:

After upgrading to the 2.1 GA version, this parameter became a globally tunable variable and its default value was changed to 32, making memory usage more efficient and reasonable. This resolved the problem.
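For reference, a minimal sketch of tuning the variable by hand; the value 32 here is simply the new default mentioned above:

SET SESSION tidb_max_chunk_size = 32;
SET GLOBAL tidb_max_chunk_size = 32;    -- global scope is available from 2.1 GA
SHOW VARIABLES LIKE 'tidb_max_chunk_size';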

2.3.2 When the full data was imported, TiDB used its implicit auto-increment rowid by default, so the massive INSERTs wrote data into a single Region and created a write hotspot.

Solution:

Setting SHARD_ROW_ID_BITS scatters the rowid across multiple Regions, which alleviates the write hotspot problem: ALTER TABLE table_name SHARD_ROW_ID_BITS = 8;
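Note that SHARD_ROW_ID_BITS only takes effect on tables that use the implicit _tidb_rowid, i.e. tables without an integer primary key. A minimal sketch with an illustrative table (not Mobike's actual schema):

-- Scatter the implicit rowid of a new table across 2^8 shards.
CREATE TABLE order_detail (
    order_no VARCHAR(32) NOT NULL,
    user_id BIGINT,
    payload VARCHAR(1024),
    KEY idx_user (user_id)
) SHARD_ROW_ID_BITS = 8;

-- Or change an existing table in place.
ALTER TABLE order_detail SHARD_ROW_ID_BITS = 8;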

2.3.3 Because of the high network latency, the remote data center is mainly used for disaster recovery and does not serve traffic. During a network jitter that lasted about 10 seconds, a large number of "no Leader" logs appeared on TiDB: a Region follower node was isolated from the network and the isolated node's term kept increasing. Prolonged network fluctuation causes repeated leader elections, during which no service can be provided, and this may eventually lead to an avalanche.

Problem analysis:

Consider network isolation of a follower in the Raft algorithm, as shown in the figure below.

  


Figure 3 Network isolation of followers in Raft algorithm

After the election timeout expires without receiving a heartbeat, follower C initiates an election and becomes a Candidate.

Each time an election is initiated, the term is increased by 1. Because of the network isolation, node C keeps losing elections and its term keeps increasing.

After the network recovers, the inflated term of this node propagates to the other nodes in the cluster and triggers a new leader election. However, since node C's log is in fact not the newest, it cannot become the leader; the whole cluster's order has been disturbed by the network-isolated node C, which is obviously unreasonable.

Solution:

This issue was resolved with the introduction of Raft PreVote in TiDB 2.1 GA.

With the PreVote algorithm, a Candidate first confirms that it can win the votes of a majority of the nodes in the cluster before increasing its own term and initiating a real vote. The other nodes agree to a re-election only under stricter conditions, all of which must be met at the same time:

They have not received a heartbeat from the Leader for at least one election timeout.

The Candidate's log is new enough.

With the introduction of PreVote, a network-isolated node cannot increase its term because it cannot obtain the approval of a majority of nodes, so rejoining the cluster no longer triggers a new leader election.

3. Online Business Cluster (P1 Business)

The online business cluster carries P1 businesses such as user balance changes, personal messages, user life cycle, and credit score, with data volume and access load within a controllable range. The TiDB Binlog it produces is incrementally synchronized to the big data team through Gravity, and the new credit scores computed by the analysis models are periodically written back to the TiDB cluster.

  


Figure 4 Online service cluster topology

4. Data Sandbox Cluster (Offline Services)

The data sandbox is an offline business cluster and serves as Mobike's data aggregation cluster. It currently runs nearly 100 TiKV instances and carries more than 60 TB of data. Gravity, the data replication center developed in-house, collects data from the online databases into TiDB in real time for offline queries; the cluster also carries some internal offline businesses, data reports and other applications. At present the cluster's average total write TPS is 10,000-20,000/s, the peak QPS is 90,000+/s, and the cluster performance is fairly stable. The design advantages of this cluster are as follows:

Allows developers to securely query online data.

Cross-database JOIN SQL for special scenarios.

Big data team data extraction, offline analysis, BI reports.

Indexes can be added as needed at any time to meet complex multi-dimensional queries.

Offline businesses can direct traffic to sandbox clusters without placing additional burden on online databases.

Data aggregation of sub-database and sub-table.

Data archiving and disaster recovery.

  


Figure 5 Data sandbox cluster topology

4.1 Some problems and solutions encountered

4.1.1 TiDB Server OOM Restart

Many TiDB users have probably run into this problem: when TiDB handles a large request, it keeps requesting memory until it is killed by OOM, and occasionally a single simple query statement exhausts the whole machine's memory and affects the overall stability of the cluster. Although TiDB itself has the oom-action parameter, configuring it had no effect in practice at the time.

Therefore, we chose a compromise solution, which TiDB currently recommends: deploy multiple TiDB instances on a single physical machine, distinguish them by port, and set a memory limit for the instances serving unstable queries (TiDBcluster1 and TiDBcluster2 in the middle of Figure 5). For example:

[tidb_servers]

tidb-01-A ansible_host=$ip_address deploy_dir=/$deploydir1 tidb_port=$tidb_port1 tidb_status_port=$status_port1

tidb-01-B ansible_host=$ip_address deploy_dir=/$deploydir2 tidb_port=$tidb_port2 tidb_status_port=$status_port2 MemoryLimit=20G

tidb-01-A and tidb-01-B are deployed on the same physical machine. When the memory used by tidb-01-B exceeds the threshold, the system automatically restarts tidb-01-B without affecting tidb-01-A.

After version 2.1, TiDB introduced a new variable, tidb_mem_quota_query, which sets the memory usage threshold for a single query statement and partially solves the problem described above.
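A minimal sketch of using it; the 8 GB quota is an example value, not Mobike's setting, and the variable takes bytes:

SET SESSION tidb_mem_quota_query = 8589934592;    -- 8 GB per query for this session
SHOW VARIABLES LIKE 'tidb_mem_quota_query';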

4.1.2 Efficiency of the tidb-binlog component

People usually focus on how to migrate from MySQL to TiDB, but once the business actually runs on TiDB, TiDB's own binlog becomes important. The tidb-binlog module consists of the Pump and Drainer components: Pump writes the binlogs to Kafka, and Drainer merges and sorts the binlogs from Kafka and sends them downstream in a fixed format.

We ran into a few problems:

Pump cannot send data to Kafka as fast as the binlogs are generated.

Drainer processes Kafka data too slowly, causing excessive latency.

Single-node deployment with multiple TiDB instances does not support multiple Pumps.

The first two problems are in fact caused by how Kafka is read and written: to preserve ordering, Pump and Drainer write and read data sequentially against a single partition, so the throughput bottleneck is obvious. Batching more binlogs into each Kafka message improves throughput, but at the same time we ran into some new problems:

Sending a large number of binlogs to Kafka in a single batch caused Kafka OOM.

When Pump floods Kafka at high speed, Drainer stops working and cannot read data from Kafka.

Working with PingCAP engineers, we found that this was a bug in Sarama itself: Sarama sets no threshold for writing data but does set one for reading data (https://github.com/Shopify/sarama/blob/master/real_decoder.go#L88).

The final solution was to add a kafka-max-message parameter to both Pump and Drainer to limit the message size. The issue of multiple TiDB instances on a single machine not supporting multiple Pumps was resolved by updating the ansible scripts to name each Pump service by port (e.g. pump-8250.service) and bind it to the corresponding TiDB instance.
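On the Kafka side, broker-level limits also interact with oversized binlog messages. A rough sketch of the related settings in server.properties, with purely illustrative values (these are standard Kafka options, not something specific to tidb-binlog):

# Allow larger individual messages (10 MB in this example).
message.max.bytes=10485760
# Make sure follower replication still works for messages of that size.
replica.fetch.max.bytes=10485760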

PingCAP has since redesigned tidb-binlog so that Kafka is no longer used to store binlogs. The roles of Pump and Drainer have also been adjusted: Pump now forms a horizontally scalable cluster that spreads the load evenly, and Drainer's binlog sorting logic has been moved into Pump to improve overall synchronization performance.

4.1.3 Monitoring Problems

In the current TiDB monitoring architecture, TiKV pushes its monitoring data to Pushgateway, from which Prometheus pulls it. As the number of TiKV instances grew and Pushgateway's memory usage reached its 2 GB limit, the process entered a zombie state, and the Grafana dashboards showed breakpoints like the one in Figure 7:

  


Figure 6 Monitoring topology view

  


Figure 7 Monitoring display diagram

The current temporary solution is to deploy multiple Pushgateway instances and point the monitoring data of different TiKV nodes at different Pushgateways to share the load. The final solution is to move to a newer version of TiDB (2.1.3 and above), in which Prometheus pulls TiKV's monitoring data directly, removing the dependence on Pushgateway.
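On the Prometheus side, the temporary workaround amounts to scraping several Pushgateway instances instead of one. A rough prometheus.yml excerpt with illustrative job names and addresses (each TiKV instance's metrics push address is then pointed at one of the Pushgateways):

scrape_configs:
  - job_name: "pushgateway-tikv-1"
    honor_labels: true              # keep the labels pushed by TiKV
    static_configs:
      - targets: ["pushgateway-1:9091"]
  - job_name: "pushgateway-tikv-2"
    honor_labels: true
    static_configs:
      - targets: ["pushgateway-2:9091"]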

4.2 Data Replication Center Gravity (DRC)

Here is a brief introduction to Gravity (DRC), a data replication component developed in-house at Mobike.

Gravity is a set of data replication components developed by Mobike's database team. It already supports hundreds of synchronization channels, with a TPS of 50,000/s and an 80th-percentile latency of less than 50 ms. Gravity has the following features:

Multiple data sources (MySQL, MongoDB, TiDB, PostgreSQL).

Support for heterogeneous synchronization (between different databases, tables, and fields), including synchronization from sharded databases and tables into merged tables.

Support for active-active and multi-active deployments; traffic is tagged during replication to avoid circular replication.

The management node is highly available and data will not be lost during fault recovery.

Support filter Plugin (statement filtering, type filtering, column filtering and other multi-dimensional filtering).

Supports data conversion during transmission.

One-click full + incremental data migration.

Lightweight, stable, efficient and easy to deploy.

Support kubernetes-based PaaS platform to simplify o&M tasks.

Usage Scenarios:

Big data bus: send MySQL Binlog, Mongo Oplog, TiDB Binlog incremental data to Kafka for downstream consumption.

Unidirectional data synchronization: full and incremental synchronization from MySQL to MySQL or from MySQL to TiDB.

Bidirectional data synchronization: bidirectional incremental synchronization between MySQL instances, with protection against circular replication.

Sharded database synchronization: synchronization from sharded MySQL databases and tables into a merged database, with configurable mapping between source and target tables.

Data cleaning: During synchronization, the Filter Plugin can be used to customize data conversion.

Data archiving: synchronization from MySQL to an archive database, with DELETE statements filtered out of the synchronization link.

Gravity is designed to integrate multiple data sources, making business design more flexible and data replication and transformation easier, and helping businesses migrate to TiDB smoothly. The project is open source on GitHub, and everyone is welcome to try it and exchange ideas.

5. Summary

The emergence of TiDB not only makes up for the capacity ceiling of a single MySQL instance and the single query dimension of traditional Sharding schemes, but its compute-storage separation architecture also makes horizontal cluster scaling much easier, so that the business can focus more on development without worrying about complex maintenance costs. In the future, Mobike will keep trying to move more core businesses onto TiDB, letting TiDB play a greater role, and we wish TiDB better and better development.