One billion is the order of magnitude of WeChat's user base. Behind this huge figure lies the billion-scale database read and write demand of businesses such as Kankan (Top Stories), WeChat Ads, WeChat Pay, and Mini Programs. So just how powerful is FeatureKV?

Background: Two billion-scale challenges

PaxosStore is a strongly consistent distributed storage system widely used in WeChat. It broadly supports WeChat's online applications, with peaks above 100 million TPS, runs on thousands of servers, and performs strongly in online serving scenarios. However, there is no silver bullet in software development. For scenarios where data is produced offline and only read online, PaxosStore faces two new billion-scale challenges:

The billion-per-second challenge:

The Kankan (Top Stories) recommendation team needed a storage system to hold the models used in CTR computation, separating storage from computation so that the size of the recommendation model would not be limited by single-machine memory.

Each time articles are ranked, CTRSVR pulls thousands of features from the storage system. These features must all be of the same version, but PaxosStore's BatchGet does not guarantee version consistency.

The business estimated that the storage system would need to support one billion reads per second. PaxosStore has a fixed number of replicas and cannot add read-only replicas.

The storage system must provide version management and model management, including rollback to historical versions.

The billion-per-hour challenge:

Many teams in WeChat reported that they needed to periodically write billion-scale data (that is, on the order of the WeChat user count) into PaxosStore every day, but PaxosStore's write speed could not keep up; sometimes a write could not even finish within a day, and writing faster would affect other production businesses.

PaxosStore is designed for online services and guarantees strong consistency. For scenarios that are batch-written offline, read-only online, and do not require strong consistency, meeting the business requirements with PaxosStore is costly.

As data-driven applications multiply, this kind of storage requirement keeps growing. We needed to solve this problem and keep the write time of a dataset with a billion keys within one hour.

To address the pain points in these scenarios, we designed and implemented FeatureKV on top of WFS (WeChat's self-developed distributed file system) and Chubby (WeChat's self-developed metadata store). It is a high-performance key-value storage system with the following features:

High performance and easy scaling

Excellent read performance: on the B70 machine model, a full in-memory table can serve tens of millions of QPS; on the TS80A model, tables stored on SSD can serve millions of QPS.

Excellent write performance: with sufficient remote file system throughput, one billion keys with an average value size of 400 bytes can be written within one hour.

Easy scaling: horizontal (read performance) and vertical (capacity) scaling can be completed within hours, and write-performance scaling just means scaling a stateless module (DataSvr), which takes minutes.

Friendly batch-write support

Task-based write interface: files on WFS or HDFS are used as input, so business teams do not need to write or run any data-ingestion tool.

Incremental update / full update: an incremental update overwrites a batch of newly written key-values on top of the previous version, leaving keys that were not written unchanged; a full update discards the previous version's data and loads a completely new batch of key-values.

TTL: data can be automatically deleted when it expires.

Version management

Transactional BatchGet interface: guarantees that all data returned by one BatchGet comes from the same version.

Historical version rollback: each update produces a new version, and rollback to a historical version is supported, including versions produced by incremental updates.

Of course, there is no silver bullet in software development, and FeatureKV makes its own design trade-offs:

Online writes are not supported; when the data volume is small (GB level), FeatureKV can update data at roughly a 10-minute cadence.

Strong consistency is not guaranteed; eventual consistency is guaranteed, and sequential consistency holds most of the time.

FeatureKV is now widely used in WeChat, including Kankan (Top Stories), WeChat Ads, WeChat Pay, and Mini Programs. This article explains the design of FeatureKV and how it solves these two billion-scale challenges.

The overall design

  1. System architecture

FeatureKV involves three external dependencies:

Chubby: stores the system's metadata. FeatureKV relies heavily on polling the metadata in Chubby for distributed coordination and communication.

USER_FS: the business side's distributed file system, which can be WFS or HDFS, because FeatureKV's write interface is task-based and takes a distributed file system path as input.

FKV_WFS: the distributed file system FeatureKV uses to store the data files that DataSvr produces and KVSvr consumes. Multiple historical versions can be kept to support version rollback.

All three external dependencies can be shared with other businesses: FKV_WFS and USER_FS can be the same module, FKV_WFS can be replaced by HDFS, and Chubby can be replaced by etcd.

DataSvr:

Mainly responsible for writing data: it reads input from USER_FS and, through data-format reconstruction, route sharding, index building, and other steps, generates data files usable by KVSvr and writes them to FKV_WFS.

It is a stateless service; the state of write tasks is stored in Chubby, so adding DataSvr instances improves the system's write performance.

Generally two machines are sufficient; in some scenarios with many write tasks, it can be scaled out.

KVSvr:

Provides read access to the outside: it polls Chubby to detect data updates, pulls data from WFS to local storage, loads it, and serves read-only requests.

It is a stateful service. A KVSvr module consists of K Sects and N Roles, K x N machines in total.

Each Sect holds a full copy of the data, and each BatchGet only needs to be sent to one Sect. Adding a Sect increases read performance without increasing the number of RPCs per BatchGet.

Machines with the same Role are responsible for the same data shards, so when a single machine fails, the batch request can simply be switched to another machine of the same Role and retried.

K must be at least 2 to guarantee the system's disaster tolerance, including availability during changes.

N cannot be an arbitrary number; see the data routing section below.

Write process:

FeatureKV only supports batch writes. Each write task can be incremental or full, and there is no limit on the amount of data written at a time. In designing the offline batch-write interface, we stepped into a few pitfalls:

In the beginning, we wanted to provide some classes/tools that the business side could use directly to package key-value data and write it straight into the FKV_WFS directory. This scheme saves the most bandwidth, but it would make later upgrades of our data format very troublesome, since every business team would have to cooperate, so it was abandoned.

Then we created a new module, DataSvr, which exposed a TCP server: the business side output key-value data, and a write tool sent the key-value data to the TCP server for packaging. This had two problems:

The write speed depended on the quality of the business side's code and on the machine's resources. We saw cases where the business side parsed floating-point input with std::stringstream, which consumed 90%+ of the CPU (std::strtof is much faster), and cases where the write tool ran on a machine whose CPU was already 90%+ occupied by another process, and the business then reported that FeatureKV was writing slowly.
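As a small illustration of that parsing pitfall, both functions below turn a whitespace-separated line of numbers into floats; the std::strtof version avoids the heavyweight stringstream machinery and is typically much faster. This is an illustrative sketch, not the business side's actual tool.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>
#include <vector>

// Parse a space-separated line of numbers via std::istringstream.
std::vector<float> ParseWithStringstream(const std::string& line) {
  std::vector<float> out;
  std::istringstream iss(line);
  float v;
  while (iss >> v) out.push_back(v);
  return out;
}

// Same result via std::strtof, which skips the stream machinery.
std::vector<float> ParseWithStrtof(const std::string& line) {
  std::vector<float> out;
  const char* p = line.c_str();
  char* end = nullptr;
  for (float v = std::strtof(p, &end); p != end; v = std::strtof(p, &end)) {
    out.push_back(v);
    p = end;  // continue from where the last conversion stopped
  }
  return out;
}
```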

Routine changes to DataSvr, or machine failures, could cause a task to fail. With the packet-sending approach of the front-end tool, a failed task could not be retried, because the key-value input stream cannot be replayed.

Finally, we designed a task-style interface that takes a path on USER_FS as input:

The business side writes its data to USER_FS in a specified format and submits a write task to DataSvr.

DataSvr streams the data from USER_FS, reformats it, shards it by route, builds indexes, writes the result to FKV_WFS, and updates the metadata in Chubby. Chubby is also used to synchronize task status, enabling distributed execution and retries of write tasks.

KVSvr detects data updates by polling Chubby, pulls the data locally, finishes loading, and serves it.

  2. Data routing

With scaling in mind, FeatureKV splits one version of data into N files, currently N = 2400, and uses HashFun(key) % N to decide which file a key belongs to.

Which files a KVSvr loads is determined by consistent hashing: KVSvrs with the same Role load the same batch of files, and during scaling, data is moved in units of files.

Since this consistent hash has only 2400 nodes, a noticeable load imbalance appears when 2400 is not divisible by the number of machines in a Sect, so the number of machines in a FeatureKV Sect must divide 2400. Fortunately, 2400 is a lucky number: its divisors up to 30 include 1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 16, 20, 24, 25, and 30, which already covers most scenarios.

The figure above shows an example with N = 6, where Part_00[0-5] are six data files. When scaling from RoleNum = 2 to RoleNum = 3, only Part_003 and Part_005 need to be moved: Part_005 moves from Role_0 to Role_2, and Part_003 moves from Role_1 to Role_2.

Since production uses N = 2400 and the number of nodes is small, to reduce the cost of each routing computation we enumerate all cases where RoleNum < 100 && 2400 % RoleNum == 0 and precompute a consistent hash table for each, as sketched below.
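The sketch below illustrates the two-level routing under simplifying assumptions: FeatureKV precomputes a consistent-hash table per valid RoleNum, whereas here the file-to-Role table is filled round-robin just to show the shape of the lookup, not the actual placement algorithm.

```cpp
#include <string>
#include <vector>

constexpr int kNumFiles = 2400;  // a version of data is split into 2400 files

struct RouteTable {
  std::vector<int> file_to_role;  // file number -> Role id

  explicit RouteTable(int role_num) : file_to_role(kNumFiles) {
    // Only valid when kNumFiles % role_num == 0, so every Role gets the same
    // number of files and load stays balanced.
    for (int f = 0; f < kNumFiles; ++f) file_to_role[f] = f % role_num;
  }
};

// First level: hash the key into one of the 2400 files.
int FileOfKey(const std::string& key) {
  return static_cast<int>(std::hash<std::string>{}(key) % kNumFiles);
}

// Second level: look up which Role serves that file.
int RoleOfKey(const RouteTable& table, const std::string& key) {
  return table.file_to_role[FileOfKey(key)];
}
```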

  3. System scalability

FKV_WFS holds all the data of FeatureKV's current version, so moving files during scaling only requires the newly added machines to pull the files with the corresponding numbers from FKV_WFS and the old machines to drop those files.

When BatchSize is large enough, the number of RPCs in one BatchGet equals the number of Roles, and these RPCs are issued in parallel. The more Roles there are, the more likely it is that at least one of these RPCs is a long-tail request, and the BatchGet latency is determined by the slowest RPC. The figure above shows the BatchGet long-tail probability for different numbers of Roles when the probability of a single RPC being a long-tail request is p = 0.01%; it is computed as 1 - (1 - p)^N, i.e. 1 - 0.9999^N.
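For intuition, under the stated assumption of a 0.01% per-RPC long-tail probability, the BatchGet long-tail probability grows roughly linearly with the number of Roles N:

```latex
P_{\text{tail}}(N) = 1 - (1 - p)^{N}, \qquad p = 0.01\%
\quad\Rightarrow\quad
P_{\text{tail}}(10) \approx 0.10\%,\;\;
P_{\text{tail}}(30) \approx 0.30\%,\;\;
P_{\text{tail}}(100) \approx 1.0\%
```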

Adding a Sect (read performance expansion):

Each Sect holds a full copy of the data, so adding a Sect means adding a read-only replica and expanding read performance.

Since a BatchGet only needs to be sent to one Sect, the number of RPCs stays bounded; it does not issue 200 RPCs just because the KVSvr module happens to have 200 machines. This design reduces the average BatchGet latency and the probability of long-tail requests.

Adding a Role (storage capacity + read performance expansion):

Assuming every machine has the same storage capacity, increasing the number of Roles increases the total storage capacity.

Since the whole module has more machines, read performance also increases; the effect on the module's overall read throughput is equivalent to adding a Sect.

However, with more Roles, more machines are involved in each BatchGet and the probability of long-tail requests rises, so it is generally recommended to keep the number of Roles within 30.

Adding a DataSvr (write performance expansion):

DataSvr is a stateless service and can be scaled out within minutes.

A write task is split into multiple parallel jobs, so adding DataSvr instances improves the write performance of the whole module.

Data migration happens at file granularity and involves no complex migration logic. Ignoring a gradual (gray) rollout, it can be completed within hours; with a gray rollout it is generally done within a day.

  4. Disaster tolerance

KVSvr side:

The machines of each Sect are deployed in the same campus, so deploying just two Sects is enough to tolerate the failure of the machines in one campus.

Case in point: on March 23, 2019, an optical cable in Shanghai's Nanhui IDC park was cut, and one FeatureKV deployment had one third of its machines there; the service remained stable during the fault.

Some RPCs timed out during the failure, causing an increase in long-tail requests, but most requests succeeded after switching machines and retrying, and the number of final failures was very low. After the machines in the Nanhui park were globally blocked, the long-tail requests and final failures disappeared completely.

DataSvr/WFS side:

Even if both of these fail, FeatureKV's KVSvr can still provide read-only service, which is sufficient for most scheduled-batch-write, online-read-only scenarios.

Specific case: on June 3, 2019, a distributed file system cluster failed and was unavailable for 9 hours. Both the USER_FS and the FKV_WFS of one FeatureKV deployment were on that cluster. During the fault, the business side's data production stopped and no write tasks were generated, while FeatureKV's read service remained stable throughout the outage.

The billion-per-second challenge: design of the online read service

  1. KVSvr read performance optimization

In order to improve the performance of KVSvr, we adopted the following optimization measures:

High-performance hash table: for data with small volume and high read rates, FeatureKV uses MemTable, a full in-memory table structure. The underlying implementation of MemTable is a read-only hash table we implemented ourselves, which reaches 28 million QPS under concurrent access by 16 threads; this already exceeds the throughput of the RPC framework, so it will not become the bottleneck of the system.
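As an illustration only, here is a minimal sketch of a build-once, read-only open-addressing hash table; because the table is immutable after building, concurrent readers need no locks. The layout and names are illustrative assumptions, not FeatureKV's actual implementation.

```cpp
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Build-once, read-only hash table with linear probing.
// After Build() the table is never modified, so reads are lock-free.
class ReadOnlyHashTable {
 public:
  void Build(const std::vector<std::pair<std::string, std::string>>& kvs) {
    bucket_count_ = kvs.size() * 2 + 1;  // keep load factor around 0.5
    buckets_.assign(bucket_count_, {});
    for (const auto& [key, value] : kvs) {
      size_t idx = std::hash<std::string>{}(key) % bucket_count_;
      while (buckets_[idx].used) idx = (idx + 1) % bucket_count_;  // linear probe
      buckets_[idx] = {true, key, value};
    }
  }

  std::optional<std::string> Get(const std::string& key) const {
    size_t idx = std::hash<std::string>{}(key) % bucket_count_;
    while (buckets_[idx].used) {           // probe until an empty slot
      if (buckets_[idx].key == key) return buckets_[idx].value;
      idx = (idx + 1) % bucket_count_;
    }
    return std::nullopt;                   // key not present
  }

 private:
  struct Bucket {
    bool used = false;
    std::string key;
    std::string value;
  };
  size_t bucket_count_ = 1;
  std::vector<Bucket> buckets_;
};
```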

libco aio: for data with large volume and lower read rates, FeatureKV uses the BlkTable or IdxTable table structure, which stores data on SSD. SSD read performance can only be fully exploited through many concurrent accesses, but an online service cannot open too many threads and operating-system thread scheduling is costly. We therefore used libco to wrap Linux AIO and implemented coroutine-level multi-way concurrent disk reads. With a value size of 100 bytes, four SSDs on a TS80A can reach 1.5 million+ QPS.
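The following is a minimal sketch of the underlying pattern: submitting a batch of 4KB block reads through Linux native AIO (libaio) and waiting for them all. FeatureKV wraps this kind of submission in libco coroutines so one service thread can keep many reads in flight; the coroutine wrapper and all error handling are omitted here, and the function is an illustrative assumption rather than FeatureKV's actual code.

```cpp
#include <libaio.h>

#include <cstdlib>
#include <string>
#include <vector>

// Submit a batch of 4KB reads at the given offsets and wait for all of them.
// `fd` is assumed to be opened with O_DIRECT.
std::vector<std::string> BatchReadBlocks(int fd, const std::vector<long long>& offsets) {
  io_context_t ctx = 0;
  io_setup(static_cast<int>(offsets.size()), &ctx);

  std::vector<iocb> cbs(offsets.size());
  std::vector<iocb*> cb_ptrs(offsets.size());
  std::vector<void*> bufs(offsets.size());
  for (size_t i = 0; i < offsets.size(); ++i) {
    posix_memalign(&bufs[i], 4096, 4096);                 // O_DIRECT needs aligned buffers
    io_prep_pread(&cbs[i], fd, bufs[i], 4096, offsets[i]);
    cb_ptrs[i] = &cbs[i];
  }
  io_submit(ctx, static_cast<long>(cb_ptrs.size()), cb_ptrs.data());  // submit the batch

  std::vector<io_event> events(offsets.size());
  io_getevents(ctx, static_cast<long>(offsets.size()),
               static_cast<long>(offsets.size()), events.data(), nullptr);  // wait for all

  std::vector<std::string> blocks;
  for (size_t i = 0; i < offsets.size(); ++i) {
    blocks.emplace_back(static_cast<char*>(bufs[i]), 4096);
    free(bufs[i]);
  }
  io_destroy(ctx);
  return blocks;
}
```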

Packet serialization: during perf tuning we found that with large batch sizes (the average batch size of CTRFeatureKV is 4K+), serializing RPC packets took considerable time, so we built our own serialization/deserialization layer; the argument passed to the RPC layer is a raw binary buffer.

Data compression: different businesses have different compression needs. In model-storage scenarios the value is a floating-point number or an array of floating-point numbers representing non-zero features. Using a generic byte-oriented compression algorithm such as Snappy here does not work well: the compression ratio is low and CPU is wasted. For such scenarios we introduced half-precision floating-point numbers (provided by KimmyZhang's Sage library) to compress data during transmission and reduce bandwidth cost.

  2. Implementation of distributed transactional BatchGet

Background: updates are either full or incremental. An update involves multiple pieces of data, and each update increments the version number. A BatchGet also returns multiple pieces of data. The business side expects updates to be transactional: if an update is only half applied, a BatchGet should return the previous version's data rather than a mix of new and old data.

RoleNum = 1:

The data is not sharded and lives entirely on one machine. After investigation, we found two approaches:

MVCC (multi-version concurrency control): storage engines such as LevelDB keep multiple versions of data, control the data's life cycle, and allow access to a specified version through snapshots. With this approach the data structure has to support both reads and writes, background threads are needed to clean up expired data, and supporting full updates is also complicated.

COW (copy-on-write): concretely, double-buffer switching. In FeatureKV's scenario, an incremental update also needs to copy the previous version's data and then apply the incremental data on top. The advantage is that a purpose-built read-only data structure can be used for higher performance; the disadvantage is the doubled space overhead.

To guarantee online service performance, we adopted COW and designed the read-only hash table mentioned above, achieving transactional BatchGet on a single machine.
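Building on the read-only hash table sketch above, here is a minimal illustration of double-buffer switching, assuming a single update path; the memory-ordering choices are illustrative, and a real implementation would also have to keep readers from outliving two consecutive updates (e.g. via reference counting).

```cpp
#include <atomic>
#include <utility>

// Assumes the ReadOnlyHashTable type from the sketch earlier in this article.
// Readers always use the buffer the atomic index points at; an update fills
// the idle buffer and then flips the index. The previous version stays in the
// other buffer until the next update (delayed delete), so the two most recent
// versions remain available for version-aligned BatchGet and fast rollback.
class CowTable {
 public:
  const ReadOnlyHashTable& Current() const {
    return buffers_[cur_.load(std::memory_order_acquire)];
  }

  // Called from the single update path only.
  void ApplyNewVersion(ReadOnlyHashTable&& new_version) {
    int idle = 1 - cur_.load(std::memory_order_relaxed);
    buffers_[idle] = std::move(new_version);      // build the idle buffer offline
    cur_.store(idle, std::memory_order_release);  // flip: readers now see it
  }

 private:
  ReadOnlyHashTable buffers_[2];
  std::atomic<int> cur_{0};
};
```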

RoleNum > 1:

Data is distributed across machines, and different machines finish loading at different times, so from a distributed point of view there may be no single unified version.

An intuitive idea is to keep the last N versions and then select the latest version that every Role already has.

The size of N affects the storage cost (memory and disk); the minimum is 2. To make N = 2 work, we added two restrictions on the DataSvr side:

Updates to a single table are executed sequentially.

Before a write task starts and before it ends, an extra version-alignment step waits for all KVSvrs to finish loading the latest version.

This guarantees a globally unified version while keeping only the two most recent versions. In the COW scenario, as long as deletion of the other buffer's data is delayed until the next update, the two most recent versions can be kept without extra memory overhead.

Given a globally unified version, how should transactional BatchGet be implemented?

Should a first round of RPCs ask each Role for its current version? That would double the QPS, and the machine might update its data the very next moment.

In practice, data updates and version changes are infrequent, so most of the time it is enough for each machine simply to return its latest version. In addition, each reply can carry the B-version (the version held in the other buffer), so that when the client detects a version mismatch it can select a globally consistent version, the SyncVersion, and re-fetch only the data that is not at the SyncVersion.

During a data update, the window of inconsistency can last minutes, which would otherwise trigger waves of retries and affect system stability. So we added one more optimization: the SyncVersion is cached, and on each BatchGet, if a cached SyncVersion exists, the client directly requests data at that SyncVersion. A sketch of this client-side logic follows.
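Below is a sketch of that client-side logic under stated assumptions: RoleReply, the fetch callback, and the SyncVersion selection rule (the smallest current version, which under the two-version invariant every Role still holds in one of its buffers) are illustrative, not FeatureKV's actual API.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical reply from one Role: the key-values it returned, the version it
// served, and the version sitting in its other buffer (the B-version).
struct RoleReply {
  std::map<std::string, std::string> kvs;
  uint64_t version = 0;
  uint64_t b_version = 0;
};

using FetchFn = std::function<RoleReply(int role, const std::vector<std::string>& keys,
                                        std::optional<uint64_t> required_version)>;

// Sketch of the transactional BatchGet retry logic; RPC details are omitted.
std::map<std::string, std::string> TransactionalBatchGet(
    const std::vector<std::vector<std::string>>& keys_by_role,
    std::optional<uint64_t>& cached_sync_version, const FetchFn& fetch) {
  std::vector<RoleReply> replies;
  for (size_t r = 0; r < keys_by_role.size(); ++r) {
    // With a cached SyncVersion, ask every Role for that version directly.
    replies.push_back(fetch(static_cast<int>(r), keys_by_role[r], cached_sync_version));
  }

  bool consistent = true;
  uint64_t min_version = replies.empty() ? 0 : replies[0].version;
  for (const auto& rep : replies) {
    min_version = std::min(min_version, rep.version);
    if (rep.version != replies[0].version) consistent = false;
  }

  if (!consistent) {
    // Only the two latest versions exist, so the smallest current version is
    // still held by every Role (in its current or its other buffer).
    uint64_t sync_version = min_version;
    cached_sync_version = sync_version;
    for (size_t r = 0; r < replies.size(); ++r) {
      if (replies[r].version != sync_version) {
        replies[r] = fetch(static_cast<int>(r), keys_by_role[r], sync_version);
      }
    }
  }

  std::map<std::string, std::string> result;
  for (const auto& rep : replies) result.insert(rep.kvs.begin(), rep.kvs.end());
  return result;
}
```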

  3. Version rollback

The metadata of each table contains a rollback-version field. Its default value is 0, meaning the table is not in a rollback state; a non-zero value means the table has been rolled back to that version.

First, consider how to perform the rollback itself:

In the simple case where the table is always fully updated, KVSvr just pulls the specified version of the data from FKV_WFS to its local directory and follows the normal full-update process.

Then consider the incremental case. If a table is always updated incrementally, rolling back to version Vi would require pulling V1 through Vi to KVSvr and replaying the updates, similar to replaying a database binlog; after thousands of incremental versions have accumulated, this becomes infeasible.

So we need an asynchronous worker that merges consecutive increments with the preceding full version into a new full version, similar to a checkpoint, so that a rollback never involves too many incremental versions. This asynchronous worker is implemented in DataSvr.
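As a minimal sketch of the checkpoint idea, the merge below folds a run of incremental versions into the preceding full version; deletion markers, TTLs, and the on-disk format are omitted.

```cpp
#include <map>
#include <string>
#include <vector>

// Fold a sequence of incremental versions into the preceding full version to
// produce a new full version (a "checkpoint").
std::map<std::string, std::string> MakeCheckpoint(
    std::map<std::string, std::string> full_version,
    const std::vector<std::map<std::string, std::string>>& increments) {
  for (const auto& inc : increments) {    // apply increments oldest-first
    for (const auto& [key, value] : inc) {
      full_version[key] = value;          // newer value overwrites older
    }
  }
  return full_version;                    // the new full version
}
```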

A further optimization: if the rollback target version is still in the local double buffer, simply switching the buffer pointer achieves a rollback within seconds. In practice, many rollbacks are to the last known-good version, usually the previous one, which is still in the local double buffer.

A table in the rollback state rejects writes, to prevent the incorrect data from being written in again.

Next, consider how to cancel a rollback:

Cancelling a rollback means letting the table continue serving with the rolled-back version's data and applying subsequent incremental updates on top of it.

If the rollback state were simply cleared, production would first jump back to the (abnormal) pre-rollback version, and any traffic in that window would read the bad data.

The data's version numbers must also increase continuously, which the update process depends on, so we cannot simply delete the trailing versions either.

To avoid these problems, we again borrowed the COW idea: copy first. Concretely, the data of the current rollback version is written out as a new full version with the latest version number.

This step takes some time, but in rollback scenarios the latency requirement for cancelling a rollback is not strict; as long as the rollback itself is fast and cancelling it is safe, this is acceptable.

The billion-per-hour challenge: design of the offline write flow

  1. Background

DataSvr's main job is to write data from USER_FS into FKV_WFS; along the way it performs route sharding and data-format reconstruction.

FeatureKV has three table structures, each with different processing logic:

MemTable: fully in memory. The index is an unordered hash structure, capacity is limited by memory, and the offline write logic is simple.

IdxTable: the index is fully in memory and is an ordered array; the number of keys is limited by memory.

BlkTable: the block index is fully in memory. The index is ordered and records the begin_key and end_key of each 4KB data block on disk. Capacity is unlimited.
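As an illustration of the BlkTable structure, the sketch below shows how a sorted in-memory block index narrows a key lookup down to a single 4KB block on disk; the field names and block format are assumptions for illustration.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// One in-memory index entry per 4KB data block on disk.
struct BlockIndexEntry {
  std::string begin_key;  // smallest key in the block
  std::string end_key;    // largest key in the block
  uint64_t file_offset;   // offset of the 4KB block in the data file
};

// Returns the offset of the block that may contain `key`, or -1 if no block can.
int64_t LocateBlock(const std::vector<BlockIndexEntry>& index,
                    const std::string& key) {
  // First entry whose end_key >= key (entries are sorted by key range).
  auto it = std::lower_bound(
      index.begin(), index.end(), key,
      [](const BlockIndexEntry& e, const std::string& k) { return e.end_key < k; });
  if (it == index.end() || key < it->begin_key) return -1;  // key not covered
  return static_cast<int64_t>(it->file_offset);             // read and scan this block
}
```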

  2. Single-machine DataSvr

In the beginning we only had MemTable, with all data in memory. A MemTable holds at most 200+ GB, which is not a huge amount of data, and single-machine processing avoids the cost of distributed coordination, result merging, and similar steps, so we started with the architecture above:

A write task is executed by only one DataSvr at a time.

Each Parser processes one input file at a time: it parses the key-value data, computes the route, and delivers the data to the corresponding queue.

Each Sender handles one queue, which corresponds to multiple FKV_WFS files; a file on FKV_WFS is written by only one Sender.

The idea is to run everything that can be parallelized in parallel and saturate the hardware resources.

In the concrete implementation we added many batching optimizations, such as buffered file system I/O and batched enqueue/dequeue of the queues, to maximize the throughput of the whole pipeline. A minimal sketch of this parser/queue/sender structure follows.
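Here is a minimal sketch of that pipeline under simplifying assumptions: routing, batch sizes, buffered file I/O, and the output format are all stand-ins, and the per-file parser threads are illustrative only.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

struct KeyValue { std::string key, value; };

// A queue of batches connecting parsers to one sender.
class BatchQueue {
 public:
  void Push(std::vector<KeyValue> batch) {
    std::lock_guard<std::mutex> lk(mu_);
    q_.push(std::move(batch));
    cv_.notify_one();
  }
  bool Pop(std::vector<KeyValue>& batch) {  // returns false once closed and drained
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [&] { return !q_.empty() || closed_; });
    if (q_.empty()) return false;
    batch = std::move(q_.front());
    q_.pop();
    return true;
  }
  void Close() {
    std::lock_guard<std::mutex> lk(mu_);
    closed_ = true;
    cv_.notify_all();
  }
 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::vector<KeyValue>> q_;
  bool closed_ = false;
};

// Parsers read and route input files in parallel; senders drain their queues
// and append to the output files. Real routing uses the 2400 file numbers.
void RunPipeline(const std::vector<std::string>& input_files, int num_senders,
                 const std::function<std::vector<KeyValue>(const std::string&)>& parse_file,
                 const std::function<void(int, const std::vector<KeyValue>&)>& write_shard) {
  std::vector<BatchQueue> queues(num_senders);

  std::vector<std::thread> senders;
  for (int s = 0; s < num_senders; ++s) {
    senders.emplace_back([&, s] {
      std::vector<KeyValue> batch;
      while (queues[s].Pop(batch)) write_shard(s, batch);  // append to output files
    });
  }

  std::vector<std::thread> parsers;
  for (const auto& file : input_files) {
    parsers.emplace_back([&, file] {
      std::vector<KeyValue> kvs = parse_file(file);
      std::vector<std::vector<KeyValue>> routed(num_senders);
      for (auto& kv : kvs)  // simplified routing by key hash
        routed[std::hash<std::string>{}(kv.key) % num_senders].push_back(std::move(kv));
      for (int s = 0; s < num_senders; ++s)
        if (!routed[s].empty()) queues[s].Push(std::move(routed[s]));
    });
  }
  for (auto& t : parsers) t.join();
  for (auto& q : queues) q.Close();
  for (auto& t : senders) t.join();
}
```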

Eventually, writing speeds of up to 100MB/s can be achieved on a 24-core machine, and 100GB of data can be written in about 20 minutes.

  3. Distributed DataSvr

Going further, FeatureKV needed to handle writes of billions of keys and terabytes of data, so we added the IdxTable and BlkTable table structures. This posed two challenges for the write process:

The generated data must be sorted; only sorted data allows a range index, so that the number of keys on a single machine is no longer limited by memory.

At terabyte scale, 100 MB/s is not enough: writing one terabyte takes close to 3 hours, and a single-machine pipeline cannot be sped up by adding machines, so the write process itself had to become scalable.

First, consider sorting the data:

We must finish data sharding before all the data of one Part can be gathered and sorted. The sharding step is similar to the Map phase of MapReduce and the subsequent sorting to the Reduce phase; the latter is computationally expensive and needs to be distributed.

The Map stage reuses the single-machine DataSvr logic above: after sharding, a temporary, full but unsorted result is produced. Then a distributed Reduce runs, where the input of each Reduce is one unsorted shard and its output is the sorted shard plus its index.

This approach adds the overhead of one extra full write and one extra full read.

The specific process of the DataSvr sorting phase is shown in the figure below: multiple DataSvr instances participate in the sorting phase, and each light-blue box represents one DataSvr instance.

Next, consider scalability at large data volumes:

The sorting phase of DataSvr is now distributed; the only remaining single point that cannot be scaled is the data-sharding phase.

There are two ways to implement distributed data sharding:

In the first, each DataSvr processes part of the User_Part input files, and each DataSvr outputs 2400 sharded files. When K DataSvr instances participate in distributed sharding, 2400 * K sharded files are produced, and files with the same number then have to be merged, or used directly as input to the sorting phase.

In the second, each DataSvr is responsible for generating the FKV files of a subset of the numbers: each pass reads the full user input and produces the FKV files of one batch of numbers.

The first approach needs a Merging step that merges TMP_i_0, TMP_i_1, TMP_i_2… into a single FKV_i. For BlkTable, this only changes the sorting logic to accept input from multiple files. The drawback is that for small data volumes, MemTable or IdxTable may actually be slower with distributed sharding plus merging.

The second approach directly produces the 2400 files with no subsequent Merging step, but it introduces read amplification: if the file numbers are split into T batches, there is an extra overhead of T-1 full reads of the input. With large data volumes the number of batches grows, because the data to be sorted must fit entirely in memory and the batches therefore have to be cut smaller.

Since single-machine sharding is already sufficient for small data volumes, we chose the first approach.

Distributed sharding is optional: when the data volume is small, this path can be skipped and the single-machine DataSvr process used instead.

In the end, we had an offline write process that scales linearly. For a dataset with a billion keys and a terabyte of data:

This was an impossible task before BlkTable was implemented.

Before distributed data sharding was implemented, writing this dataset took 120 minutes.

Now it takes only 71 minutes.

The whole process above resembles MapReduce: it is effectively several rounds of Map and Reduce combined. We implemented it ourselves, mainly for performance reasons, so the system could be optimized to the extreme.

Production status

FeatureKV has now been deployed in 10+ modules with 270+ machines in total. The businesses involved include Kankan (Top Stories), Search, WeChat Ads, Mini Programs, WeChat Pay, the data center's user profiling, Nearby Life, Haowuquan, and various other businesses. It solves the problem of applying offline-generated data to online services and supports the growth of all kinds of data-driven businesses.

The largest model storage module has 210 machines:

1.1 billion features/s: the daily peak BatchGet rate averages 290K/s with an average BatchSize of 3900, and stress tests of the module have reached 3 billion features/s.

15ms: 96.3% of BatchGet requests were completed within 15ms and 99.6% of BatchGet requests were completed within 30ms.

99.999999%: the success rate of transactional BatchGet is 99.999999%.

WeChat Ads implements personalized pulling + personalized ad placement on top of FeatureKV, so the recommendation strategy can be updated promptly. Compared with the old scheme, both pull volume and revenue increased significantly: pulls +21.8%, revenue +14.3%.

WeChat Pay uses FeatureKV for face-to-face coupon issuance and payment risk control. It stores several billion-scale feature sets; data that previously could only be updated once a day can now be updated within hours.

Conclusion

At first, this kind of scheduled-batch-write, online-read-only requirement was not very common, and most businesses solved it with PaxosStore or file distribution.

But more and more applications and requirements involve feeding large amounts of data into online services on a regular basis, with strong version-management needs: user profiles, machine learning models (DNN, LR, FM), rule dictionaries, and even forward/inverted indexes. So we developed FeatureKV to address these pain points, and it has worked well.

This article was first published on the official account of Yunjia community: QcloudCommunity