Background

In Youzan's early days, MySQL was the only storage system and Codis was the only cache. As the business grew, some data no longer fit MySQL well, and Codis, being deployed as a cache, was not suitable as a storage system. We urgently needed a high-performance NoSQL product to fill the gap. Because we had very few development and operations engineers at the time, we needed an open source product that could be put into use quickly without much maintenance. We compared several open source products and chose Aerospike as our KV storage solution. Aerospike proved to be a good choice for this transition period: as a mature, commercially backed open source product, it needed very little development and operations support, ran stably without failures, and met many business requirements, which freed us to spend more energy on other middleware problems.

However, as Youzan grew rapidly, a plain Aerospike cluster gradually stopped meeting our increasingly diverse business needs. Its performance and stability are still good, but because its indexes must be loaded into memory, storage becomes expensive as the data volume grows. New business requirements also mean that we will need more data types in the future. To make full use of the existing Aerospike cluster, and because no open source product at the time could meet all of our needs, we had to build a KV storage service that could serve us for years to come.

Design and Architecture

In designing an underlying KV service that can serve us for years to come, we need to consider the following aspects:

  • Use active open source products backed by major vendors wherever possible, to avoid excessive development work and long delivery cycles
  • Avoid depending on or coupling too tightly to any single open source product; otherwise we cannot adapt to changes in that product that are outside our control, and cannot benefit from future technology iterations and upgrades
  • Avoid an overly complex technology stack, which raises later operation and maintenance costs
  • Because of business needs, the service must be easy to extend and customize
  • Business needs will evolve in many directions. No single product can meet all requirements, so multiple open source products may need to be integrated to meet complex and diverse requirements
  • The business-facing interface should stay as stable as possible even when the back-end technology of the KV service changes, so that later upgrades do not impose heavy migration costs.

Based on the above points, we made the following architectural design:

To integrate multiple back ends and make future expansion easier, we use a proxy to hide back-end details and expose the widely used Redis protocol as the interface to upper-layer services. On the one hand, this lets us reuse existing open source Redis clients and reduces development work; on the other hand, it lowers the learning cost for services that adopt it. In addition, the existing Aerospike and Codis clusters can be integrated smoothly, which reduces the migration work for services. Under this architecture, we can also do protocol conversion at the proxy layer in the future, so that we can take advantage of new technology and extend the KV service by connecting more open source products.

With this architecture, we can address the shortcomings of our current KV service without changing the existing Aerospike cluster. We therefore developed ZanKV, a distributed KV store built on several mature open source products. ZanKV has the following characteristics:

  • It is written in Go, which gives high development efficiency, lowers later maintenance difficulty, and makes later customization convenient.
  • It uses mature and active open source components such as etcd raft and RocksDB to reduce development work
  • It is a CP system that complements the existing Aerospike AP system to meet different needs
  • It provides richer data structures
  • It supports larger capacity and, combined with Aerospike, significantly reduces storage costs without compromising performance requirements

The overall architecture diagram of self-developed ZanKV is as follows:

The cluster consists of PlaceDriver (PD) nodes, DataNodes, etcd, and rsync. Their roles are as follows:

  • PD node: responsible for data distribution and data balancing, coordinating all ZanKV nodes in the cluster, and writing metadata to etcd
  • DataNode: stores the actual data
  • etcd: stores metadata, including the data distribution mapping tables and other metadata used for coordination
  • rsync: transfers snapshot backup files

Let’s go through the internal implementation details one by one.

Implementation Internals

DataNode

First, we need a high-performance, highly reliable single-node KV storage engine as the cornerstone of everything else, and it should be replaceable so that we can introduce a better storage engine in the future. We chose RocksDB as the starting point for its interface and ease of use. RocksDB has been developed by Facebook for many years, is relatively stable, and is the storage engine of choice for many open source products. We have run into essentially no problems with it, and its maintainers respond promptly to the needs of the open source community.

RocksDB only provides basic KV interfaces such as Get, Put, and Delete. To support the rich data structures of the Redis protocol, we need to build more complex data structures on top of plain KV pairs. We therefore built a data mapping layer on top of RocksDB, drawing on the designs of several excellent open source products (Pika, Ledis, TiKV, and others).
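As a rough illustration of what such a mapping layer does, the sketch below flattens one field of a Redis-style hash into a single KV key. The key layout (a type tag, a 2-byte key length, the hash key, then the field) and the function names are assumptions made for this example; they are not ZanKV's actual on-disk format.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// hashDataTag marks KV entries that belong to a hash (hypothetical value).
const hashDataTag byte = 'h'

// encodeHashFieldKey maps (hash key, field) to one flat KV key:
// tag | 2-byte big-endian key length | key | field.
func encodeHashFieldKey(key, field []byte) ([]byte, error) {
	if len(key) > 0xFFFF {
		return nil, errors.New("hash key too long")
	}
	buf := make([]byte, 3, 3+len(key)+len(field))
	buf[0] = hashDataTag
	binary.BigEndian.PutUint16(buf[1:3], uint16(len(key)))
	buf = append(buf, key...)
	buf = append(buf, field...)
	return buf, nil
}

// decodeHashFieldKey reverses encodeHashFieldKey.
func decodeHashFieldKey(raw []byte) (key, field []byte, err error) {
	if len(raw) < 3 || raw[0] != hashDataTag {
		return nil, nil, errors.New("not a hash field key")
	}
	n := int(binary.BigEndian.Uint16(raw[1:3]))
	if len(raw) < 3+n {
		return nil, nil, errors.New("truncated hash field key")
	}
	return raw[3 : 3+n], raw[3+n:], nil
}

func main() {
	// "HSET user:1 name alice" becomes one KV pair in the underlying store.
	// A map stands in for the storage engine here.
	store := map[string]string{}
	k, _ := encodeHashFieldKey([]byte("user:1"), []byte("name"))
	store[string(k)] = "alice"

	key, field, _ := decodeHashFieldKey(k)
	fmt.Printf("%s %s %s\n", key, field, store[string(k)]) // prints "user:1 name alice"
}
```

Because all fields of one hash share the same encoded prefix, an ordered engine can return the whole hash with a single prefix scan.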

To ensure data reliability, we replicate data to multiple machines and use the Raft consensus protocol to keep the replicas consistent. We chose Raft because etcd already provides a fairly complete and mature Raft library written in Go. However, etcd itself is not designed to store massive amounts of data, so to let storage capacity scale out, we introduced the concept of Raft groups on top of etcd raft. Increasing the number of Raft partitions lets multiple Raft groups replicate in parallel.

Finally, we expose the service externally through the Redis protocol. Through these layers, a ZanKV DataNode can provide rich data storage capabilities, as shown in the figure below:

Namespaces and Partitions

A Raft cluster with a single partition cannot scale indefinitely, so to support large amounts of data we need data partitioning in order to scale out. The partitioning algorithms commonly used in the industry fall into two categories, hash partitioning and range partitioning, each with its own use cases. Range partitioning keeps data globally ordered, but it requires dynamic merge and split algorithms, is complex to implement, and is prone to write hotspots in some scenarios. Hash partitioning is easy to implement and usually spreads reads and writes evenly. Its drawbacks are that the number of partitions is usually fixed at initialization, that changing the number of partitions requires a large amount of data migration, and that it is hard to support globally ordered queries. Considering development cost and the ordering requirements of some data structures, we currently use a prefix hash partitioning algorithm. It keeps data with the same prefix ordered, which satisfies part of the business requirements, while reducing development cost so that the system could go online as soon as possible.
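A minimal sketch of prefix hash partitioning is shown below. It assumes keys of the form "prefix:rest" with ':' as the separator and uses FNV-1a as the hash; both choices, and the function names, are assumptions of this example rather than ZanKV's actual key format or hash function.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// partitionPrefix returns the part of the key that is hashed. The
// "prefix:rest" layout is an assumption of this sketch; a key without
// a separator is hashed as a whole.
func partitionPrefix(key string) string {
	if i := strings.IndexByte(key, ':'); i >= 0 {
		return key[:i]
	}
	return key
}

// partitionID hashes only the prefix, so all keys that share a prefix
// land in the same partition. partitions must be greater than zero.
func partitionID(key string, partitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(partitionPrefix(key)))
	return h.Sum32() % partitions
}

func main() {
	for _, k := range []string{"order1001:item1", "order1001:item2", "order2002:item1"} {
		fmt.Printf("%s -> partition %d\n", k, partitionID(k, 16))
	}
}
```

The first two keys always map to the same partition, so an ordered scan over everything under "order1001" only touches one partition.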

In addition, Youzan will have more and more services in the future, and we need to be able to isolate different services easily, add new features, and upgrade smoothly, so we introduced the concept of a namespace. Namespaces can be added to a cluster dynamically, and the configuration and data of each namespace are completely isolated, including the number of replicas, the number of partitions, and the partitioning policy. You can also specify node placement policies for a namespace so that it is bound to nodes with certain characteristics. Currently this is mainly used in the multi-room solution, where rack awareness ensures that replicas are spread across machine rooms. With namespaces, you can separate core and non-core services into different namespaces. You can also add incompatible new features in a new namespace for new services without affecting existing services, which allows a smooth upgrade.

PlaceDriver (Global Management Node)

A large cluster has many namespaces, each namespace has many partitions, and each partition has multiple replicas. With this much data, a node is needed to optimize and schedule the data of the whole cluster from a global perspective, to keep the cluster stable and the load on data nodes balanced. The PlaceDriver node decides which nodes hold each data partition and automatically redistributes data when a data node fails. We use separate, stateless PD nodes for this. They can be upgraded independently, which simplifies operation and maintenance, and can be scaled horizontally to serve a large number of metadata queries. All metadata is stored in the etcd cluster. The PlaceDriver nodes use etcd to elect a master, which handles the assignment and migration of data across data nodes. Each PlaceDriver node watches the cluster for node changes so that it is aware of data node changes across the entire cluster.

Data partitioning is currently implemented with hash partitioning. All keys are mapped evenly onto the initial set of partitions. The number of partitions is usually several times the number of DataNodes, which leaves room for future expansion. PD therefore needs an algorithm for allocating partitions to DataNodes. Some systems arrange nodes in a ring and allocate partitions with consistent hashing, but consistent hashing leads to load imbalance and is inflexible when data nodes change. In ZanKV, we instead maintain a mapping table that records the relationship between partitions and nodes. The mapping table is generated according to specific algorithms and flexible policies.

As the figure above shows, the read/write flow is as follows. When a client issues a read or write, it hashes the primary key to get an integer, takes that value modulo the total number of partitions to get a partition ID, looks up the partition ID in the partition-to-node mapping table to find the corresponding data node, and sends the command to that node. On receiving the command, the data node validates it against the partitioning algorithm and hands it to the leader of the local data partition with that partition ID. If the node is not the leader of that partition, a write is forwarded inside Raft to the leader node, while a read returns an error (a leader switch may be in progress). Based on the error, the client decides whether to refresh its local cache of leader information and retry.

All read and write pressure falls on the partition leaders, so we need to keep the number of partition leaders on each node as balanced as possible, while also minimizing data migration when nodes are added or removed. When data nodes change, the mapping table between partitions and data nodes must be modified dynamically. This process of adjusting the mapping table is the data balancing process. A change in the data nodes triggers an etcd watch event, and PlaceDriver monitors these changes in real time to decide whether data balancing is needed. To avoid affecting online services, you can configure the time window in which data balancing is allowed. To avoid frequent data migration, after nodes change the system decides whether to balance data based on how urgent the situation is. In particular, this avoids unnecessary data migration while data nodes are being upgraded. The following scenarios are considered:
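The client side of this flow can be sketched as a cached mapping table plus a single retry when a data node reports that it is not the leader. The `Router` type, its fields, and `ErrNotLeader` are hypothetical names for this example; computing the partition ID from the key is omitted, and the functions passed in stand for the real network calls.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotLeader is returned by a data node that does not lead the partition.
var ErrNotLeader = errors.New("not leader for partition")

// Router caches the partition -> leader mapping table on the client.
type Router struct {
	leaders []string                         // index is the partition ID
	fetch   func() []string                  // reloads the mapping table
	send    func(node string, pid int) error // sends a command to a data node
}

// Do sends a command for the given partition. If the node reports that
// it is not the leader, the cache is refreshed and the command retried once.
func (r *Router) Do(pid int) error {
	err := r.send(r.leaders[pid], pid)
	if errors.Is(err, ErrNotLeader) {
		r.leaders = r.fetch()
		err = r.send(r.leaders[pid], pid)
	}
	return err
}

func main() {
	current := []string{"node-b", "node-a"} // real leaders after a leader switch
	r := &Router{
		leaders: []string{"node-a", "node-a"}, // stale client cache
		fetch:   func() []string { return current },
		send: func(node string, pid int) error {
			if node != current[pid] {
				return ErrNotLeader
			}
			return nil
		},
	}
	fmt.Println(r.Do(0), r.leaders[0]) // prints "<nil> node-b"
}
```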

  • New node: this has the lowest balancing priority. Data is migrated to new nodes only within the allowed time window and only when no nodes are abnormal.
  • Fewer than half of the nodes are abnormal: wait for a period of time before migrating the replicas on the faulty nodes to other nodes, so that a temporary fault does not trigger data migration.
  • More than half of the nodes in the cluster are abnormal: a network partition is likely, so no automatic migration is performed. If you confirm that there is no network partition, manually adjust the stable node count of the cluster to trigger migration.
  • Not enough nodes available for allocation: if the replica count is configured as 3 but fewer than 3 nodes are available, no data migration occurs.

By default, the stable node count of a cluster only grows automatically: it is incremented each time a new data node is discovered. To reduce it, you must call the scale-down API and set the count explicitly. This avoids unnecessary data migration during a network partition. When the number of healthy nodes in the cluster is less than or equal to half of the stable node count, automatic data migration does not occur without manual intervention.
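The scenarios and the stable-node-count rule above can be summarized as a small decision function. This is only a sketch of the described rules: the `ClusterView` type, the action names, and the order in which the checks are applied are assumptions of this example, not PlaceDriver's actual code.

```go
package main

import "fmt"

// Action is the balancing decision for the current cluster state.
type Action string

const (
	NoMigration       Action = "no-migration"
	WaitThenMigrate   Action = "wait-then-migrate-replicas"
	MigrateToNewNodes Action = "migrate-to-new-nodes"
)

// ClusterView is a hypothetical summary of what PD observes.
type ClusterView struct {
	Stable          int  // stable node count of the cluster
	Alive           int  // nodes currently healthy
	Replicas        int  // configured replica count
	HasNewNode      bool // a newly added node has no data yet
	InBalanceWindow bool // inside the allowed balancing time window
}

func decide(v ClusterView) Action {
	switch {
	case v.Alive*2 <= v.Stable:
		// Half or more of the stable nodes are gone: likely a network
		// partition, so wait for manual intervention.
		return NoMigration
	case v.Alive < v.Replicas:
		// Not enough nodes to place every replica.
		return NoMigration
	case v.Alive < v.Stable:
		// A minority of nodes is abnormal: wait, then move their replicas.
		return WaitThenMigrate
	case v.HasNewNode && v.InBalanceWindow:
		// Lowest priority: fill new nodes only when all is healthy.
		return MigrateToNewNodes
	default:
		return NoMigration
	}
}

func main() {
	fmt.Println(decide(ClusterView{Stable: 5, Alive: 2, Replicas: 3}))                                          // no-migration
	fmt.Println(decide(ClusterView{Stable: 5, Alive: 4, Replicas: 3}))                                          // wait-then-migrate-replicas
	fmt.Println(decide(ClusterView{Stable: 5, Alive: 5, Replicas: 3, HasNewNode: true, InBalanceWindow: true})) // migrate-to-new-nodes
}
```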

Implementation of data expiration

Data expiration is one of the Redis features that ZanKV needs to support. Unlike Redis, which is an in-memory store, ZanKV is a strongly consistent persistent store and has to deal with large amounts of expired data on disk. The overall design involves many trade-offs.

First, ZanKV does not support millisecond-level data expiration (the Redis PEXPIRE command). Real business scenarios rarely need millisecond-level expiration, and the round-trip time of network requests in a real production network is itself on the order of milliseconds. Millisecond-level expiration would put too much pressure on the system for little practical benefit.

For second-level data expiration, ZanKV supports two expiration policies, each aimed at different business scenarios. You can configure different expiration policies for different namespaces as required. The design and trade-offs of the two policies are detailed below.

Consistent data expiration

When the data expiration feature was first designed, the goal was to be fully compatible with Redis expiration semantics while maintaining data consistency. Consistent data expiration is the design that meets this goal.

As mentioned above, ZanKV currently uses RocksDB as its on-disk storage engine. Whatever the expiration policy or implementation, expiration information must be encoded and persisted to disk in some form. Under the consistent expiration policy, data expiration information is encoded as follows:

As shown in the figure above, each key that has an expiration time needs two additional pieces of information to be stored:

  • The expiration time of the key. We call this Table 1
  • An index of keys prefixed with the Unix timestamp of their expiration time. We call this Table 2
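To make the Table 2 idea concrete, the sketch below encodes an index key as an 8-byte big-endian Unix timestamp followed by the user key, so that byte order equals expiration order, and then walks the sorted index until it reaches an entry that has not expired yet. The exact encoding and the function names are assumptions of this example, not ZanKV's real format; a sorted slice stands in for the ordered storage engine.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

// expireIndexKey builds a hypothetical Table 2 key:
// 8-byte big-endian expiration timestamp | user key.
func expireIndexKey(expireAt int64, key []byte) []byte {
	buf := make([]byte, 8, 8+len(key))
	binary.BigEndian.PutUint64(buf, uint64(expireAt))
	return append(buf, key...)
}

// decodeExpireIndexKey splits an index key back into timestamp and user key.
func decodeExpireIndexKey(k []byte) (int64, []byte) {
	return int64(binary.BigEndian.Uint64(k[:8])), k[8:]
}

// expiredKeys walks the index in sorted order and stops at the first
// entry whose expiration time is still in the future.
func expiredKeys(sortedIndex [][]byte, now int64) [][]byte {
	var out [][]byte
	for _, k := range sortedIndex {
		ts, key := decodeExpireIndexKey(k)
		if ts > now {
			break
		}
		out = append(out, key)
	}
	return out
}

func main() {
	index := [][]byte{
		expireIndexKey(300, []byte("c")),
		expireIndexKey(100, []byte("a")),
		expireIndexKey(200, []byte("b")),
	}
	// An ordered engine keeps keys sorted; emulate that here.
	sort.Slice(index, func(i, j int) bool { return bytes.Compare(index[i], index[j]) < 0 })
	for _, k := range expiredKeys(index, 200) {
		fmt.Println(string(k)) // prints "a" then "b"
	}
}
```

The early `break` is what makes the scan cheap: entries that expire later are never read.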

RocksDB uses an LSM tree as its underlying storage structure, so scanning Table 2, which is stored in expiration order, is relatively fast. Based on this layout, ZanKV implements consistent data expiration as follows: in each Raft group, the leader scans Table 2 for expired data. Each time it finds data that should have expired by the current time, it initiates a delete request through the Raft protocol; when the request is applied, the stored data and its expiration metadata (Table 1 and Table 2) are deleted together. Under this policy, all data operations go through Raft, which guarantees data consistency, and all Redis expiration commands are supported: users can read and modify a key's time to live (the Redis TTL and EXPIRE commands) or make a key persistent (the Redis PERSIST command). However, this scheme has the following two obvious drawbacks:

The first is that when a large amount of data expires, the leader generates a large number of Raft delete requests, which puts pressure on the cluster network. In addition, expiration deletes are processed through Raft, where they block write requests, reduce cluster throughput, and cause write latency jitter.

We are currently planning an optimization for this drawback. The idea is that the Raft group leader scans for expired data in the background and, after scanning, replicates only the expiration timestamp through Raft. When applying the Raft request, each node deletes all data that expired before that timestamp. This is illustrated below:

This approach effectively reduces the number of Raft requests when a large amount of data expires, which lowers network traffic and the Raft processing load. Interested readers are welcome to help us explore and implement it in the ZanKV open source project.

The second drawback is that every delete and write must update both Table 1 and Table 2, so write amplification is significant. The scheme therefore suits only workloads with a small amount of expiring data and performs poorly when a large amount of data expires. For this reason, we designed a second policy based on actual service scenarios: non-consistent local deletion.

Non-consistent local deletion

The starting point of this policy is that most services only care about how long data is retained, for example 3 months or 6 months. They do not care about the exact time at which data is purged, and they do not repeatedly adjust the expiration time after the data is written. The non-consistent local deletion policy was designed for this scenario.

Unlike consistent data expiration, this policy no longer stores Table 1 and keeps only Table 2, as shown in the following figure:

In addition, expiration deletes are no longer initiated through the Raft protocol. Instead, each node in the cluster scans Table 2 every 5 minutes and deletes the expired data locally.

Because Table 1 is not stored under this policy, users cannot obtain a key's expiration time through the TTL command, nor can they reset or remove the expiration time once it has been set. In exchange, write amplification is effectively reduced and write performance improves.

In addition, because deletion runs locally in the background, the write latency jitter and network traffic caused by replicating expiration deletes are eliminated. However, this sacrifices some data consistency, and scanning every 5 minutes means that data is not deleted in real time.

In short, non-consistent local deletion is a trade-off. It suits most business requirements and improves the stability and throughput of the cluster, but it sacrifices some data consistency and makes the semantics of some commands differ from Redis.

You can configure different data expiration policies for different namespaces based on your requirements and service scenarios.

Periodic prefix cleanup

Although non-consistent deletion significantly reduces server-side pressure, we can reduce it further for special scenarios with very large data volumes. In such scenarios the data is usually time-based, so the key itself contains timestamp information (for example, logs and monitoring data). For this case we provide a prefix-clear interface that deletes all data for a specified time period in one batch, which spares the server from scanning expired data and deleting it key by key.
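One common way to implement a prefix clear on an ordered store is to turn the prefix into a key range and delete that range in one operation. The sketch below computes the exclusive upper bound for a prefix and applies it to a sorted key list that stands in for the storage engine. The function names and the key format in the example are hypothetical; whether ZanKV implements its prefix-clear interface this way is not stated in this article.

```go
package main

import (
	"fmt"
	"sort"
)

// prefixEnd returns the smallest key that is greater than every key
// starting with prefix, or nil if no such key exists (all bytes 0xff).
func prefixEnd(prefix []byte) []byte {
	end := append([]byte(nil), prefix...)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return end[:i+1]
		}
	}
	return nil
}

// deletePrefix removes every key that starts with prefix from a sorted
// key list and returns the remaining keys in a new slice.
func deletePrefix(sorted []string, prefix string) []string {
	lo := sort.SearchStrings(sorted, prefix)
	hi := len(sorted)
	if end := prefixEnd([]byte(prefix)); end != nil {
		hi = sort.SearchStrings(sorted, string(end))
	}
	// The three-index slice forces append to copy instead of overwriting.
	return append(sorted[:lo:lo], sorted[hi:]...)
}

func main() {
	keys := []string{"log:20180101:a", "log:20180101:b", "log:20180102:a", "user:1"}
	fmt.Println(deletePrefix(keys, "log:20180101:")) // prints "[log:20180102:a user:1]"
}
```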

Cross-room scheme

ZanKV currently supports two cross-room deployment modes for different scenarios.

Single cluster spanning multiple machine rooms

In this mode, one large cluster is deployed across machine rooms that are all in the same city, so latency between them is low. A typical deployment uses 3 machine rooms. The replicas of each partition must be spread evenly across the machine rooms so that data stays consistent and reads and writes continue to be served when a single machine room goes down.

During deployment, you specify the machine room of each data node in its configuration file, so that machine room information is available when data is distributed. Data nodes in different machine rooms are configured with different machine room information. PlaceDriver uses this to ensure that the replicas of each partition are spread evenly across machine rooms.

In a cluster that spans machine rooms, replica synchronization is done by Raft. When a single machine room fails, the other two machine rooms still hold more than half of the replicas, so reads and writes are unaffected and the data remains consistent. After the failed machine room recovers, Raft automatically replicates the data written during the failure, so the recovered machine room catches up. This mode requires no manual intervention either when a fault occurs or during recovery, and it provides both availability and data consistency when a single machine room fails. The cost is a slight increase in latency caused by cross-room synchronization.
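The effect of rack-aware placement can be illustrated with a simple round-robin over machine rooms: each round takes at most one node from every room, so replicas only share a room once every room already holds one. This is a simplified sketch with hypothetical names; the real PlaceDriver also has to balance load and leaders across all partitions, which is not modeled here.

```go
package main

import (
	"fmt"
	"sort"
)

// placeReplicas picks nodes for one partition, visiting rooms in
// round-robin order so that replicas are spread across rooms first.
func placeReplicas(nodesByRoom map[string][]string, replicas int) []string {
	rooms := make([]string, 0, len(nodesByRoom))
	for room := range nodesByRoom {
		rooms = append(rooms, room)
	}
	sort.Strings(rooms) // deterministic order; map iteration is random

	var picked []string
	for round := 0; len(picked) < replicas; round++ {
		progressed := false
		for _, room := range rooms {
			nodes := nodesByRoom[room]
			if round < len(nodes) && len(picked) < replicas {
				picked = append(picked, nodes[round])
				progressed = true
			}
		}
		if !progressed {
			break // fewer nodes than requested replicas
		}
	}
	return picked
}

func main() {
	nodes := map[string][]string{
		"room-a": {"a1", "a2"},
		"room-b": {"b1"},
		"room-c": {"c1"},
	}
	fmt.Println(placeReplicas(nodes, 3)) // prints "[a1 b1 c1]"
}
```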

Synchronization between clusters in multiple machine rooms

If the machine rooms are in different regions, or the network latency between them is high, deploying a single cluster across them can cause high synchronization delay, which greatly increases read and write latency. To optimize latency, you can instead synchronize between separate clusters in the remote machine rooms. Because the remote machine room is synchronized asynchronously in the background, latency in the local machine room is unaffected, but data synchronization lag is introduced, and data may be inconsistent when a fault occurs.

Deploying this mode is somewhat more complex. The basic principle is to add a Raft learner node that pulls the Raft log asynchronously and then replays it into the cluster in the remote machine room. Because each partition is an independent Raft group, the log is replayed serially within a partition and in parallel across partitions. The synchronized remote machine room is read-only by default. If the primary machine room fails and you need to switch over, some data may not have been synchronized yet, and you need to repair the data manually from the Raft log after the fault is resolved. The disadvantages of this mode are that operation and maintenance are more troublesome and that data must be repaired after a fault. Its advantage is lower read and write latency under normal conditions.

Performance tuning experience

During ZanKV's initial period in production, we accumulated some tuning experience, mainly with RocksDB parameters and operating system parameters. Most of the tuning follows the official documentation; the following parameters deserve special mention:

  • Block cache: the block cache holds decompressed blocks, unlike the file cache provided by the OS, so you need to balance the ratio between the two (some stress tests suggest 10% to 30%). In addition, because there are many partitions, the block cache should be shared among the RocksDB instances to avoid excessive memory usage.
  • Write buffer: this cannot be shared across RocksDB instances, so it should not be too large, but making it too small causes write stalls. In addition, to reduce write amplification, it should satisfy the following relation with several other parameters: level0_file_num_compaction_trigger * write_buffer_size * min_write_buffer_number_to_merge = max_bytes_for_level_base
  • Background I/O rate limit: use the background I/O rate limiting provided by RocksDB to avoid read/write latency spikes caused by background compaction.
  • Iterator optimization: to prevent RocksDB's deletion markers (tombstones) from hurting iteration performance, set the rocksdb::ReadOptions::iterate_upper_bound parameter on iterators so that iteration ends early, as detailed in this article: https://www.cockroachlabs.com/blog/adventures-performance-debugging/
  • Disable transparent huge pages (THP): for the access patterns of a storage system, it is recommended to disable the operating system's transparent huge pages feature. Otherwise, I/O latency spikes can be severe.
# echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
# echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag
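Returning to the write buffer relation in the list above, a small worked example shows how max_bytes_for_level_base follows from the other three parameters. The values used here (a 64 MiB write buffer, 2 buffers merged per flush, and a level 0 compaction trigger of 4) are illustrative assumptions, not recommended settings, and the function name is made up for this example.

```go
package main

import "fmt"

// level0Size estimates how much data sits in level 0 when compaction is
// triggered: write_buffer_size * min_write_buffer_number_to_merge *
// level0_file_num_compaction_trigger.
func level0Size(writeBufferSize, minBuffersToMerge, l0CompactionTrigger int64) int64 {
	return writeBufferSize * minBuffersToMerge * l0CompactionTrigger
}

func main() {
	const mib = int64(1) << 20
	size := level0Size(64*mib, 2, 4)
	// Setting max_bytes_for_level_base to this value keeps level 1
	// about the same size as level 0.
	fmt.Printf("max_bytes_for_level_base = %d MiB\n", size/mib) // prints "max_bytes_for_level_base = 512 MiB"
}
```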

Roadmap

Although ZanKV has been in use within Youzan for some time, there is still a lot of room for improvement. The following planned features are currently being designed and developed:

Secondary indexes

When the data type is HASH, a query like the following will be supported, so that services can query data by other fields:

IDX.FROM test_hash_table WHERE "age>24 AND age<31"

Optimize raft log

The etcd Raft implementation currently keeps all Raft log entries that have not yet been compacted into a snapshot in an in-memory table. In ZanKV's multi-Raft-group mode, this takes up too much memory. It needs to be optimized so that most of the Raft log is kept on disk and only the most recent entries stay in memory for the interaction between followers and the leader. The on-disk storage for the Raft log must be chosen carefully to avoid a double WAL (two layers of write-ahead logging), which would degrade write performance.

Multi-index filtering

Secondary indexes can only serve simple single-field queries. To filter efficiently on multiple fields at the same time and support richer multidimensional queries, multi-index filtering needs to be introduced. This feature can cover a large class of data search scenarios that do not require full-text search or precise ranking. There are already open source products that support range queries with compressed bitmaps. In these scenarios, index filtering can perform much better than an inverted index.

Real-time data export and OLAP optimization

This mainly uses a Raft learner to export the Raft log to other systems in real time. The exported data can then be tailored to specific scenarios, such as converting it to columnar storage for OLAP.

All of the features above require a large amount of development work, and our manpower is currently limited. We welcome interested engineers to join us or to take part in our open source project. We hope to draw on the open source community so that the product can iterate quickly and offer more stable and richer functionality.

Conclusion

Because of space limitations, this article only briefly describes several of the important technical ideas behind ZanKV, and many implementation details could not be covered. The project is open source: github.com/youzan/ZanR… . You are welcome to read the source code for further details and to contribute code so that we can build a better open source product together. We also look forward to sharing the implementation details of the richer features to come.