Author: Qi Tao, Data Intelligence Department – Real-time Computing Group, mainly responsible for Cloud Music algorithm feature storage.

Business background

Cloud Music's recommendation and search services rely on a large amount of algorithm feature data, which needs to be stored in key-value form to serve online reads and writes. These features, such as song features and user features, are mainly produced by Spark or Flink jobs on the big data platform. They are characterized by large data volume, daily full updates or real-time incremental updates, and high query performance requirements. Some of this feature data is stored in memory storage systems (Redis/Tair), and some in disk storage systems (MyRocks/HBase).

To reduce the cost of accessing different storage systems and to allow custom development for the storage characteristics of algorithm feature KV data, we introduced the RocksDB engine into the Tair distributed storage framework, supporting online storage of large volumes of algorithm feature KV data at low cost.

The following first briefly introduces how RocksDB was integrated into Tair, and then describes our practice on algorithm feature KV storage. To distinguish the memcache-based memory storage from the RocksDB-based disk storage under the Tair framework, we call them MDB and RDB respectively.

Introduction to RDB

As a distributed storage framework, Tair consists of a ConfigServer and DataServers. The DataServers are the nodes responsible for the actual storage of data. All KV data is partitioned into buckets based on the hash value of the key, and the data in each bucket can be stored in multiple replicas on different DataServer nodes. The specific mapping rules are determined by the routing table built by the ConfigServer.

The ConfigServer maintains the status of all DataServer nodes; when a node is added or removed, it migrates data and builds a new routing table. DataServer supports different underlying storage engines: an engine needs to implement the basic PUT, GET, and DELETE operations on KV data, as well as a full data scan interface. The client reads and writes data on the DataServers using the routing table provided by the ConfigServer. Writes go to the bucket's master node, and replicas are synchronized internally by the DataServers.
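To make the shape of such a pluggable engine concrete, here is a minimal sketch of the interface a DataServer might expect; the class and method names are hypothetical illustrations, not Tair's actual code.

```cpp
#include <functional>
#include <string>

// Hypothetical sketch of an engine interface a DataServer could plug in.
// Names are illustrative only, not Tair's real classes.
class KvStorageEngine {
 public:
  virtual ~KvStorageEngine() = default;

  // Basic KV operations, scoped by bucket and business table (area).
  virtual bool Put(int bucket_id, int area_id,
                   const std::string& key, const std::string& value) = 0;
  virtual bool Get(int bucket_id, int area_id,
                   const std::string& key, std::string* value) = 0;
  virtual bool Delete(int bucket_id, int area_id, const std::string& key) = 0;

  // Full scan of one bucket, used when migrating data between DataServers.
  virtual void ScanBucket(
      int bucket_id,
      const std::function<void(const std::string& key,
                               const std::string& value)>& callback) = 0;
};
```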

RocksDB is an open-source KV storage engine based on the Log-Structured Merge tree (LSM), a leveled structure composed of many SST files. Each SST file contains a batch of KV data plus the corresponding metadata, and the KV pairs inside an SST file are sorted by key. Each level is periodically compacted to remove invalid data. New data is written to level 0; when level 0 reaches its size threshold, it is compacted down into level 1, and so on. Within each level (except level 0), the SST files are globally ordered and do not overlap, and a query searches the levels from top to bottom.

When introducing RocksDB into Tair, we designed the storage format of each KV record as follows.

The key stored in RocksDB is the concatenation of bucket_id + area_id + original key. area_id identifies a business table; different data tables use different areas. bucket_id makes it easy to migrate data by bucket: since RocksDB stores data in sorted order, data belonging to the same bucket is clustered together, which improves the efficiency of prefix scans. area_id distinguishes different business tables and prevents keys from colliding. In fact, tables with very large data volumes are stored in a separate RocksDB column family to avoid key overlap.
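A minimal sketch of how such a composite key could be built is shown below; the field widths and byte order are illustrative assumptions, not the exact on-disk layout.

```cpp
#include <cstdint>
#include <string>

// Illustrative composite key: big-endian bucket_id and area_id prefixes keep
// keys of the same bucket/area adjacent in RocksDB's sorted key space.
std::string EncodeKey(uint16_t bucket_id, uint16_t area_id,
                      const std::string& user_key) {
  std::string out;
  out.reserve(4 + user_key.size());
  out.push_back(static_cast<char>(bucket_id >> 8));
  out.push_back(static_cast<char>(bucket_id & 0xFF));
  out.push_back(static_cast<char>(area_id >> 8));
  out.push_back(static_cast<char>(area_id & 0xFF));
  out.append(user_key);
  return out;
}

// A prefix scan over one bucket then only needs EncodeKey(bucket_id, 0, "")
// as the iterator seek target.
```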

The value stored in RocksDB is the concatenation of meta + original value, where meta records the KV's modification time and expiration time. Because RocksDB can discard data during compaction, expired data is deleted by defining a CompactionFilter.
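Expired-data cleanup hooks into RocksDB's standard CompactionFilter API; the sketch below assumes, purely for illustration, that the meta header starts with an 8-byte expiration timestamp.

```cpp
#include <cstdint>
#include <cstring>
#include <ctime>

#include <rocksdb/compaction_filter.h>

// Drops records whose expiration time (stored in the value's meta header)
// has passed. The 8-byte little-endian header is an assumed layout.
class ExpireCompactionFilter : public rocksdb::CompactionFilter {
 public:
  bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
              const rocksdb::Slice& value, std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    if (value.size() < sizeof(uint64_t)) return false;  // keep malformed data
    uint64_t expire_at = 0;
    std::memcpy(&expire_at, value.data(), sizeof(expire_at));
    // expire_at == 0 means "never expires"; otherwise drop once it is due.
    return expire_at != 0 &&
           expire_at <= static_cast<uint64_t>(std::time(nullptr));
  }

  const char* Name() const override { return "ExpireCompactionFilter"; }
};
```

The filter is registered via ColumnFamilyOptions::compaction_filter (or a compaction_filter_factory) when the column family is opened.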

Bulkload for batch data import

Bulkload scheme

In many algorithm feature scenarios, the latest full data is computed offline on the big data platform every day and then imported into the KV storage engine. These tables are often over 100 GB in size and contain more than 100 million records. The basic version of RDB could only write data record by record through the PUT interface, so importing the full data via PUT required many concurrent tasks and consumed a lot of computing resources on the big data platform.

Furthermore, the order in which PUTs arrive at RDB is essentially random, which puts heavy I/O pressure on RocksDB compaction, since it has to sort large amounts of KV data to regenerate ordered SST files. This is RocksDB's write amplification problem: the data actually written by RocksDB can be amplified by tens of times, and the resulting disk I/O pressure causes the response time of read requests to fluctuate.

To solve this problem, we borrowed the HBase Bulkload mechanism to improve import efficiency. For large-scale offline features, Spark sorts the original data and converts it into RocksDB's internal data format, SST files; a scheduler then loads the SST files onto the corresponding data nodes of the RDB cluster, using RocksDB's ingest mechanism.

This process improves import efficiency in two ways. First, data is loaded in bulk through files instead of calling the PUT interface to write one or a few records at a time. Second, Spark sorts the data during conversion, which reduces the amount of data merging inside RocksDB.
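On the RDB side, the ingest step relies on RocksDB's standard external-file API. A minimal sketch, with paths and options simplified (the Spark conversion and the scheduler are outside the scope of this snippet):

```cpp
#include <string>
#include <utility>
#include <vector>

#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/options.h>
#include <rocksdb/sst_file_writer.h>

// Build an SST file from already-sorted KV pairs, then ingest it into the DB.
rocksdb::Status BulkloadSorted(
    rocksdb::DB* db,
    const std::vector<std::pair<std::string, std::string>>& sorted_kvs,
    const std::string& sst_path) {
  rocksdb::Options options;
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);

  rocksdb::Status s = writer.Open(sst_path);
  if (!s.ok()) return s;
  for (const auto& kv : sorted_kvs) {  // keys must be in ascending order
    s = writer.Put(kv.first, kv.second);
    if (!s.ok()) return s;
  }
  s = writer.Finish();
  if (!s.ok()) return s;

  rocksdb::IngestExternalFileOptions ifo;
  ifo.move_files = true;  // move the file into the DB instead of copying it
  return db->IngestExternalFile({sst_path}, ifo);
}
```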

We compared the performance of importing algorithm features with Bulkload versus PUT: in terms of I/O pressure, read RT, and compaction volume, Bulkload performs about three times better than PUT. Test scenario: 3.8 TB of existing full data (7.6 TB with two replicas), importing 300 GB / 210 million incremental records (600 GB with two replicas); the import takes about 100 minutes with a read QPS of 12,000/s.

According to RocksDB's internal statistics, the cumulative compaction write volume during the Bulkload import (10:00 to 13:00) is about 85 GB, while during the PUT import (13:00 to 16:00) it is about 273 GB, a ratio of roughly 1:3.2.

```
10:00 Cumulative compaction: 1375.15 GB write, 6.43 MB/s write, 1374.81 GB read, 6.43 MB/s read, 23267.8 seconds
13:00 Cumulative compaction: 1460.62 GB write, 6.29 MB/s write, 1460.29 GB read, 6.29 MB/s read, 24320.8 seconds
16:00 Cumulative compaction: 1733.60 GB write, 7.16 MB/s write, 1733.31 GB read, 7.16 MB/s read, 27675.0 seconds
```

Dual-version data import

On top of Bulkload, for data that is fully overwritten on each import, we use a dual-version mechanism to further reduce disk I/O during the import. Each table corresponds to two versions (area_ids), that is, two column families in RocksDB: the version being imported and the version being read are staggered and switched alternately. This completely avoids merging old and new data during RocksDB compaction.
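Conceptually, the switch works as in the sketch below; this is a simplified single-process illustration, whereas in practice the version mapping is managed by the storage proxy layer.

```cpp
#include <atomic>
#include <string>
#include <utility>

// Simplified illustration of dual-version switching: full imports always go to
// the standby version (column family), reads go to the active one, and the two
// roles are swapped once the import finishes.
class DualVersionTable {
 public:
  DualVersionTable(std::string cf_a, std::string cf_b)
      : cfs_{std::move(cf_a), std::move(cf_b)}, active_(0) {}

  const std::string& ReadCf() const { return cfs_[active_.load()]; }
  const std::string& ImportCf() const { return cfs_[1 - active_.load()]; }

  // Called after a full Bulkload into ImportCf() has completed: readers switch
  // to the fresh data, and the old column family can be dropped and recreated
  // empty, so old and new data never need to be merged by compaction.
  void SwitchVersion() { active_.store(1 - active_.load()); }

 private:
  std::string cfs_[2];
  std::atomic<int> active_;
};
```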

The dual-version mechanism relies on the multi-version capability of the storage proxy layer; the specific scheme and details are not covered here. With this approach, the query RT fluctuates much less during data imports. The following figure compares the RDB cluster with a hot-cold cluster (HBase + Redis).

Key-value separated storage

KV separation scheme

RocksDB reclaims invalid data through compaction and keeps the data at each level sorted, which leads to write amplification. For long values the problem is worse, because the values are repeatedly read and rewritten during compaction. An existing solution is KV separation, proposed for SSD storage in WiscKey: Separating Keys from Values in SSD-Conscious Storage [1]: values are stored in separate blob files, and the LSM tree stores only an index of the value's position in the blob file (file number + offset + size).

In RDB, we introduced TiDB's open-source KV separation plug-in, which is minimally intrusive to the RocksDB code and provides a GC mechanism to reclaim invalid data. On each compaction, GC updates the proportion of valid data in every blob file; when a blob file's valid-data ratio falls below a threshold (0.5 by default), its valid data is rewritten into a new file and the original file is deleted.
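As a stand-in illustration of the same two knobs discussed here (a value-size threshold for separation and a garbage ratio that triggers blob-file rewriting), the sketch below configures RocksDB's built-in BlobDB; this is not the TiDB plug-in used in RDB, only an analogous example.

```cpp
#include <string>

#include <rocksdb/db.h>
#include <rocksdb/options.h>

// Illustration only: RocksDB's integrated BlobDB exposes a size threshold for
// separating values into blob files and a garbage ratio that triggers GC.
rocksdb::Status OpenWithKvSeparation(const std::string& path,
                                     rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_blob_files = true;  // values above the threshold go to blob files
  options.min_blob_size = 1024;      // 1 KB threshold, matching our online default
  options.enable_blob_garbage_collection = true;
  // Force targeted compactions once the garbage ratio in the oldest blob files
  // exceeds 50%, comparable to the 0.5 default mentioned above.
  options.blob_garbage_collection_force_threshold = 0.5;
  return rocksdb::DB::Open(options, path, db);
}
```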

In our comparisons, for long values KV separation improves performance to varying degrees in both the random-write and Bulkload import scenarios, at the cost of more disk space. Because random writes suffer badly from write amplification, read RT drops by about 90% after KV separation; in the Bulkload import scenario, read RT still drops by more than 50%. We also measured that KV separation starts to pay off when the value length exceeds roughly 0.5 KB to 0.7 KB. The default threshold for online deployment is 1 KB: values longer than that are separated out and stored in blob files.

The following figure shows one scenario we tested: average value length 5.3 KB, 800 GB of full data (160 million records), with Bulkload importing updated data while data is read randomly. Without KV separation the average read RT was 1.02 ms; with KV separation it was 0.44 ms, a 57% reduction.

Sequence append

Based on the KV separation mechanism, we are also exploring a further improvement: updating values in place inside the blob files.

The idea comes from the fact that some algorithm features are stored as sequences of values, such as a user's historical behavior, and are updated by appending a short sequence to a long one. With the original KV model, we have to read the original sequence, append the new data to form a new sequence, and then write it back to RDB, which rewrites a large amount of data. To address this, we developed an interface for sequence append updates.

Simply doing read -> append -> write inside RDB would still cause a lot of disk reads and writes. So we made a modification: after KV separation, some extra space is reserved for each value in the blob file in advance (similar to how std::vector over-allocates capacity), and an appended sequence is written directly after the end of the existing value in the blob file. If this is not possible (for example, there is not enough reserved space), we fall back to read -> append -> write.
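A rough sketch of the decision logic follows; all structs and helpers here are hypothetical and only serve to make the reserved-space idea concrete.

```cpp
#include <cstdint>
#include <string>

// Hypothetical blob index: where a value lives in its blob file and how much
// space was reserved beyond the current value length when it was written.
struct BlobValueIndex {
  uint64_t file_no = 0;
  uint64_t offset = 0;
  uint32_t used_size = 0;      // current value length
  uint32_t reserved_size = 0;  // total slot size reserved in the blob file
};

// Hypothetical interface for patching bytes inside an existing blob file.
class BlobFilePatcher {
 public:
  virtual ~BlobFilePatcher() = default;
  virtual bool WriteAt(uint64_t file_no, uint64_t offset,
                       const std::string& bytes) = 0;
};

// Append path: write in place while the reserved slot has room, otherwise
// signal the caller to fall back to the normal read -> append -> write flow.
bool TryAppendInPlace(BlobValueIndex* idx, const std::string& delta,
                      BlobFilePatcher* patcher) {
  if (idx->used_size + delta.size() > idx->reserved_size) {
    return false;  // not enough reserved space
  }
  if (!patcher->WriteAt(idx->file_no, idx->offset + idx->used_size, delta)) {
    return false;
  }
  idx->used_size += static_cast<uint32_t>(delta.size());  // persisted with the LSM index
  return true;
}
```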

Sequence append updates are stored in the following format:

The sequence append update process is as follows:

RDB's sequence append feature is now live, and the effect is very noticeable: in one real algorithm feature storage scenario, each update used to write several TB of data and take about 10 hours; it now writes only several GB and takes about 1 hour.

ProtoBuf field updates

The sequence append scheme proved feasible, so we explored extending it further to support more generic partial-update interfaces, such as add/incr.

For Cloud Music's algorithm features, the values in the KV data are basically stored in ProtoBuf (PB) format. In 2020 our algorithm engineering team developed a memory storage engine that supports field-level updates of PB data (an extension of MDB, called PDB); a detailed introduction to PDB will follow in a later article, so it is not elaborated here. The principle of PDB is that the engine layer encodes and decodes PB, supporting update operations on fields identified by field number, such as incr/update/add, as well as deduplication and sorting of more complex repeated fields. In this way, the read -> decode -> update -> encode -> write flow that previously had to be implemented in the application layer can now be completed with a single pb_update call. PDB is already widely used online, so we want to extend this PB update capability to the disk-based feature storage engine RDB.
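To make "field-level update by field number" concrete, here is an illustrative sketch using protobuf's reflection API. It only shows the idea of an engine-side incr on one numbered field and is not PDB's actual implementation.

```cpp
#include <cstdint>
#include <memory>
#include <string>

#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>

// Decode a serialized PB value, increment one numeric field identified by its
// field number, and re-encode -- the engine-side equivalent of an "incr" op.
bool PbIncrField(const google::protobuf::Message& prototype,
                 const std::string& encoded_value, int field_number,
                 int64_t delta, std::string* new_encoded_value) {
  std::unique_ptr<google::protobuf::Message> msg(prototype.New());
  if (!msg->ParseFromString(encoded_value)) return false;

  const auto* field = msg->GetDescriptor()->FindFieldByNumber(field_number);
  if (field == nullptr || field->is_repeated() ||
      field->cpp_type() != google::protobuf::FieldDescriptor::CPPTYPE_INT64) {
    return false;  // only singular int64 fields handled in this sketch
  }
  const auto* refl = msg->GetReflection();
  refl->SetInt64(msg.get(), field, refl->GetInt64(*msg, field) + delta);
  return msg->SerializeToString(new_encoded_value);
}
```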

We have now completed this development and are running further tests. The solution reuses PDB's PB update logic and modifies the RocksDB code to update values in place after KV separation, avoiding frequent reads and writes. We will share the results later.

The modified RocksDB storage format is as follows:

The detailed process of updating PB in RDB is as follows:

Summary and outlook

Over the past year or so, we have customized and developed the new features described above on top of the basic RDB, guided by the characteristics of algorithm feature storage. The online RDB clusters have reached a considerable scale, storing tens of billions of records and about 10 TB of data, with peak QPS of up to a million per second.

Our overall approach to RDB's self-developed features is: the underlying kernel is a modified RocksDB (with KV separation), and on top of it we customize for new application scenarios, including offline feature Bulkload, real-time feature snapshots, the PB field update protocol, and so on.

Of course, RDB also has shortcomings. For example, the Tair framework that RDB uses partitions data by key hash, so it cannot scan a range of data as well as range partitioning would allow. In addition, the data structures and operation interfaces RDB currently supports are relatively simple; we will develop more functionality based on the business needs of feature storage, such as computing and querying statistics over a time-series window (sum/avg/max, etc.). We will also work with the evolution of our internal Feature Store platform to build a complete feature storage service for machine learning.

References

[1] Lu L, Pillai T S, Arpaci-Dusseau A C, et al. WiscKey: Separating Keys from Values in SSD-Conscious Storage. ACM Transactions on Storage, 2017, 13(1): 5.

This article is published by the NetEase Cloud Music technology team. Any unauthorized reproduction is prohibited. We recruit for all kinds of technical positions year-round; if you are planning a job change and you like Cloud Music, join us at [email protected].