Xiaomi has many business lines, covering fields from information feeds and e-commerce to advertising and finance. The Xiaomi streaming platform provides integrated streaming data solutions for all businesses of Xiaomi Group and consists of three main modules: data collection, data integration, and stream computing. At present, the platform handles 1.2 trillion messages per day, runs 15,000 real-time synchronization tasks, and processes 1 trillion messages per day in real time.

As Xiaomi’s business has grown, the streaming platform has gone through three major upgrades to meet the varied needs of its many businesses. The latest iteration is based on Apache Flink: the internal modules of the platform have been completely rebuilt, and Xiaomi’s business jobs are gradually being migrated from Spark Streaming to Flink.

Background

The vision of the Xiaomi streaming platform is to provide integrated, platform-based streaming data solutions for all Xiaomi business lines. Specifically, this covers the following three aspects:

  • Streaming data storage: Streaming data storage means the message queue. Xiaomi has developed its own message queue, which is similar to Apache Kafka but has its own characteristics. The streaming platform provides the storage function of this message queue.
  • Streaming data ingestion and dump: With the message queue acting as the buffer for streaming data, the platform also needs to provide functions for ingesting data into the queue and dumping data out of it.
  • Streaming data processing: The platform processes streaming data using computing engines such as Flink, Spark Streaming, and Storm.




The following figure shows the overall architecture of the streaming platform. Reading from left to right, the first (orange) column is the data sources, which consist of two parts, User and Database.

  • User refers to the various tracking data generated by users, such as app logs and web server logs. Database refers to database data, such as MySQL, HBase, and other RDS data.
  • The blue part in the middle is the streaming platform itself. Talos is the message queue implemented by Xiaomi, and the layer above it includes the Consumer SDK and the Producer SDK.
  • In addition, Xiaomi has implemented a complete set of Talos Sources, which collect data from all of the user and database scenarios just mentioned.
  • Talos Sink and Talos Source together form a data flow service, which is responsible for dumping Talos data to other systems with very low latency. Sink is a standardized service, but it is not customizable enough, so the Talos Sink module will later be rebuilt on Flink SQL.





The chart below shows the scale of Xiaomi’s business. At the storage level, Xiaomi handles about 1.2 trillion messages a day, with peak traffic of 43 million messages per second. Talos Sink alone dumps 1.6 PB of data every day across nearly 15,000 dump jobs. More than 800 stream computing jobs and more than 200 Flink jobs run every day, and Flink processes 700 billion messages, or more than 1 PB of data, per day.





Development history of the Xiaomi streaming platform

The development history of the Xiaomi streaming platform is divided into the following three stages:

  • Streaming Platform 1.0: Version 1.0 of Xiaomi’s streaming platform was built in 2010 and initially used Scribe, Kafka, and Storm. Scribe is a set of services that handles data collection and data dumping.
  • Streaming Platform 2.0: Because of various problems in version 1.0, we developed Xiaomi’s own message queue, Talos, and built this version around Talos Source, Talos Sink, and Spark Streaming.
  • Streaming Platform 3.0: This version adds Schema support to the previous version and introduces Flink and Stream SQL.




Streaming Platform 1.0 is a cascading service made up of Scribe Agents and Scribe Servers. It is mainly used to collect data and serve both offline and real-time computing: HDFS and Hive are used for offline computing, and Kafka and Storm are used for real-time computing. Although this combined offline and real-time approach could basically meet Xiaomi’s business needs at the time, it had a series of problems.

  • First, there are too many Scribe Agents and no configuration management or package management mechanism, which results in high maintenance costs.
  • Second, because Scribe uses a push architecture, data cannot be buffered effectively when a failure occurs, and the HDFS and Kafka data paths affect each other.
  • Finally, when the data link has many cascaded hops, the whole link becomes a black box that lacks monitoring and data verification mechanisms.




To solve the problems of Streaming Platform 1.0, Xiaomi launched Streaming Platform 2.0. This version introduces Talos as the buffer for streaming data storage, with the various data sources on the left and the various sinks on the right. In other words, the original cascading architecture is transformed into a star architecture, which has the advantage of being easy to extend.

  • This version implements a configuration management and package management system, which can automatically update and restart an Agent after its configuration has been changed.
  • In addition, Xiaomi has implemented a decentralized configuration service: once a configuration file has been set, it is automatically distributed to the distributed nodes.
  • Finally, this version implements end-to-end data monitoring, which uses instrumentation points to monitor data loss and data transmission delay across the entire link.




The advantages of Streaming Platform 2.0 include:

  • Multi Source and Multi Sink are introduced. Previously, any two systems that needed to exchange data were connected directly to each other. With M sources and N sinks, the current architecture reduces the system integration complexity from O(M*N) to O(M+N).
  • Configuration management and package management mechanisms are introduced, which thoroughly solve a series of problems in system upgrade, modification, and release, and reduce the operations and maintenance burden.
  • An end-to-end data monitoring mechanism is introduced, which provides full-link data monitoring and quantifies full-link data quality.
  • The solution is productized, which avoids duplicated construction and solves the operations and maintenance problems of the businesses.
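The O(M*N) versus O(M+N) claim can be made concrete with a small counting sketch. It is only an illustration: the function names and the example numbers of sources and sinks are assumptions, not figures from the platform.

```python
def point_to_point_links(num_sources: int, num_sinks: int) -> int:
    """Connectors needed when every source is wired directly to every sink."""
    return num_sources * num_sinks


def hub_links(num_sources: int, num_sinks: int) -> int:
    """Connectors needed when every source and every sink only talks to a
    central message queue (the role Talos plays in the star architecture)."""
    return num_sources + num_sinks


if __name__ == "__main__":
    # Hypothetical example: 10 kinds of source, 8 kinds of sink.
    print(point_to_point_links(10, 8))  # 80 direct integrations
    print(hub_links(10, 8))             # 18 integrations through the queue
```

Adding one more sink costs one new connector in the star layout, instead of one per existing source.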




The following figure illustrates MySQL synchronization in detail, in which a MySQL table is synchronized to the message queue Talos using the mechanism described above. The Binlog service poses as a MySQL slave and sends a Dump Binlog request to MySQL. After receiving the request, MySQL starts to push the binlog to the Binlog service, which writes the binlog events to Talos in strict order. A Spark Streaming job then consumes the topic, parses the binlog, and writes the result to a Kudu table. The platform currently synchronizes more than 3,000 tables to Kudu in this way.





The following figure shows the functional modules of Agent Source. It supports the RPC and HTTP protocols and can also watch local files. Data is buffered both in memory and in files to ensure high reliability. For the RPC protocol, the platform provides a Logger Appender and an SDK built on RPC; for the HTTP protocol, it provides an HttpClient; for files, a File Watcher automatically discovers and scans local files, and an Offset Manager automatically records offsets. The Agent mechanism is deeply integrated with the K8S environment and can easily be combined with stream computing at the back end.





The following is the logical flow chart of Talos Sink, which implements a series of flows on top of Spark Streaming. On the left is a set of Talos topic partitions. The logic common to every batch, such as startProcessBatch() and stopProcessBatch(), is abstracted out, so each sink only needs to implement its own write logic. Different sinks run as independent jobs so that they do not affect one another. Sink is also optimized on top of Spark Streaming to schedule resources dynamically according to topic traffic, which saves as many resources as possible while keeping system latency under control.
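The batch abstraction described above follows the template-method pattern. The sketch below is a minimal Python illustration of that idea, not the platform’s Spark Streaming code: the class names `BatchSink` and `InMemorySink`, and everything except the two hook names taken from the text, are assumptions.

```python
from abc import ABC, abstractmethod
from typing import List


class BatchSink(ABC):
    """Hypothetical base class: per-batch bookkeeping is shared by all sinks,
    and each concrete sink only supplies write()."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def start_process_batch(self, batch_id: int) -> None:
        # Shared logic, e.g. loading offsets or opening a transaction.
        self.events.append(f"start:{batch_id}")

    def stop_process_batch(self, batch_id: int) -> None:
        # Shared logic, e.g. committing offsets or reporting metrics.
        self.events.append(f"stop:{batch_id}")

    @abstractmethod
    def write(self, messages: List[bytes]) -> None:
        """Sink-specific write logic (HDFS, Kudu, and so on)."""

    def process_batch(self, batch_id: int, messages: List[bytes]) -> None:
        self.start_process_batch(batch_id)
        try:
            self.write(messages)
        finally:
            # Always run the closing hook, even if the write fails.
            self.stop_process_batch(batch_id)


class InMemorySink(BatchSink):
    """Toy sink that keeps messages in a list instead of an external system."""

    def __init__(self) -> None:
        super().__init__()
        self.rows: List[bytes] = []

    def write(self, messages: List[bytes]) -> None:
        self.rows.extend(messages)
```

A new sink type would subclass `BatchSink` and override only `write()`, which is the property the text highlights.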





The following figure shows the end-to-end data monitoring mechanism implemented by the platform. Each message carries an EventTime timestamp, which represents the time at which the message was actually generated. Time is divided into one-minute windows by EventTime, and each hop of the data transmission counts the messages it receives in each window; message completeness is then calculated from these counts. The delay at a hop is the difference between its ProcessTime and the message’s EventTime.
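The windowed counting described above can be sketched in a few lines of Python. This is an illustration under assumptions: the `HopMonitor` class, the millisecond timestamps, and the choice to define completeness as the downstream count divided by the upstream count are not taken from the platform.

```python
from collections import Counter

WINDOW_MS = 60_000  # one-minute windows, keyed by EventTime


def window_start(event_time_ms: int) -> int:
    """Start of the one-minute window that contains event_time_ms."""
    return event_time_ms - event_time_ms % WINDOW_MS


class HopMonitor:
    """Hypothetical per-hop monitor: counts messages per EventTime window
    and tracks the largest ProcessTime - EventTime delay seen at this hop."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.counts = Counter()
        self.max_delay_ms = 0

    def record(self, event_time_ms: int, process_time_ms: int) -> None:
        self.counts[window_start(event_time_ms)] += 1
        self.max_delay_ms = max(self.max_delay_ms,
                                process_time_ms - event_time_ms)


def completeness(upstream: HopMonitor, downstream: HopMonitor,
                 window: int) -> float:
    """Fraction of the upstream hop's messages that reached the downstream hop."""
    sent = upstream.counts[window]
    if sent == 0:
        return 1.0
    return downstream.counts[window] / sent
```

Because windows are keyed by EventTime rather than arrival time, a late message still lands in the window where it was produced, so counts from different hops stay comparable.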





Streaming Platform 2.0 has three main problems:

  • Talos data lacks Schema management. Talos does not understand the data written to it, so SQL cannot be used to consume Talos data.
  • The Talos Sink module does not support customization. For example, when transferring data from Talos to Kudu, the Talos message may have ten fields while the Kudu table needs only five. This kind of requirement is not well supported at present.
  • Spark Streaming has limitations in its support for Event Time, and it does not support end-to-end exactly-once semantics.




Real-time data warehouse based on Flink

To solve the above problems of Streaming Platform 2.0, Xiaomi did a lot of research and held a series of exchanges with Alibaba’s real-time computing team. In the end, Xiaomi decided to use Flink to rebuild the platform’s existing processes. The following sections describe in detail the practice of the Xiaomi stream computing platform based on Flink.

The design concept of using Flink to transform the platform is as follows:

  • Full-link Schema support. The full link covers not only the Talos-to-Flink stage but everything from initial data collection to back-end computation. A data verification mechanism is needed to avoid data contamination, and field changes must go through a compatibility check. In big data scenarios Schema changes are frequent, so compatibility checking is necessary. Drawing on Kafka’s experience, forward, backward, and full compatibility checks are introduced for schemas.
  • With the help of the Flink community, roll out Flink across Xiaomi. On the one hand, real-time streaming jobs are gradually migrated from Spark and Storm to Flink while preserving the original latency and resource savings; so far, Xiaomi runs more than 200 Flink jobs. On the other hand, Flink is expected to be used to rebuild the Sink process, improve its efficiency, add ETL support, and on this basis vigorously promote Streaming SQL.
  • Productize streaming by introducing platform management of Streaming Jobs and Streaming SQL.
  • Rebuild Talos Sink on Flink SQL to support customized business logic.
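The forward, backward, and full compatibility checks mentioned in the first point can be sketched as follows. The rules here are a simplification loosely modelled on Avro-style schema evolution (a reader can fill a missing field only if that field has a default); they are an assumption for illustration and not the platform’s actual rule set, and the `Field` type and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Field:
    type: str
    has_default: bool = False


Schema = Dict[str, Field]


def can_read(reader: Schema, writer: Schema) -> bool:
    """True if data written with `writer` can be read using `reader`."""
    for name, field in reader.items():
        if name in writer:
            if writer[name].type != field.type:
                return False  # type changes are rejected in this sketch
        elif not field.has_default:
            return False      # reader needs a field the writer never wrote
    return True


def is_backward_compatible(new: Schema, old: Schema) -> bool:
    """Consumers on the new schema can still read data written with the old one."""
    return can_read(new, old)


def is_forward_compatible(new: Schema, old: Schema) -> bool:
    """Consumers still on the old schema can read data written with the new one."""
    return can_read(old, new)


def is_fully_compatible(new: Schema, old: Schema) -> bool:
    return is_backward_compatible(new, old) and is_forward_compatible(new, old)
```

For example, adding a field with a default passes all three checks, while adding a field without a default is forward compatible only.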




The following figure shows the Streaming Platform 3.0 architecture, which is similar to the 2.0 architecture but presented from a different angle. Specifically, it contains the following modules:

  • Abstract tables: In this version, storage systems such as MySQL and Hive are abstracted into tables in preparation for SQL.
  • Job management: Supports the management of Streaming jobs, including multi-version support, separation of configuration and Jar, compilation and deployment, and job status management.
  • SQL management: SQL is eventually converted into a DataStream job. This module includes Web IDE support, Schema exploration, UDF and dimension table Join, SQL compilation, automatic DDL construction, and SQL storage.
  • Talos Sink: This module rebuilds the 2.0 Sink on top of SQL management. Its functions include one-click table creation, automatic update of the Sink format, field mapping, job merging, simple SQL, and configuration management. The scenario mentioned earlier, in which Spark Streaming reads messages from Talos and writes them unchanged to HDFS for offline data warehouse analysis, can be expressed very easily in SQL. In the future, we hope to integrate this module deeply with other systems used inside Xiaomi, such as ElasticSearch and Kudu. For example, given a Talos topic that has a Schema, the Kudu table could be created automatically from the topic’s Schema.
  • Platformization: Provides users with integrated, platform-based solutions, including debugging and development, monitoring and alerting, and operations and maintenance.




Job management

Job management provides job lifecycle management, job permission management, and job tag management. It displays each job’s running history for easy tracing, and it monitors job status and delay so that failed jobs can be restarted automatically.





SQL management

SQL management mainly consists of the following four steps:

  • Convert external tables into SQL DDL, which corresponds to the standard DDL statements in Flink 1.9 and includes the Table Schema, Table Format, and Connector Properties.
  • On top of the fully defined external SQL tables, add the SQL statement that expresses what the user wants. The resulting SQL Config is the complete expression of the user’s intent and is composed of the Source Table DDL, the Sink Table DDL, and the SQL DML statement.
  • Convert the SQL Config into a Job Config, that is, into its Stream Job representation.
  • Convert the Job Config into a JobGraph, which is used to submit the Flink job.
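The first two steps can be pictured with a small Python sketch that renders DDL text and bundles it into an SQL Config. The `SqlConfig` class, the `render_ddl` helper, the table names, and the connector properties are all hypothetical; only the three-part structure (Source Table DDL, Sink Table DDL, SQL DML) comes from the text, and the `CREATE TABLE ... WITH (...)` shape follows the general form of Flink 1.9 DDL.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


def render_ddl(table: str, columns: List[Tuple[str, str]],
               properties: Dict[str, str]) -> str:
    """Render a CREATE TABLE statement from schema columns and
    connector/format properties."""
    cols = ",\n".join(f"  `{name}` {sql_type}" for name, sql_type in columns)
    props = ",\n".join(f"  '{k}' = '{v}'" for k, v in properties.items())
    return f"CREATE TABLE `{table}` (\n{cols}\n) WITH (\n{props}\n)"


@dataclass(frozen=True)
class SqlConfig:
    """Hypothetical container for the complete expression of a user's SQL job."""
    source_ddl: str
    sink_ddl: str
    dml: str

    def statements(self) -> List[str]:
        # Tables must be defined before the DML that references them.
        return [self.source_ddl, self.sink_ddl, self.dml]


columns = [("name", "VARCHAR"), ("ts", "BIGINT")]
config = SqlConfig(
    # Property keys and values below are placeholders, not real connector options.
    source_ddl=render_ddl("talos_src", columns, {"connector.type": "talos"}),
    sink_ddl=render_ddl("kudu_sink", columns, {"connector.type": "kudu"}),
    dml="INSERT INTO `kudu_sink` SELECT `name`, `ts` FROM `talos_src`",
)
```

Keeping the three parts separate lets the platform generate the DDL automatically while the user supplies only the DML.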




The process for converting external tables into SQL DDL is shown below.

  • First, obtain the Table Schema and Table Format information from the external table. The latter is used to deserialize the data, for example to deserialize Hive data.
  • Then the back end generates the default Connector configuration. The configuration is divided into three parts: settings that cannot be modified, settings with default values that users may modify, and settings without default values that users must fill in.
As an example of an unmodifiable setting, when the table consumes from Talos, connector.type must be Talos, so this setting never needs to be changed. As an example of a setting with a default, consumption starts from the head of the topic by default, but the user can choose to consume from the tail instead. Finally, some settings, such as permission information, must be filled in by the user.

The purpose of this three-tier configuration management is to minimize the complexity of user configuration. The Table Schema, Table Format, Connector, and other configuration information together make up the SQL DDL. After the SQL Config is returned to the user, the user only needs to fill in the modifiable information, which completes the conversion from the external table to the SQL DDL. In the figure, red text indicates the information modified by the user.
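The three tiers can be sketched as a simple merge with validation. This is a minimal illustration: apart from `connector.type`, which appears in the text, every property key and value below is a hypothetical placeholder, as is the function name.

```python
from typing import Dict

# Tier 1: fixed by the platform, users may not override (key from the text).
FIXED = {"connector.type": "talos"}
# Tier 2: defaults that users may override (hypothetical key and values).
DEFAULTS = {"connector.startup-mode": "earliest"}
# Tier 3: no default, users must supply (hypothetical key).
REQUIRED = ("connector.access-key",)


def build_connector_properties(user_props: Dict[str, str]) -> Dict[str, str]:
    """Merge user input with platform defaults, enforcing the three tiers."""
    locked = [key for key in user_props if key in FIXED]
    if locked:
        raise ValueError(f"properties cannot be modified: {locked}")
    missing = [key for key in REQUIRED if key not in user_props]
    if missing:
        raise ValueError(f"required properties missing: {missing}")
    # Later dicts win: defaults < user input < fixed values.
    return {**DEFAULTS, **user_props, **FIXED}
```

In the common case the user fills in only the required keys and inherits everything else, which is the reduction in configuration effort the text describes.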





SQL management introduces an External Table feature. When a user chooses to consume a topic on the platform, this feature automatically obtains the table’s Schema and Format information described above, so the logic for explicitly registering a Flink Table is no longer needed. Once the Schema is obtained, the feature automatically converts the external table’s field types to Flink Table field types and registers the table as a Flink Table. At the same time, the Connector Properties are divided into the three categories described above: parameters with defaults are prefilled, and users only need to fill in the required items. All parameters are expressed as a Map, which is easy to convert to a TableDescriptor inside Flink.





The SQL DDL creation process is described above. On top of the existing SQL DDL, that is, the Source SQL DDL and the Sink SQL DDL, the user fills in the SQL query and submits it to the back end. The back end validates the SQL and then generates an SQL Config, which is the complete expression of an SQL job.





The following figure shows the process for converting SQL Config into Job Config.

  • First, the resources required by the job and the job’s related configuration (Flink state parameters and so on) are added to the SQL Config.
  • Then the SQL Config is compiled into a Job Descriptor, which describes the Job Config, including the address of the job’s Jar package, the MainClass, and the MainArgs.




The following figure shows the conversion of Job Config to JobGraph. The Schema, Format, and Properties in the DDL correspond to the Table Descriptor in Flink, so the information can be converted into a Table Descriptor simply by calling Flink’s built-in interfaces, such as createTableSource() and registerTableSource(). After this step, the tables defined by the DDL are registered and can be used directly in Flink. The SQL statement itself can be executed directly through TableEnv’s sqlUpdate().





The process for converting SQL Config into a Template Job is as follows. The Jar address in the Job Config is the Jar address of the Template Job, and MainClass is the Template Job itself. Given an SQL DDL, it is first converted to a Table Descriptor, and then the findAndCreateTableSource() method of TableFactoryUtil is used to obtain a TableSource. The conversion process for a Table Sink is similar. After these two steps, sqlUpdate() is called. This translates the SQL job into the final executable JobGraph, which is submitted to run on the cluster.





Talos Sink supports the following three modes:

  • Row: Talos data is written to the target system unchanged. The advantage of this mode is that data is read and written without serialization or deserialization, so it is highly efficient.
  • ID Mapping: Fields on the left (source) side are mapped to fields on the right (target) side. For example, name maps to field_name, timestamp maps to timestamp, and the region field is discarded.
  • SQL: The processing logic is expressed as an SQL statement.
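The ID Mapping mode amounts to a rename-and-project step on each record. The sketch below illustrates it in Python with the field names from the example above; the function name and the dictionary representation of a record are assumptions.

```python
from typing import Any, Dict


def apply_field_mapping(record: Dict[str, Any],
                        mapping: Dict[str, str]) -> Dict[str, Any]:
    """Keep only the mapped source fields and rename them to their targets.
    Source fields that are not listed in the mapping are discarded."""
    return {target: record[source] for source, target in mapping.items()}


# Mapping from the example: name -> field_name, timestamp -> timestamp.
MAPPING = {"name": "field_name", "timestamp": "timestamp"}

record = {"name": "click", "timestamp": 1570000000, "region": "cn"}
print(apply_field_mapping(record, MAPPING))
# {'field_name': 'click', 'timestamp': 1570000000}
```

In SQL mode the same transformation could be written as a query of roughly the form SELECT name AS field_name, `timestamp` FROM source_table, with the table name again being a placeholder.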




Future plans

The future plans of the Xiaomi streaming platform are as follows:

  • Continue to roll out Flink at Xiaomi while advancing the Streaming Job and platformization work.
  • Use Flink SQL to unify the offline data warehouse and the real-time data warehouse.
  • Analyze and present data lineage based on Schema, including data governance.
  • Continue to participate in the development of the Flink community.







This article is the original content of Aliyun and shall not be reproduced without permission.