This article is based on a guest talk from the 11th session of the Meitu Internet Technology Salon.
Today, big data is applied ever more widely across industries: operations teams evaluate campaign effectiveness with data, product teams analyze conversion rates with data, developers measure the effect of system optimizations with data, and so on. Meitu operates more than a dozen apps, including Meipai, Meitu XiuXiu, and BeautyCam, and each of them relies on data for personalized recommendation, search, report analysis, anti-cheating, and advertising. Overall, the business demand for data is high and the applications are broad.
The business context of the Meitu data technology team can therefore be summarized as: many business lines and a wide range of data applications. This business demand is the main reason we built a data platform.
Figure 1
Here are a few examples of how Meitu applies data. As shown in Figure 1, the first screenshot on the left is DataFace, a data visualization platform developed in-house, which lets business teams build visual reports by drag and drop, making reporting and follow-up analysis more efficient. The second is the home page of the Meipai app, where the popular feed is a personalized list of videos a user is likely to be interested in, generated from the user's behavior data. The third is anti-cheating: based on user behavior data, certain models and strategies are applied to identify and filter cheating behavior effectively. Data is also widely used in search, A/B experiments, channel tracking, advertising, and more.
Currently, Meitu has 500 million monthly active users, and these users generate nearly 20 billion behavior events every day. The overall scale is large: thousands of cluster nodes and petabytes of historical data.
Meitu has many business lines, each makes extensive use of data, and the overall user base is large. All of these factors meant we had to build a data platform to drive business growth and make data use more efficient.
Overall architecture of the Meitu data platform
Figure 2 shows the overall architecture of our data platform. In the data collection layer, we built Arachnia, a server-side log collection system, and a client SDK that each app integrates to collect client-side data. Data integration (import and export) is based on DataX, and the Mor crawler platform supports configurable tasks for crawling public web data.
Figure 2
The storage layer chooses different solutions according to the characteristics of each service; currently HDFS, MongoDB, HBase, and ES are the main ones. For computing, offline computing is mainly based on Hive and MapReduce, while real-time stream computing uses Storm and Flink, plus Naix, a self-developed bitmap system.
For data development, we built a data workshop, a data bus for distribution, task scheduling, and other platforms. For visualization and applications, we built a series of data application platforms around user needs, including an A/B experiment platform, a channel promotion tracking platform, a data visualization platform, user profiling, and so on.
The right side of Figure 2 shows some of the basic services that each component may depend on, including geolocation, metadata management, unique device identification, and so on.
Figure 3 shows the basic data architecture, a typical Lambda architecture. Starting from the data sources on the left, Arachnia and the app SDK report server-side and client-side data, respectively, to the collector proxy service, which parses the data protocol and writes the data to Kafka. The real-time stream then goes through a layer of data distribution and is finally consumed by the real-time computing jobs.
Figure 3
On the offline side, an ETL service dumps data from Kafka to HDFS, while heterogeneous data sources (such as MySQL and HBase) are imported and exported mainly with DataX and Sqoop. Data is then processed with Hive, Kylin, and Spark and written to the various storage layers. Finally, a unified external API layer exposes the data to business systems and to our own visualization platforms.
The phased development of the data platform
The construction of enterprise-level data platform is mainly divided into three stages:
-
At the beginning, we basically relied on free third-party platforms. The advantage of this stage was that we could integrate quickly and see basic statistics for each app; the obvious drawback was that, without the raw data, we could not go beyond the basic metrics the third party provided to do our own analysis or recommendation. So there is a "zero to one" process of getting our own data to work with;
-
Once the data was available, the explosion of business lines and requirements made it necessary to improve development efficiency and let more people, not just the data engineers, participate in developing and using data. This meant opening the data and the computing and storage capabilities to each business line rather than keeping them in our own hands;
-
After the data is opened up, business teams start asking whether data tasks can run faster, whether results can be delivered in seconds, whether they can be more real-time. At the same time, the cluster keeps growing to meet business needs, so we start thinking about how to satisfy the business while keeping resource costs under control.
Meitu is now in the transition from the second phase to the third: while continuing to improve data openness, we are also gradually improving query and analysis efficiency and starting to think about cost optimization. The rest of this article focuses on our practices and optimizations in the "0 to 1" and data-opening stages.
From 0 to 1
The "0 to 1" stage is about getting from data collection to having usable data. Figure 4 shows the evolution of our data collection: we started with free third-party platforms such as Umeng and Flurry, then quickly moved to syncing logs to a server with Rsync for storage and computation, then wrote a simple Python script to let business servers report logs, and finally developed Arachnia, our server-side log collection system, together with the client-side AppSDK.
Figure 4
Data collection is the source of all data and a critical link in the data pipeline. We pay particular attention to whether the data is complete, whether real-time reporting is supported, whether event tracking is standardized and accurate, and the cost of maintenance and management. Our log collection system therefore needs to meet the following requirements:
-
Integrated management and maintenance, including automatic deployment, installation, upgrade, and uninstallation of the agent, hot configuration reload, and latency monitoring;
-
In terms of reliability, at-least-once delivery;
-
Meitu now has multiple IDCs, so the system needs to support collecting data in multiple IDCs and aggregating it to the data center;
-
Resource consumption should be as small as possible, and collection must not affect the business.
Based on these requirements, we ruled out Flume, Scribe, and Fluentd and ultimately chose to develop our own collection system, Arachnia.
Figure 5
Figure 5 is a simplified architecture diagram of Arachnia, which is centrally managed by a system "brain". Within a single IDC, the Puppet module aggregates agent metrics and relays metrics and hot configuration update commands. After an agent is deployed and started by the ops platform, it pulls its configuration from the brain and begins collecting and reporting data to the collector.
Now let's look at how Arachnia is optimized in practice, starting with the at-least-once reliability guarantee. Many systems use a WAL to record data that failed to be reported and retry it later; our approach is to drop the WAL and instead add a coordinator that centrally issues and manages transaction (tx) state.
Figure 6
Before collection starts, the coordinator issues a tx ID. When the source receives the begin signal, it starts collecting data and hands it to the sink to send. Once the sink has finished sending, it acks the tx to the coordinator. The coordinator verifies the ack and then sends a commit to both source and sink to update their state; finally, after the tx completes, the source persists the collection progress (a local file by default). If any of the first three steps fails, the data has not been sent successfully, so nothing is duplicated; if any of the later four steps fails, the tx is replayed and the data may be duplicated.
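To make the flow concrete, here is a minimal, self-contained sketch of this WAL-less tx flow; all class names (Coordinator, Agent, TxState) and the progress-file path are hypothetical and only illustrate the sequence, not Arachnia's actual code.

```java
// Hypothetical sketch of the coordinator-driven at-least-once flow described above.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

enum TxState { BEGIN, SENT, ACKED, COMMITTED }

class Tx {
    final long txId;
    final long startOffset;   // file offset where this batch begins
    long endOffset;
    TxState state = TxState.BEGIN;
    Tx(long txId, long startOffset) { this.txId = txId; this.startOffset = startOffset; }
}

class Coordinator {
    private long nextTxId = 0;

    Tx begin(long startOffset) { return new Tx(nextTxId++, startOffset); }

    // Verify the sink's ack, then tell source and sink to commit.
    boolean commit(Tx tx) {
        if (tx.state != TxState.ACKED) return false;
        tx.state = TxState.COMMITTED;
        return true;
    }
}

class Agent {
    private final Coordinator coordinator = new Coordinator();
    private final Path progressFile = Paths.get("/var/lib/arachnia/progress"); // hypothetical path
    private long offset = 0;

    void collectOnce(List<String> batch) throws IOException {
        Tx tx = coordinator.begin(offset);              // 1. coordinator issues a tx id
        long sent = sendToSink(batch);                  // 2-3. source collects, sink sends
        tx.endOffset = tx.startOffset + sent;
        tx.state = TxState.SENT;
        tx.state = TxState.ACKED;                       // 4. sink acks the tx
        if (coordinator.commit(tx)) {                   // 5-6. coordinator verifies and commits
            offset = tx.endOffset;
            persistProgress(offset);                    // 7. source persists progress locally
        }
        // A failure before the ack means nothing reached downstream: just replay the tx.
        // A failure after the commit but before persistProgress() means the tx is replayed
        // and the batch may be delivered twice -- hence at-least-once, deduplicated later
        // by MsgID (described next).
    }

    private long sendToSink(List<String> batch) { /* hand data to collector/Kafka */ return batch.size(); }

    private void persistProgress(long offset) throws IOException {
        Files.write(progressFile, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
    }
}
```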
On top of the at-least-once guarantee, some business teams need the data to be unique, so we support generating a unique ID for each log entry. A second major practice in the collection system is therefore to uniquely identify a file and give every log line a unique MsgID, so that downstream consumers can deduplicate on the MsgID if a log line is delivered twice.
We started by using the filename, but found that many business systems rename their files, so we switched to the inode. However, Linux recycles and reuses inodes, so in the end we hash the inode together with the content of the file header as the fileID. The MsgID is then uniquely determined by agentID, fileID, and offset.
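As an illustration, a fileID/MsgID could be derived roughly like this; the hash choice, header length, and ID format are assumptions, not Arachnia's actual scheme (reading the inode requires a Unix-style file system).

```java
// Hypothetical sketch: fileID = hash(inode + first bytes of the file), MsgID = agentID + fileID + offset.
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class MsgIdBuilder {

    // Identify a file by its inode plus a hash of its first bytes, so that
    // renames (same inode) and inode reuse (different header) are both handled.
    static String fileId(Path file, int headerLen) throws IOException, NoSuchAlgorithmException {
        long inode = ((Number) Files.getAttribute(file, "unix:ino")).longValue(); // Unix-only attribute
        byte[] header = new byte[headerLen];
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            int n = raf.read(header);
            if (n > 0 && n < headerLen) header = Arrays.copyOf(header, n); // hash only what was read
        }
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        md5.update(Long.toString(inode).getBytes());
        md5.update(header);
        return new BigInteger(1, md5.digest()).toString(16);
    }

    // MsgID = agentID + fileID + byte offset of the log line inside the file.
    static String msgId(String agentId, String fileId, long offset) {
        return agentId + "_" + fileId + "_" + offset;
    }

    public static void main(String[] args) throws Exception {
        Path log = Paths.get("/var/log/app/access.log"); // hypothetical path
        String fid = fileId(log, 1024);
        System.out.println(msgId("agent-01", fid, 40960));
    }
}
```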
After the data is reported, the collector parses the protocol and pushes the data to Kafka. So how does data get from Kafka to HDFS? Let's start with Meitu's requirements:
-
Support distributed processing;
-
Since many business lines are involved, it needs to support serialization of multiple data formats, including JSON, Avro, special delimiters, and so on;
-
Support rerunning when data landing fails due to machine failures, service problems, and so on, and the rerun needs to be reasonably fast, because once such a failure happens it affects every business line's downstream use of the data;
-
The HDFS partitioning policy must be configurable, so that each business line can flexibly use a different partition layout;
-
Support some business-specific logic, including data validation, expiration filtering, test-data filtering, injection, and so on.
Based on these requirements, Meitu's implementation of the Kafka-to-HDFS service is shown in Figure 7.
Figure 7
Building on the characteristics of Kafka and MapReduce, each partition of a Kafka topic is assembled into a mapper InputSplit, and a mapper process is started to consume that batch of Kafka data. After parsing, business-logic processing, validation, and filtering, the data is written to the target HDFS files according to the partition rules. Once a batch has landed, its meta information (topic, partition, start offset, end offset) is stored in MySQL; the next run reads from the end offset of the previous run and starts consuming a new batch.
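As a sketch, a per-partition split carrying the offset range could look like the following, assuming the standard Hadoop MapReduce API; the class and its fields are illustrative rather than Meitu's actual implementation.

```java
// Hypothetical InputSplit describing one Kafka partition's offset range for one batch.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class KafkaBatchSplit extends InputSplit implements Writable {
    private String topic;
    private int partition;
    private long startOffset;   // read from MySQL: end offset of the previous run
    private long endOffset;     // latest offset of the partition at job submission time

    public KafkaBatchSplit() {}                       // required for deserialization

    public KafkaBatchSplit(String topic, int partition, long startOffset, long endOffset) {
        this.topic = topic;
        this.partition = partition;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    @Override
    public long getLength() { return endOffset - startOffset; }  // used as a scheduling hint

    @Override
    public String[] getLocations() { return new String[0]; }     // no HDFS locality for Kafka data

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(topic);
        out.writeInt(partition);
        out.writeLong(startOffset);
        out.writeLong(endOffset);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        topic = in.readUTF();
        partition = in.readInt();
        startOffset = in.readLong();
        endOffset = in.readLong();
    }
}
```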
After the basic functionality was in place, we inevitably ran into problems. For example, different business topics have very different data volumes, so the whole job has to wait for the mapper handling the partition with the most data, i.e. the longest-running mapper, before it can finish. How did we solve this? There is an unwritten rule in system design: what has long been separate tends to merge, and what has long been merged tends to split. We applied a similar idea to the data-skew problem.
Figure 8
First, partitions with small data volumes are merged into a single InputSplit, so that one mapper processes the partition data of several businesses and finally writes multiple files.
Figure 9
Conversely, a partition with a large data volume is split across multiple mappers that process the same partition. This makes mapper workloads more balanced and copes better with sudden spikes in a business's data volume.
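The merge/split planning can be pictured roughly as below, assuming a simple message-count target per mapper; the record type, threshold logic, and names are illustrative only, not Meitu's actual code.

```java
// Hypothetical split planner: pack small partitions together, cut large partitions into ranges.
import java.util.ArrayList;
import java.util.List;

public class SplitPlanner {

    record PartitionRange(String topic, int partition, long startOffset, long endOffset) {
        long size() { return endOffset - startOffset; }
    }

    // Each inner list becomes one mapper's work (one InputSplit).
    static List<List<PartitionRange>> plan(List<PartitionRange> partitions, long targetPerMapper) {
        List<List<PartitionRange>> splits = new ArrayList<>();
        List<PartitionRange> current = new ArrayList<>();
        long currentSize = 0;

        for (PartitionRange p : partitions) {
            if (p.size() > targetPerMapper) {
                // Large partition: cut it into several offset ranges, one mapper each.
                for (long s = p.startOffset(); s < p.endOffset(); s += targetPerMapper) {
                    long e = Math.min(s + targetPerMapper, p.endOffset());
                    splits.add(List.of(new PartitionRange(p.topic(), p.partition(), s, e)));
                }
            } else {
                // Small partition: pack it with others until the target size is reached.
                if (currentSize + p.size() > targetPerMapper && !current.isEmpty()) {
                    splits.add(current);
                    current = new ArrayList<>();
                    currentSize = 0;
                }
                current.add(p);
                currentSize += p.size();
            }
        }
        if (!current.isEmpty()) splits.add(current);
        return splits;
    }
}
```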
In addition to data skew, data dumps to HDFS can fail for various reasons: Kafka disk problems, Hadoop cluster node crashes, network failures, external permission issues, and so on. An HDFS file that is not properly closed can be corrupted, and the data then needs to be rerun. Since our time partitions are basically at day granularity, the original approach could leave a day-granularity file corrupted and unreadable by the parser.
Figure 10
We adopted a two-stage approach: mapper 1 first writes data to a temporary directory, and mapper 2 appends the data from the HDFS temporary directory to the target file. If mapper 1 fails, the batch can simply be rerun instead of rerunning the whole day's data; if mapper 2 fails, the final file can be rebuilt by merging the data from the temporary directory, avoiding a full re-ETL at day granularity.
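Assuming the second stage appends the per-batch temporary files into the day-level target file with the standard HDFS client API, it could look roughly like this; paths and error handling are deliberately simplified and illustrative.

```java
// Hypothetical sketch of the second (commit) stage of the two-stage write.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class TwoStageCommitter {

    // Stage 2: append every batch file written by stage 1 into the target day file,
    // then clean up the temporary directory. If this step fails, the target file can be
    // rebuilt from the still-intact temporary directory instead of re-running the whole day.
    public static void commit(Configuration conf, Path tmpDir, Path targetFile) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        if (!fs.exists(targetFile)) {
            fs.create(targetFile).close();
        }
        try (FSDataOutputStream out = fs.append(targetFile)) {   // HDFS append must be enabled
            for (FileStatus batch : fs.listStatus(tmpDir)) {
                try (FSDataInputStream in = fs.open(batch.getPath())) {
                    IOUtils.copyBytes(in, out, conf, false);     // false: keep the output stream open
                }
            }
        }
        fs.delete(tmpDir, true);                                 // temp data no longer needed
    }
}
```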
For real-time distribution: the data written to Kafka1 is basically the full data stream of each business, but most downstream consumers only care about a single event or a small subset of the data rather than the full stream. We therefore added Databus, a real-time distribution layer, to solve this.
Figure 11
Databus lets businesses define their own distribution rules to write data into downstream Kafka clusters, which makes it easier for them to subscribe to exactly the data they want and allows data to be reused at a finer granularity.
Figure 12
Figure 12 shows the implementation of Databus, whose core is a Storm topology. Databus has two spouts: one pulls the full rule set and newly added rules and pushes them to the downstream distribution bolts to update their rule caches; the other consumes data from Kafka. The distribution bolt parses the data, matches it against the rules, and sends it to the downstream Kafka clusters.
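The rule-matching step inside such a distribution bolt can be sketched as follows; the rule model (a predicate plus a target cluster and topic) and the hot-update method are assumptions for illustration, not Databus's actual data structures.

```java
// Hypothetical rule matcher: maps a parsed event to the downstream Kafka destinations it should go to.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class RuleMatcher {

    // A distribution rule: if the event matches, send it to targetTopic on targetCluster.
    record Rule(String name, Predicate<Map<String, Object>> matches, String targetCluster, String targetTopic) {}

    private volatile List<Rule> rules = List.of();   // replaced atomically when the rule spout pushes updates

    // Called when the rule spout / config stream delivers a hot update.
    public void updateRules(List<Rule> newRules) {
        this.rules = List.copyOf(newRules);
    }

    // Called per event by the distribution bolt after parsing; returns (cluster, topic) pairs.
    public List<String[]> route(Map<String, Object> event) {
        List<String[]> targets = new ArrayList<>();
        for (Rule rule : rules) {
            if (rule.matches().test(event)) {
                targets.add(new String[] { rule.targetCluster(), rule.targetTopic() });
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        RuleMatcher matcher = new RuleMatcher();
        matcher.updateRules(List.of(
            new Rule("meipai-play-events",
                     e -> "play".equals(e.get("event")) && "meipai".equals(e.get("app")),
                     "kafka-downstream-1", "meipai_play")));
        System.out.println(matcher.route(Map.of("app", "meipai", "event", "play")).size()); // prints 1
    }
}
```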
Opening up the data
With the raw data in place and the ability to do offline and real-time data development, data development requirements exploded and the data development team was overwhelmed. We therefore opened up the data, computing, and storage capabilities through the data platform, giving the business teams the ability to develop with data themselves.
Rather than going into the implementation details of metadata management, task scheduling, data integration, DAG task scheduling, visualization, and so on, this section focuses on Meitu's experience with platform stability after opening up the data.
Data openness and system stability have a love-hate relationship. On one hand, once the platform is opened up, data tasks are no longer written only by engineers with a data background, and we frequently run into ill-formed, resource-hungry tasks that cause serious trouble for the stability of the underlying computing and storage clusters. On the other hand, it is precisely this openness that forces us to keep improving system stability.
For these resource-hungry or ill-formed tasks, we first considered whether some validation and restrictions could be applied at the HiveSQL level. Figure 13 shows the whole process of parsing and compiling HiveSQL into an executable MR job:
Figure 13
First, Antlr-based syntax parsing generates an AST; semantic analysis then turns the AST into a Java QueryBlock object; from the QueryBlock a logical plan is generated and logically optimized; finally a physical plan is generated, physically optimized, and turned into an executable MR task.
Our checks are mainly hooked in at the semantic analysis phase, where the QueryBlock is generated: we use it to perform a series of statement validations, including checks for illegal operations, restrictions on query conditions, and detection of high resource consumption.
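Meitu's checks live inside semantic analysis at the QueryBlock level; as a loosely related illustration using a public extension point, the sketch below uses Hive's semantic-analyzer hook (configured via hive.semantic.analyzer.hook, assuming Hive 2.x APIs) to reject SELECT queries that have no WHERE condition. It only shows the kind of check described, not Meitu's actual code.

```java
// Hypothetical Hive semantic-analyzer hook that blocks unrestricted queries.
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class QueryGuardHook extends AbstractSemanticAnalyzerHook {

    @Override
    public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
            throws SemanticException {
        // Reject a query whose AST contains no WHERE clause at all (i.e. no filter condition).
        if (ast.getToken() != null
                && ast.getToken().getType() == HiveParser.TOK_QUERY
                && !hasToken(ast, HiveParser.TOK_WHERE)) {
            throw new SemanticException("Query rejected: a WHERE/partition condition is required");
        }
        return ast;
    }

    // Depth-first search for a token type anywhere in the AST.
    private boolean hasToken(ASTNode node, int tokenType) {
        if (node.getToken() != null && node.getToken().getType() == tokenType) {
            return true;
        }
        for (int i = 0; i < node.getChildCount(); i++) {
            if (hasToken((ASTNode) node.getChild(i), tokenType)) {
                return true;
            }
        }
        return false;
    }
}
```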
The second stability practice focuses on cluster optimization, which includes:
-
We did a complete upgrade of the Hive and Hadoop clusters. The main reasons were that the old versions had problems that were fixed in newer releases, either directly or via merged community patches, and that the new releases brought feature and performance improvements. We upgraded Hive from 0.13 to 2.1 and Hadoop from 2.4 to 2.7;
-
We optimized Hive for HA deployment: HiveServer and MetaStoreServer were split apart and each deployed on multiple nodes, so that running them together in one combined service can no longer let them affect each other;
-
The execution engine used to be mostly Hive on MapReduce; we are now migrating to Hive on Spark, gradually switching online tasks from Hive on MR to Hive on Spark;
-
We maintain an internal branch to apply bug fixes or merge community patches for problems we hit in day-to-day operation.
The last part of our stability practice is improving permissions and security: preventing unauthorized access to the clusters and the data, attacks, and so on. The permission work has two main parts: API access and the clusters themselves.
Figure 14
-
API Server: as mentioned above, OneDataAPI provides a unified API for business systems to access data. On top of it we added a unified authentication service (CA). A business system must first obtain a token from the CA and present it when calling OneDataAPI; OneDataAPI serves the request only after the CA has verified the token, which prevents business systems from freely accessing every data metric. A minimal token-check sketch follows after this list.
-
Cluster: we currently rely mainly on Apache Ranger for unified authorization management and maintenance across the various clusters, including Kafka, HBase, Hadoop, and so on.
These are some of the practices and optimizations around stability after Meitu's data platform was built and opened to the various business lines.
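On the API side, the token check against the CA could be sketched like this, assuming the CA exposes a simple HTTP verification endpoint; the URL, header name, and response contract are hypothetical (requires Java 11+ for java.net.http).

```java
// Hypothetical OneDataAPI-side token verification against a central auth (CA) service.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TokenVerifier {

    private static final String CA_VERIFY_URL = "http://ca.internal/api/v1/verify"; // hypothetical endpoint
    private final HttpClient client = HttpClient.newHttpClient();

    // Returns true only if the CA confirms the token; OneDataAPI rejects the request otherwise.
    public boolean isAuthorized(String token) {
        try {
            HttpRequest request = HttpRequest.newBuilder(URI.create(CA_VERIFY_URL))
                    .header("X-Auth-Token", token)      // hypothetical header name
                    .GET()
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200;        // assume CA returns 200 only for valid tokens
        } catch (Exception e) {
            return false;                               // fail closed on any error
        }
    }
}
```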
Then let’s make a simple summary of the data platform construction process.
-
First, before building a data platform, you must understand the business: is the overall data volume large, are there many business lines, are there so many requirements that they seriously affect productivity? If the answer is yes, consider building a data platform as early as possible to improve the efficiency of data development and application. If the data volume and the demand are small, there is no need to stand up a full big-data stack or a complete data platform; quickly meeting the immediate business needs comes first.
-
During platform construction, focus on data quality and platform stability, for example the completeness, timeliness, and unique device identification of the collected data, and keep optimizing the stability of the platform so that business teams get a stable and reliable service.
-
Once analysis and decision-making efficiency has improved and the scale gradually grows, start thinking about optimizing cost and resources.