1. Introduction

We have previously introduced the application scenarios of Apache TubeMQ: TubeMQ is suitable for business scenarios that can tolerate a small amount of data loss in extreme cases. So under what circumstances may TubeMQ lose data? Why is it designed that way? How do similar MQ systems handle this? This document aims to answer these questions.

2. Conclusion

The Apache TubeMQ system stores data with a single-node, multi-disk RAID10 replication scheme. Data may be lost only in the following scenarios:

  1. The machine loses power: data that is still in memory, has been acknowledged to producers as successful, but has not yet been consumed will be lost. Data already stored on disk is not affected once the server comes back online.
  2. A disk failure exceeds what RAID10 can tolerate: data that has been acknowledged but not yet consumed will be affected; after the disk group is repaired, any stored data that cannot be recovered is lost.
  3. RAID10 can absorb routine single-disk failures, so production and consumption on the affected Broker node are not impacted.

3. Can you provide a quantitative data reliability index to evaluate the reliability of the TubeMQ system?

This question comes up in most discussions, and the most intuitive reaction is: machines fail easily, TubeMQ stores only a single copy, so its data reliability must be low. My personal view, as I have expressed on other occasions, is that a data reliability index is not an appropriate way to characterize the system's data reliability: it only reflects the outcome, and has no direct relationship to how data reliability is actually achieved. From the data loss scenarios introduced above, we can see that the quality of the machine room, the server hardware, and whether the business can tolerate momentary loss directly determine the data reliability of the system; the measured reliability is simply the result of how often those failures occur. If none of those failures happen, the data reliability of the TubeMQ system is in fact 100%. Therefore, we should evaluate and analyze the data reliability of the system based on the failure rate of the environment that can cause data loss. Personally, I think that is the fundamental measure.

According to the failure statistics of our online environment in 2019, the annual failure rate that could lead to data loss in our TubeMQ cluster is about 2.67%: in a cluster of about 1,500 machines, with an average daily access volume of 3.5 trillion messages, roughly 40 servers experienced machine ping failures or disk-group damage that RAID10 could not withstand. Personally, I believe this failure rate can be reduced further, since our TubeMQ cluster runs mostly on second-hand machines that were phased out of critical business use after several years of service.

4. Why is TubeMQ designed as a single-node storage?

Cost reduction: it is well known that achieving 100% data reliability is very expensive; like building a spaceship, the MQ pipeline would need multiple independent, redundant data backups to guarantee that no data is ever lost. According to our analysis, about 90% of the business data transmitted through MQ can tolerate loss in extreme cases, while about 10% must not be lost at all, such as transaction flows and billing-related log data. If we separate out that 10% of highly reliable data, we can save a great deal of money. With this in mind, TubeMQ is responsible for data services that require high performance and can tolerate a small amount of data loss in extreme cases.

As shown in the diagram above, beyond the overall solution considerations, we also carefully designed TubeMQ's storage scheme: data and index are organized by Topic, and a layer of memory storage is superimposed on top of the data files as an extension of the disk file storage. But we are not completely reckless just because the business tolerates a small amount of loss: data production uses a QoS1 scheme; the data store applies mandatory cache-flush control (by message count, by time, by size); for disk failures, the Broker performs automatic read-only or offline control (ServiceStatusHolder) based on operational policies; and the production side automatically detects abnormal Broker nodes and shields them algorithmically based on their service quality. All of this aims to improve data reliability and reduce the possibility of data loss as much as possible while keeping performance high.
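To illustrate the "flush by count, by time, by size" idea mentioned above, here is a minimal sketch. The class name and thresholds are hypothetical and do not reflect TubeMQ's actual internal code; they only show how the three triggers can be combined.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Minimal sketch of a "flush by count, by time, by size" cache policy.
 * Names and thresholds are hypothetical, for illustration only.
 */
public class FlushTriggerSketch {
    private final int maxCachedMessages;   // flush when this many messages are buffered
    private final long maxCachedBytes;     // flush when the buffer reaches this size
    private final long maxIntervalMillis;  // flush at least this often

    private final AtomicInteger cachedMessages = new AtomicInteger(0);
    private final AtomicLong cachedBytes = new AtomicLong(0);
    private volatile long lastFlushTime = System.currentTimeMillis();

    public FlushTriggerSketch(int maxCachedMessages, long maxCachedBytes, long maxIntervalMillis) {
        this.maxCachedMessages = maxCachedMessages;
        this.maxCachedBytes = maxCachedBytes;
        this.maxIntervalMillis = maxIntervalMillis;
    }

    /** Record one buffered message and report whether a flush should be forced. */
    public boolean appendAndCheck(int messageSizeInBytes) {
        int count = cachedMessages.incrementAndGet();
        long bytes = cachedBytes.addAndGet(messageSizeInBytes);
        long elapsed = System.currentTimeMillis() - lastFlushTime;
        return count >= maxCachedMessages
                || bytes >= maxCachedBytes
                || elapsed >= maxIntervalMillis;
    }

    /** Reset the counters after the memory block has been written to disk. */
    public void markFlushed() {
        cachedMessages.set(0);
        cachedBytes.set(0);
        lastFlushTime = System.currentTimeMillis();
    }
}
```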

How much does this save? Taking the operating data published by several external companies that use Kafka, handling 1 trillion messages per day requires roughly 200 to 300 machines; according to our 2019 operating data, TubeMQ needs roughly 40 to 50 machines of comparable specification. There are some factors that are hard to compare precisely, such as separate clusters and differences in the traffic mix of specific businesses, but the machine-cost ratio should not differ much. Converted into money, the savings in server cost alone can be measured in the billions.
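A rough back-of-the-envelope calculation of the ratio implied by the figures quoted above (the machine counts come from the text; the midpoints and the calculation itself are only illustrative):

```java
/** Back-of-the-envelope machine-count comparison using the figures quoted above. */
public class CostRatioSketch {
    public static void main(String[] args) {
        double kafkaMachinesPerTrillion = 250;   // midpoint of the quoted 200-300 range
        double tubemqMachinesPerTrillion = 45;   // midpoint of the quoted 40-50 range
        double ratio = kafkaMachinesPerTrillion / tubemqMachinesPerTrillion;
        System.out.printf("Roughly %.1fx fewer machines for the same daily volume%n", ratio);
        // Prints: Roughly 5.6x fewer machines for the same daily volume
    }
}
```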

Some might ask: can I run Kafka with a single copy as a business service and achieve the same cost effect as TubeMQ? My answer is that if we could, we would not have spent so much time and resources improving TubeMQ; we would simply have used Kafka with a single copy. In the early days of open source, we published a comprehensive single-Broker performance comparison report, tubemq_perf_test_vs_Kafka, where you can find the specific differences.

The point here is that the data reliability of the TubeMQ system itself is not actually low. And have you ever considered how reliable the data really is in each MQ's multi-copy scheme?

5. Multi-copy scheme analysis of similar MQ systems:

Kafka: In my opinion, Kafka's multi-copy scheme is a best-effort scheme under high-performance constraints.

Kafka's replica mechanism uses the AR set and the ISR set to track the total replicas and the replicas currently in sync. The Leader records the latest synchronization time of each replica to determine whether each Follower is still keeping up with it (the time-based criterion, replica.lag.time.max.ms). Before 0.9.x, Kafka also had a second parameter that was later removed, replica.lag.max.messages: the number of messages by which a Follower lags behind the Leader, combined with the time criterion to identify invalid replicas. On the server side, min.insync.replicas guarantees a minimum number of in-sync replicas in the ISR, and the client can specify the required acknowledgements (acks=0: no reply; acks=1: the Leader has stored the message; acks=-1: all ISR members have responded) to ensure that data is received by multiple replicas. From the design of this mechanism we can see clearly that even Kafka's designers were aware that data may not be synchronized from the Leader to every Follower in time, which is why the ISR criterion was changed from (lag count, synchronization time) to (synchronization time) alone: too much lag would otherwise disturb ISR membership. If the number of ISR replicas drops to 0, native Kafka can no longer accept writes for that partition; for this, Kafka added the unclean.leader.election.enable parameter, which allows a replica that is not in the ISR to be elected Leader and continue serving externally on a best-effort basis.
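As a concrete illustration of how these settings interact, here is a minimal sketch of a reliability-oriented Kafka producer together with the broker/topic settings mentioned above. The values shown are illustrative assumptions, not recommendations from the original text.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliableProducerSketch {
    public static void main(String[] args) {
        // Broker / topic side settings (server.properties or per-topic), quoted as comments:
        //   min.insync.replicas=2                  -- at least 2 ISR members must acknowledge
        //   unclean.leader.election.enable=false   -- never elect an out-of-sync replica as Leader
        //   replica.lag.time.max.ms=30000          -- time-based ISR membership criterion

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all (-1): wait for every ISR member, trading latency and throughput for durability.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.flush();
        }
    }
}
```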

From the above analysis, in big data scenarios Kafka's replica mechanism can meet the needs of businesses that replay consumption: when the machine holding the Leader replica fails, the data already synchronized to Follower replicas can still be consumed. But because of the issues above, data can still be lost, so it cannot satisfy replay scenarios that require a guarantee of no data loss. At the same time, this scheme consumes a lot of resources and has a very low utilization rate: with two replicas, resources are at least doubled and effective network bandwidth is halved; and to avoid the situation where the ISR of a two-replica partition drops to 0, users are likely to configure three replicas, which consumes even more resources at an even lower utilization. Maintaining a data service that can still lose data at such a high cost is neither cheap nor effective. Finally, Kafka blocks production traffic when a partition has no live replica, which is unacceptable for some businesses in a high-traffic environment; even with three replicas, since replica liveness is dynamic, production interruptions can still occur in extreme cases.
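To make the resource argument concrete, here is a small back-of-the-envelope sketch of how the replication factor scales storage and inter-broker traffic. The formula is standard replication arithmetic and the ingest figure is an assumption, not data from the original text.

```java
/** Rough replication-overhead arithmetic for a given replication factor. */
public class ReplicationOverheadSketch {
    public static void main(String[] args) {
        double ingestMBps = 1000.0;  // assumed raw producer ingest, MB/s
        for (int replicationFactor : new int[] {1, 2, 3}) {
            double storedMBps = ingestMBps * replicationFactor;              // every byte stored R times
            double replicationMBps = ingestMBps * (replicationFactor - 1);   // extra inter-broker traffic
            System.out.printf("R=%d: stored %.0f MB/s, extra replication traffic %.0f MB/s%n",
                    replicationFactor, storedMBps, replicationMBps);
        }
        // With R=2 the cluster stores twice the bytes and roughly halves the ingest
        // bandwidth available per machine; with R=3 the overhead grows further.
    }
}
```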

Pulsar: My personal analysis is that it can guarantee that data is not lost, but at a cost in big data scenarios. Pulsar adopts a pattern similar to the Raft protocol: a write succeeds once a majority of copies have been written, with the server actively pushing each request to every BookKeeper replica node. This real-time multi-copy synchronization scheme can meet the vast majority of highly reliable business needs, and users see that very clearly; I think the recent popularity of Pulsar is related to how well it meets this demand in the market. However, in a big data scenario with thousands of Topics and tens of thousands of partitions, this multi-copy scheme consumes a great deal of machine resources. For that reason, our TEG data platform department uses Pulsar internally to serve highly reliable data, and we donate our improvements to Pulsar back to the community.
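As a minimal sketch of the quorum-style durability setting described above, assuming the Pulsar admin client's namespace persistence API and an illustrative local admin endpoint:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistencePolicies;

public class PulsarQuorumSketch {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // assumed admin endpoint
                .build()) {
            // Ensemble of 3 bookies, write each entry to all 3, and treat the write as
            // successful once 2 of them acknowledge -- a quorum-style durability policy.
            admin.namespaces().setPersistence("public/default",
                    new PersistencePolicies(3, 3, 2, 0.0));
        }
    }
}
```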

TubeMQ: As described in its application scenarios, TubeMQ is designed for a business data reporting pipeline that can tolerate a small amount of data loss in extreme cases. It takes a different, self-developed path that balances business cost against data reliability, and achieves a different kind of system resilience against fatal exceptions:

  1. A Topic remains available for external service as long as any of the Broker nodes allocated to it is alive.
  2. Building on point 1, as long as every Topic in the cluster still has at least one live Broker, all Topics in the cluster remain available for external service.
  3. Even if all Master nodes fail, newly added producers and consumers cannot join the cluster, but producers and consumers that are already registered are not affected and can continue producing and consuming.

6. Conclusion:

On the premise of lossy service, TubeMQ follows the idea of losing as little data as possible while never blocking the service, and strives to keep the scheme simple and easy to maintain. In TubeMQ's design, a partition failure does not affect the overall external service of a Topic: as long as one partition of the Topic survives, the Topic's external service is not blocked. TubeMQ's P99 data latency can be kept at the millisecond level, ensuring that the business can consume data as quickly as possible and lose as little as possible. TubeMQ's distinctive data storage scheme delivers at least 50% higher TPS than Kafka on a single machine (and double on some models), and supports more Topics and partitions per machine, which allows larger clusters and lower maintenance costs. These different considerations and implementations combine to make TubeMQ a low-cost, high-performance, and stable foundation.