Xiaomi has recently open-sourced its distributed KV storage system Pegasus. What design ideas and technical details lie behind this wheel that Xiaomi built itself?
Foreword

This time I would like to talk about "Those Things about Distributed Implementation: The Story behind Pegasus". In this talk, I will focus on some of the pitfalls we encountered in Pegasus's architecture and implementation.

I work at Xiaomi, mainly on the research and development of Pegasus. Pegasus is a distributed KV storage system, a wheel that Xiaomi built itself. I don't know how you feel about reinventing the wheel; personally, I try to avoid it. If an existing solution solves the problem well, that solution should be used first. Only when the business grows and no good alternative remains do we have to build our own.

That is exactly how Pegasus came about.

How Pegasus came about

Before the Pegasus project, Xiaomi mainly used HBase for distributed storage. After more than ten years of development, HBase is stable and provides convenient interfaces. However, due to some problems in its design and implementation, we still ran into pitfalls when using it.

An HBase incident

I believe you are familiar with the HBase architecture. As shown in the figure above, HBase uses a Master node to manage the state of the entire cluster. Under the Master's coordination, the Region Server nodes handle read and write requests from clients. The Region Servers' data resides on HDFS, an external distributed file system. Zookeeper is responsible for heartbeat detection of the Region Servers and for the high availability of the Master.

In actual use, many of the problems we encountered were caused by Zookeeper. Zookeeper was designed as a system under low load, so at Xiaomi it is mostly shared by multiple businesses: besides HBase, many other services rely on it for liveness detection and service registration. However, some services put excessive pressure on Zookeeper, which made it unstable. In addition, some misuse of Zookeeper, such as not sharing socket connections when accessing it, also destabilized the Zookeeper service.

Zookeeper therefore crashed from time to time due to these various kinds of misuse. For HBase, a Zookeeper outage means a Region Server outage. Many of Xiaomi's services depend on HBase, and once HBase goes down, those services become unavailable as well.

The shortcomings of HBase

After reviewing a series of HBase problems, we found the following points worth mentioning:

1. HBase delegates the critical task of liveness detection to Zookeeper, which is debatable. If Zookeeper is not carefully operated and maintained, the stability of HBase may suffer.

In HBase, a Region Server kills itself when its Zookeeper session times out. Because the Regions on a Region Server share a single WAL written to HDFS, this suicide is expensive: the WAL must be split and replayed after the server restarts. As a result, if the entire HBase cluster goes down, it takes a long time to bring it back up.

2. Even if we could keep Zookeeper stable, liveness detection still cannot run reliably, because HBase is implemented in Java. GC pauses can cause Zookeeper to misjudge a healthy Region Server as dead, which in turn causes that Region Server to kill itself. Its Regions can resume service only after other servers load them from HDFS and replay the WAL, and this process, too, is time-consuming. During this period, the keys served by those Regions can be neither read nor written.

This problem can be mitigated by lengthening the liveness-detection timeout. However, that makes a genuinely dead Region Server harder to detect in time, which hurts availability in another way.

3. Another problem caused by GC is that HBase has spikes in read and write latency. In advertising and recommendation businesses, we want to avoid such spikes as much as possible and provide a relatively stable latency.

To summarize the three points above, we believe HBase has some drawbacks in availability and latency. These problems can be mitigated by patching and tuning parameters, but solving them fundamentally is not easy.

The positioning of Pegasus

Since HBase could not be made to fit, we had to build our own system. With HBase as a benchmark, our positioning was very clear: learn from HBase and make up for its weaknesses. Specifically:

  • Consistent views and dynamic scaling are two good features of HBase as a storage system, and we want to keep them.

  • HBase's latency and availability issues should be addressed through architecture and implementation, so that more businesses can be covered.

With this positioning, our architecture emerged clearly:

Overview of the Pegasus architecture

In general, this architecture borrows a lot from HBase:

  • Pegasus is also a centralized distributed storage system. The MetaServer manages the global state of the cluster, similar to HBase's HMaster. The ReplicaServer handles data reads and writes, similar to HBase's RegionServer.

  • For scalability, we also split the key space into different partitions.

The differences from HBase are as follows:

  • Instead of relying on Zookeeper, heartbeat detection is pulled out and managed directly by the MetaServer.

  • Data is not written to a third-party DFS but directly to the ReplicaServer.

  • To tolerate the failure of a single ReplicaServer, each Partition has three replicas scattered across different ReplicaServers.

Currently the MetaServer's high availability still relies on Zookeeper, simply to ease project development. Raft may be introduced later to eliminate the dependency on Zookeeper altogether.

Consistency protocol for multiple replicas

As mentioned earlier, each Partition has multiple replicas. To provide strong consistency across multiple replicas, we must adopt a consensus protocol. We use PacificA, published by MSRA. A comparison between PacificA and Raft can be found in the documentation of our Github project, so I won't go into detail here. Overall, we believe that implementing a usable storage system by following the PacificA paper is much less difficult than with Raft.

Pegasus write request process

In the PacificA protocol, the replica responsible for receiving read and write requests is called the Primary, equivalent to the leader in Raft. The other two replicas, which accept replication requests, are called Secondaries, equivalent to followers in Raft. The write path of a partition is relatively simple: the client first queries the MetaServer for the location of the key's partition, then sends the write request to the ReplicaServer holding the Primary. The Primary replicates the request to the two Secondaries, and only when both succeed does it reply to the client that the write succeeded.
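
The following is a minimal sketch of this write path on the Primary side. All types and function names here are hypothetical illustrations, not the real Pegasus client or server API.

```cpp
// Hypothetical sketch of the write path described above: the write is
// acknowledged to the client only after the local write and the
// replication to both Secondaries have all succeeded.
#include <string>
#include <vector>

struct WriteRequest { std::string hash_key; std::string value; };

class ReplicaStub {
public:
    virtual ~ReplicaStub() = default;
    virtual bool WriteLocal(const WriteRequest& req) = 0;  // Primary's local write
    virtual bool Replicate(const WriteRequest& req) = 0;   // RPC to a Secondary
};

bool HandleWriteOnPrimary(ReplicaStub& local,
                          std::vector<ReplicaStub*>& secondaries,
                          const WriteRequest& req) {
    if (!local.WriteLocal(req)) return false;
    for (ReplicaStub* s : secondaries) {
        if (!s->Replicate(req)) return false;  // any failure fails the request
    }
    return true;  // now reply "success" to the client
}
```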

Pegasus read request flowchart

The client sends a read request directly to the Primary, and the Primary responds on its own because it holds all the data.

Pitfalls in the implementation

We have briefly reviewed the design considerations and overall architecture of Pegasus. Now let's get down to business: what implementation problems arise with such an architecture?

Scalability

First, scalability. Scalability involves two problems:

  • The partition schema problem: how to divide a complete key space into different partitions;

  • The load balancing problem: once the partition schema is determined, how to distribute these partitions across different machines with a good algorithm.

Choosing a partition schema

For the partition schema, the industry generally has two solutions: the first is hashing, the second is sorting.

With hashing, all keys are divided into buckets; a bucket is a partition, and each key is assigned to a bucket according to its hash value. With sorting, all keys start out in one bucket, which is then dynamically split and merged based on its size.

Comparing the two, sorting has an inherent advantage over hashing: it keeps the keys globally ordered. But sorting is harder to implement because it requires continuous dynamic splitting.

In addition, with a hash partition schema the system is less prone to hotspots caused by business access patterns. With sorting, because all keys are arranged in order, requests with the same prefix are likely to fall into one partition or adjacent partitions, and those partitions may well sit on the same machine. Hashing does not have this problem because the keys are scattered up front.

However, simply comparing the pros and cons of the two solutions in the abstract is not very meaningful; we have to start from the business. We believe that in Internet business scenarios there is usually no need for an ordering relationship between different keys (such as the names of two users), so after weighing the options we adopted the hashing scheme.
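
To make the hashing scheme concrete, here is a minimal sketch of how a key could be mapped to a partition. The hash function and the modulo mapping are illustrative; Pegasus's actual hashing details may differ.

```cpp
// Minimal sketch of the hash partition schema: a key is routed to a fixed
// bucket (partition) by its hash value.
#include <cstdint>
#include <functional>
#include <string>

uint32_t PartitionIndex(const std::string& hash_key, uint32_t partition_count) {
    uint64_t h = std::hash<std::string>{}(hash_key);
    return static_cast<uint32_t>(h % partition_count);
}
```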

Implementing the hash schema

The first problem we encountered when implementing the hash schema was how to store the data. "Dropping a key into a bucket based on its hash value" is just an idealized abstraction; a real system must also provide a layer of "tables" to separate different businesses. Once tables entered the picture, we began to disagree on the implementation. Specifically, we had two storage schemes: "multi-table mixed storage" and "separate storage per table".

In multi-table mixed storage, a new hash value is computed by combining the table ID with the hash key, and a partition is selected from a single, globally shared partition space according to that hash value, as shown in the left half of the figure above.

In separate storage, the semantics of tables are pushed down to the storage layer. As shown in the right half of the figure above, each table has an independent partition space. When a read or write request arrives, the partition space is first located by table ID, and the partition is then found by the hash key. A sketch contrasting the two routing schemes follows.
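
The sketch below contrasts the two routing schemes described above. The data structures and names are hypothetical, not Pegasus's real metadata layout.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>

// Scheme 1: multi-table mixed storage. There is one global partition space,
// and the table ID is folded into the hash.
uint32_t GlobalPartition(uint32_t table_id, const std::string& hash_key,
                         uint32_t global_partition_count) {
    uint64_t h = std::hash<std::string>{}(std::to_string(table_id) + hash_key);
    return static_cast<uint32_t>(h % global_partition_count);
}

// Scheme 2: separate storage per table. Each table owns an independent
// partition space, so routing first looks up the table, then hashes the key.
struct TableInfo { uint32_t partition_count; };

uint32_t PerTablePartition(const std::unordered_map<uint32_t, TableInfo>& tables,
                           uint32_t table_id, const std::string& hash_key) {
    const TableInfo& t = tables.at(table_id);
    uint64_t h = std::hash<std::string>{}(hash_key);
    return static_cast<uint32_t>(h % t.partition_count);
}
```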

So which of the two schemes is better? Theoretically, we think multi-table mixed storage is better, because it fits the layered approach of software engineering. Mixed storage is also easier to implement, since there is only one set of metadata to manage and load balancing is simpler. From the business point of view, however, separate storage per table is better, because it makes inter-table resource isolation, table-level monitoring, and table deletion easier. Even if a partition is deleted by mistake, the impact of the mis-operation on other businesses is smaller.

The table below shows our comparison of the two schemes:

One scheme is more elegant in theory; the other is friendlier to the business in practice. In the end we started from the business: we gave up the theoretically cleaner and easier-to-implement scheme and chose the business-friendly one.

Load balancing under the hash schema

With the hash schema settled, the next question is load balancing. The typical load-balancing targets of a distributed KV store are listed below:

  • Hot requests on a single key: no storage scheme handles this easily. For reads, a read-only cache layer can be added; for writes, the business can only split the key.

  • Hot requests on a single partition: not a concern under the hash schema, because keys are scattered by hashing.

  • Uneven partition sizes: also not a concern under the hash schema, because keys are scattered by hashing.

  • Uneven distribution of partitions across different ReplicaServers: this must be addressed, so that read and write requests are spread more evenly across the ReplicaServers.

Load balancing – Targets

Specifically, load balancing has two targets:

A. The replicas (Primary and Secondaries) of the same Partition must not share a ReplicaServer

B. For each table, the Primaries and Secondaries should be evenly distributed across the different ReplicaServers

The figure above is a simple illustration of target B: if a table has four partitions and there are three ReplicaServers, we expect the 12 replicas to be distributed across the three servers as (1, 3), (2, 2), (1, 3), where each pair gives the number of Primaries and Secondaries on a server.

Load balancing – Algorithm

When implementing the load-balancing algorithm, one important point is that role switching is preferable to data copying, because the cost of a role switch is very low.

To illustrate this, consider the two situations in the figure above:

  1. In the figure on the left, ReplicaServer2 and ReplicaServer3 hold four Primaries between them, while ReplicaServer1 has none. In this case, the Primary of partition A on ReplicaServer3 simply switches roles with the Secondary of partition A on ReplicaServer1.

  2. In the figure on the right, the Primaries are distributed across four ReplicaServers as (2, 1, 1, 0). To balance the Primaries, one Primary needs to move from ReplicaServer1 to ReplicaServer4, but migrating it directly would require copying data. Instead, ReplicaServer2 is introduced as an intermediate node: first the Primary of partition A on ReplicaServer1 switches roles with the Secondary of A on ReplicaServer2, then the Primary of partition D on ReplicaServer2 switches roles with the Secondary of D on ReplicaServer4. Primary balance is achieved through role switches alone.

To handle such cases, we build a directed graph of possible Primary flows in the cluster and then use the Ford-Fulkerson method to plan the Primary role switches. I will not expand on the algorithm here; you can read the code of our open source project. A greatly simplified sketch of the underlying role-switch step follows.
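
The sketch below illustrates only the basic "switch roles instead of copying data" step in the first case above. The structures are hypothetical, and the real Pegasus balancer models the problem as a flow network solved with Ford-Fulkerson rather than this greedy swap.

```cpp
// Greatly simplified illustration: move one Primary from an overloaded
// server to an underloaded one by swapping roles within the same partition,
// so only metadata changes and no data is copied.
#include <string>
#include <unordered_map>
#include <vector>

struct Replica { int partition_id; bool is_primary; };
using ServerReplicas = std::unordered_map<std::string, std::vector<Replica>>;

bool SwapOnePrimary(ServerReplicas& servers,
                    const std::string& overloaded,
                    const std::string& underloaded) {
    for (Replica& p : servers[overloaded]) {
        if (!p.is_primary) continue;
        for (Replica& s : servers[underloaded]) {
            if (!s.is_primary && s.partition_id == p.partition_id) {
                p.is_primary = false;  // demote on the overloaded server
                s.is_primary = true;   // promote on the underloaded server
                return true;
            }
        }
    }
    // No direct swap exists; a chain through an intermediate server
    // (the second case above) would be needed, which is what the
    // flow-based formulation handles.
    return false;
}
```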

In real load balancing, there are many more cases to consider:

  • For better fault tolerance, multiple replicas of a Partition should not be placed on the same rack.

  • Different ReplicaServers may have different storage capacities, so an absolutely equal replica count is not necessarily reasonable.

  • When copying data for load balancing, traffic must be throttled to avoid consuming too many system resources.

Some of these cases are not yet handled in the current open source version of Pegasus. We will keep optimizing load balancing, so stay tuned.

Consistency and availability

Having covered scalability, let's move on to consistency and availability. Our design decisions were described earlier:

  • Reduce the dependence on Zookeeper

  • Do not write data to a DFS

  • Keep multiple replicas and use the PacificA algorithm to ensure strong consistency

When we had implemented the system according to these design goals and were ready to trial it with businesses, the business asked us: "Do you have hot backup across two machine rooms?"

Why single this point out? For a strongly consistent distributed storage system, cross-machine-room fault tolerance is a troublesome matter:

  • On the one hand, if the replicas of a Partition are deployed in two machine rooms, one of the rooms must hold the majority. If that room goes down entirely, the cluster cannot serve requests (because of strong consistency).

  • On the other hand, if the replicas of a Partition are deployed across three machine rooms, cross-room safety is indeed guaranteed, but every write request must then be replicated across machine rooms, which hurts performance.

Reflecting on this problem, we realized that we had fallen into the trap of pursuing a "perfect system". From the business point of view, a complete storage system certainly needs to handle all kinds of failures. However, as the probability of a failure decreases, the business's consistency requirements for handling it are also gradually relaxed:

After understanding the requirements of the business, we designed multiple levels of redundancy strategies for Pegasus to address different risks:

  • Consistency protocol: used within a single machine room to tolerate single-node failures

  • Cross-room replication: each partition's logs are asynchronously replicated between machine rooms; both rooms are writable, with eventual consistency based on NTP timestamps

  • Periodic cold backup of snapshots: replicated across regions; a last-resort strategy for extreme failures, in which some data may be lost

A few notes on the above redundancy strategy:

  • The NTP timestamps may not be perfectly accurate: if the two machine rooms write the same key in close succession, the earlier write may overwrite the later one. We do not try to prevent this further, because our goal is eventual consistency, that is, "over time, the final state of the two rooms is the same". Even if an earlier write overwrites a later one, the end state in both rooms is still identical. A minimal sketch of this merge rule follows these notes.

  • Based on our experience operating HBase, it is very unlikely for two machine rooms to write the same key at nearly the same time, so the "later write lost" problem caused by NTP is rare. Businesses do, however, need to be aware of it.

  • Multiple levels of redundancy can produce many extra copies of the data. First, not every business needs all of these levels; they are configured per table according to specific requirements. So far we have made these options table-level parameters that can be modified, rather than cluster-level mandates. Furthermore, some backups, such as snapshot cold backups, sit on very cheap storage media.

  • Cross-room replication and snapshot cold backup are still in internal testing and are not yet available in the open source version.
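
The following is a minimal sketch of the timestamp-based "last writer wins" merge rule used to reach eventual consistency between the two rooms. The types and names are hypothetical; Pegasus's real cross-room replication pipeline is more involved.

```cpp
// Keep whichever write carries the larger NTP-based timestamp, so both
// machine rooms converge to the same final value for a key even if clock
// skew occasionally lets an "earlier" write win.
#include <cstdint>
#include <string>
#include <unordered_map>

struct VersionedValue {
    std::string value;
    uint64_t write_ts_us;  // wall-clock timestamp of the write, in microseconds
};

void ApplyRemoteWrite(std::unordered_map<std::string, VersionedValue>& store,
                      const std::string& key, const VersionedValue& remote) {
    auto it = store.find(key);
    if (it == store.end() || remote.write_ts_us > it->second.write_ts_us) {
        store[key] = remote;  // the remote write is newer, adopt it
    }
    // Otherwise the local value is newer and the remote write is dropped.
}
```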

Latency guarantees

Finally, let's discuss how to guarantee latency. There are two important points here:

  • What implementation language to choose

  • How to implement conformance protocol efficiently

For the implementation language we chose C++. The reason, as mentioned earlier, is performance: we need a language without runtime GC. Another option might have been Rust. Our reasons for choosing C++ over Rust were as follows:

  • When the project started, Rust was not yet popular and its third-party libraries were far less mature than those of C++.

  • C++ is not that unruly as long as a few programming conventions are followed.

  • It is much easier to recruit C++ engineers.

This choice is admittedly conservative, and the discussion of languages ends here. The focus is on how we implemented the consistency protocol.

In practice, implementing a consistency protocol correctly and efficiently while keeping the code easy to maintain and test is hard. The main reason is that a complete request involves many phases, I/O happens between phases, and once concurrency is added, the code often needs very fine-grained locking.

The following diagram shows the process of a complete write request:

As shown in the figure above, when a client initiates a write request, multiple events are generated, such as writing to the local disk and sending network RPCs. Whether each event succeeds or fails, the shared consistency state must be read or modified. This logic requires tedious thread synchronization around the consistency state and is very prone to bugs.

So how do we deal with this kind of problem? In our experience, the idea is to change how the code is organized, from "scrambling for critical sections" to "lock-free serialized queuing". Let me start with a diagram of our code architecture:

Specifically, the "consistency state" is the core: all events that change the state are strung onto a queue and executed by a single thread. By "execute" we mean changing the consistency state. If an I/O is triggered during execution, it is issued in a purely asynchronous manner, and the completion event is put back onto the queue when the I/O finishes. As shown below:

In addition, to avoid creating too many threads, we use a thread pool. A "consistency state" data structure can be modified by only one thread, but several such structures may share one thread; the relationship between replicas and threads is many-to-one.

In general, this event-driven, purely asynchronous approach lets us avoid fine-grained lock synchronization when accessing the consistency state. Since the CPU never blocks waiting for I/O and a thread pool is used, performance is also assured. In addition, because a write request is clearly divided into stages, this structure makes it very convenient to monitor the read and write path, which greatly helps performance analysis of the project.
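
A rough sketch of this per-replica event queue follows. The types are hypothetical and not the real Pegasus or rDSN code; the point is that all mutations of a replica's consistency state are funneled through one queue drained by a single thread, so the state itself needs no fine-grained locking.

```cpp
#include <deque>
#include <functional>
#include <mutex>

class ReplicaEventLoop {
public:
    using Event = std::function<void()>;  // an event reads or mutates the consistency state

    // Called from any thread: RPC completions, disk I/O completions, timers...
    void Enqueue(Event e) {
        std::lock_guard<std::mutex> g(mu_);
        queue_.push_back(std::move(e));
    }

    // Drained by exactly one thread (which may also serve other replicas),
    // so events of this replica never run concurrently with each other.
    void RunOnce() {
        Event e;
        {
            std::lock_guard<std::mutex> g(mu_);
            if (queue_.empty()) return;
            e = std::move(queue_.front());
            queue_.pop_front();
        }
        e();  // executes with exclusive access to this replica's state
    }

private:
    std::mutex mu_;            // protects only the queue, not the consistency state
    std::deque<Event> queue_;
};
```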

Deterministic testing

Now let's focus on how testing is done.

Problems with distributed system stability

After we had built the system, what really bothered us for a long time was how to make it stable.

Where do the difficulties of this problem lie? We summarize them in three points:

  1. Hard to test. There is no particularly effective way to test such a system for problems. Our current practice is to treat it as a black box: read and write while killing tasks, try to make individual modules fail, and then check whether the system as a whole misbehaves. However, this kind of testing can only hit bugs by luck, because the problems are probabilistic.

  2. Hard to reproduce. Because the bugs are probabilistic, even if a problem is found through testing, it is unlikely to recur, which makes debugging even harder.

  3. Hard to regress. Suppose I find the cause by reading logs, observing the symptoms, and analyzing the code. It is still hard to be convinced that a fix works. Does this change really fix the problem? Does it introduce new ones? These two questions are difficult to answer because there is no way to reproduce the issue reliably.

The root cause: nondeterminism

So what is the root cause of these difficulties? In short, we think it is the nondeterminism of the program itself, which shows up in two ways:

  • Randomness inside the program: system facilities such as scheduling, timers, the use of random numbers, and the parallelism between multiple nodes.

  • IO: the program performs external IO, which can fail, time out, lose packets, or arrive out of order with some probability, all of which add to the nondeterminism.

To summarize it in one formula:

low-probability IO errors + random execution order = failures that are hard to reproduce

So how do we solve this problem?

Since problems are hard to reproduce in the online environment, can we construct a simulated environment instead? In this simulation we can control the probability of IO errors and also control the execution order of the program. If the logic does break while running in such a simulation, we can replay the same execution order to reproduce the problem. After the logic is fixed, the same schedule can be turned into a unit test, which solves the difficult regression problem.

Of course, that description is rather abstract. Here is a simple example to illustrate:

Suppose there is an accounting system with two users, Alice and Bob. Each has a balance of 100 yuan, stored on two separate machines, and either of them can initiate a transfer to the other. Now Alice wants to transfer 5 yuan to Bob. The simplest implementation is for Alice to deduct 5 yuan from her own balance and then send a request to Bob's machine to add 5 yuan; once Bob's balance has been increased, Alice can be notified that the transfer succeeded. A minimal sketch of this naive flow is shown below.
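
The sketch below is a purely illustrative version of that naive flow, with the remote credit modeled as a callback; no function here corresponds to a real system API.

```cpp
#include <functional>

// Naive transfer: deduct locally, then ask the remote machine to credit the
// same amount. If the process crashes or the RPC reply is lost between the
// two steps, the two accounts silently diverge, which is exactly the class
// of failure discussed below.
bool Transfer(int& local_balance,
              const std::function<bool(int)>& remote_add,
              int amount) {
    local_balance -= amount;    // step 1: deduct from Alice's balance
    return remote_add(amount);  // step 2: RPC to Bob's machine to add the amount
}
```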

This simple implementation is fine if machines never crash, disks never fail, and the network never breaks. But such assumptions never hold, so to cope with these problems we may add a series of measures to make the accounting system reliable, such as transaction logs, replicating transactions and account information to multiple machines, and introducing distributed transaction techniques. These mechanisms make the system considerably more complex.

With such a complex system, any unusual database state can cause us to scratch our heads, such as the following changes in Alice’s and Bob’s account information:

(Alice_100, Bob_100) -> (Alice_105, Bob_105)

When the program has no bugs, the sum of Alice's and Bob's balances should always equal 200. Now the sum is 210, so something must have gone wrong. Perhaps a bug was triggered when the two transferred money to each other at the same time; perhaps a packet was delivered twice. There are many ways to enter such an illegal state, and the combination of probabilistic hardware failures and complex (possibly concurrent) transfers between Alice and Bob makes the cause hard to locate and reproduce.

For this kind of problem, our weapons are simulation and pseudo-randomness. With these two tools we make the program follow a reproducible sequence of events, so that if it crashes after entering an illegal state, we can reproduce, debug, and regression-test that state.

Take the transfers between Alice and Bob as an example. A run in which both sides transfer money at the same time might unfold like this in the simulated environment: Alice initiates a transfer -> Bob initiates a transfer -> Alice starts a disk write -> Alice issues an RPC -> Alice's RPC succeeds -> Bob starts a disk write -> Alice's disk write fails

In other words:

  • Two pieces of logic that would run in parallel may be interleaved concurrently in the simulator

  • The interleaving is chosen pseudo-randomly: different simulator instances go through completely different state transitions, but for any single instance, as long as the same random seed is used, its transition sequence is deterministic

  • IO failures are also injected pseudo-randomly with a configurable error probability

In such an environment, debugging distributed business logic becomes much easier. On the one hand, randomness lets us exercise many different execution orders; on the other hand, pseudo-randomness and error injection let us reproduce the problems we encounter.
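
The following toy sketch shows the core idea: runnable events from all simulated nodes go into one pool, a seeded random generator picks which event runs next and whether an IO event fails, so the same seed replays exactly the same schedule. It only illustrates the principle; rDSN's simulator is far more complete.

```cpp
#include <cstdint>
#include <cstddef>
#include <functional>
#include <random>
#include <vector>

struct SimEvent {
    std::function<void(bool io_ok)> run;  // a callback belonging to some simulated node
    bool is_io = false;                   // IO events are subject to error injection
};

class Simulator {
public:
    Simulator(uint64_t seed, double io_error_rate)
        : rng_(seed), io_error_(io_error_rate) {}

    void Post(SimEvent e) { pending_.push_back(std::move(e)); }

    // Run until no events remain. The order of execution and the injected
    // errors depend only on the seed, so a failing run can be replayed exactly.
    void Run() {
        while (!pending_.empty()) {
            std::uniform_int_distribution<size_t> pick(0, pending_.size() - 1);
            size_t i = pick(rng_);
            SimEvent e = std::move(pending_[i]);
            pending_.erase(pending_.begin() + static_cast<std::ptrdiff_t>(i));
            bool ok = !e.is_io || !InjectFailure();
            e.run(ok);  // the callback may Post() follow-up events
        }
    }

private:
    bool InjectFailure() {
        std::bernoulli_distribution fail(io_error_);
        return fail(rng_);
    }

    std::mt19937_64 rng_;
    double io_error_;
    std::vector<SimEvent> pending_;
};
```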

Controlling nondeterminism

So how, concretely, is nondeterminism controlled?

  • The interfaces that produce nondeterminism are wrapped in a layer of abstraction. Currently the system abstracts five interfaces: thread pool, RPC, disk IO, thread synchronization, and environment operations. Each interface has a simulated implementation (for testing) and a non-simulated implementation (for deployment). The distributed business logic only calls the system's abstract interfaces (the Runtime); a sketch of such an abstraction appears after this list.

  • The system can emulate multiple service nodes inside a single process, which achieves the effect of simulating a distributed system.

The simulated implementation of the Runtime layer is the hard part. The key points are:

  • A purely asynchronous programming interface is provided; a node runs by having a thread execute the tasks in its event queue.

  • When the business logic calls into the Runtime module through the Runtime API, the Runtime suspends the current node and pseudo-randomly selects another node to run. The whole process resembles a single-core multitasking operating system: the Runtime is like the kernel, a distributed logic Node is like a user-mode process, and the Runtime API is like a system call. As shown below:

  • For IO events submitted to the event queue, we inject pseudo-random errors with a certain probability. We also introduce global state checkers that verify, on every state change, that the global state is still legal (for example, that the sum of Alice's and Bob's balances does not exceed 200). The application logic code also adds a number of assert points to check whether a Node's state is valid. After the program crashes, the same random seed can be used to reproduce and debug the run. As shown below:
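
The sketch below shows what such an abstraction layer could look like. The interface names mirror the description above but are hypothetical; they are not rDSN's actual API.

```cpp
#include <cstdint>
#include <functional>
#include <string>

// Business logic is written only against this interface. In deployment an
// implementation backed by real threads, sockets, files and clocks is used;
// in tests a simulator implementation schedules the callbacks pseudo-randomly
// and injects IO errors, as sketched earlier.
class Runtime {
public:
    virtual ~Runtime() = default;

    // Thread pool: run a task asynchronously.
    virtual void Enqueue(std::function<void()> task) = 0;

    // RPC: send a message to another (possibly simulated) node.
    virtual void SendRpc(const std::string& node, const std::string& payload,
                         std::function<void(bool ok, std::string reply)> cb) = 0;

    // Disk IO: write asynchronously and report success or failure.
    virtual void AsyncWrite(const std::string& path, const std::string& data,
                            std::function<void(bool ok)> cb) = 0;

    // Environment: time, randomness, etc. Thread synchronization primitives
    // would be abstracted in a similar way.
    virtual uint64_t NowMs() = 0;
};
```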

Pegasus's unit tests are also built on this testing framework. In a unit test, we use a script to describe the actions applied in a scenario and the expected states; the App Logic Checker shown above loads the script and checks whether the application state matches expectations.

The concepts and implementation of the testing framework used by Pegasus come from Microsoft's open source framework rDSN. The whole framework takes some effort to understand, and this is only a brief overview of how it works; if you are interested, you can read the source code directly.

Status and Plans

Pegasus has been running stably inside Xiaomi for nearly a year and serves nearly ten businesses. For more on Pegasus's storage engine, performance, and design, see the Arch Summit 2016 talk (link below) and our documentation on Github.

In the future we will open source features such as hot and cold data backup and Partition splitting, so stay tuned. You are also welcome to join Xiaomi and our team.

Closing words

Finally, a summary of the whole article.

When researching or implementing a project, there are three things to focus on:

  • Focus on the business: Whether the project meets the business requirements

  • Focus on the architecture: whether the project's architecture is reasonable. For a distributed storage system, this includes consistency, availability, scalability, and performance

  • Focus on software engineering: how well the project is tested, how well the code is maintained, how well the project is monitored

Links

  • Pegasus Project Address:

    https://github.com/XiaoMi/pegasus

  • Pegasus at Arch Summit 2016

    http://bj2016.archsummit.com/presentation/3023

  • rDSN Project Address:

    https://github.com/Microsoft/rDSN

  • The rDSN branch maintained by Pegasus:

    https://github.com/XiaoMi/rdsn

  • Link to the PacificA paper:

    https://www.microsoft.com/en-us/research/wp-content/uploads/2008/02/tr-2008-25.pdf

About us

We are the cloud storage team of Xiaomi's Artificial Intelligence and Cloud Platform department. The team's responsibility is to develop and maintain distributed storage systems and to provide distributed storage solutions for all of Xiaomi and the companies in its ecosystem.

The systems our team develops and maintains include Zookeeper, HDFS, HBase, and Pegasus, as well as FDS, SDS, EMQ, and other services built on top of them.

If you have any questions (not just technical ones), please feel free to contact us. At the same time, we also welcome ambitious candidates to join us.

  • pegasus-help [at] xiaomi.com

  • Weibo: Xiaomi Cloud Technology

