About the speaker
Dasha is a senior technical expert at Alibaba, responsible for Flink SQL in real-time computing. He previously worked at Facebook in the United States and is an Apache Flink committer.
Real-time computing in Alibaba
Since 1999, Alibaba has continuously expanded beyond its e-commerce platform into finance, payments, logistics, entertainment and other fields. Its products include the Taobao and Tmall e-commerce platforms, the Alimama advertising platform, Ant Financial’s Alipay, Alibaba Cloud and its digital media and entertainment business. Today, Alibaba is not just an e-commerce platform but a huge application ecosystem. It is now the world’s largest e-commerce platform, with revenue of $550 billion in fiscal 2016. There are 500 million users on Alibaba’s platform, equivalent to one third of China’s population, and nearly 10 million users make transactions on it every day.
Alibaba has become a huge commercial aircraft carrier, and the large number of users and applications on board inevitably generates a large amount of data. Alibaba’s data volume has now reached the exabyte (EB) level and grows by petabytes (PB) every day. Real-time computing processes up to 100 million records per second at the daily peak, and during this year’s Double 11 the peak reached a staggering 470 million records per second.
Real-time computing is widely used within Alibaba. With the emergence of new business models, technological innovation and rising user expectations, the need for real-time computing keeps growing. Its biggest advantage is that the state and results of big data processing can be updated as the underlying data changes in real time. The following examples illustrate how real-time computing is applied at Alibaba:
1. Double 11 large screen
Every year on November 11, Alibaba aggregates valuable data to present to the media, and the GMV large screen is one such display. The GMV large screen is a very typical real-time computing application: every transaction is aggregated and shown on the screen. Each record is written to the database, processed in real time, written to HBase, and finally displayed on the large screen, so the end-to-end pipeline is very long. The application as a whole faces many challenges:
1) The large screen requires second-level latency end to end, which means the real-time computation itself must finish with sub-second latency
2) The huge volume of Double 11 data must be aggregated in a single job
3) Exactly-once semantics are required to keep the computed results accurate
4) The system must be highly available, with no stalls and no downtime
This scenario has a very demanding SLA, requiring second-level latency and accurate data, but the computation itself is not complicated. More complex applications are introduced next.
2. Real-time machine learning
Machine learning generally has two important components: features and the model. Traditional machine learning uses batch computing to collect features and train models, so models are updated too infrequently to keep up with constantly changing data. For example, during Double 11, commodity prices and promotion rules are completely different from normal times, so a model trained on earlier data cannot achieve optimal results. Only by collecting features and training models in real time can we fit a satisfactory model. To do this, we developed a real-time machine learning platform.
This real-time machine learning platform consists of two main parts: real-time feature computation and real-time model computation. The system faces a number of challenges:
1) Machine learning requires many kinds of metrics, drawn from many data sources
2) There are many dimensions, such as the user dimension and the commodity dimension. Stacking dimensions, or even taking their Cartesian product, makes the final set of metrics massive and the state huge
3) Machine learning computation is complex and consumes a lot of CPU
4) Some data cannot be kept in state, so it must live in external storage, which leads to a large amount of external I/O
3. Real-time A/B Testing
A user’s query can also change over time. A typical example is real-time A/B testing.
When tuning models, algorithm engineers work with a variety of models. Different models have different computation patterns and methods and produce different results. As a result, there are often many different queries subscribed to the same real-time data; each produces results, and the models are iterated based on user feedback until the optimal one is found. The challenge of A/B testing is that algorithm engineers tend to compute a lot of metrics, and computing all of them in real time wastes a lot of resources.
To address this challenge, we designed an A/B testing framework and development platform. It collects the metrics that algorithm engineers are interested in, aggregates them, and sends them to the Druid engine. Each job cleans its data and writes it to Druid, and the algorithm engineers then run statistical analysis over the different metrics in Druid to find the optimal model.
To sum up, real-time computing at Alibaba faces the following challenges:
1) A large business, many scenarios and heavy machine learning demand lead to complex computation logic
2) The data volume is large and there are many jobs, so the real-time computing cluster is very large
3) Low latency and data accuracy must be guaranteed while still meeting high throughput requirements
Selection and optimization of Flink
To address these challenges, we investigated a number of computing frameworks and finally chose Flink for the following reasons:
1. Flink introduces state as a well-designed, first-class concept, so complex state-based logic such as joins can be expressed well
2. Flink adopts the Chandy-Lamport algorithm, which provides exactly-once semantics while achieving high throughput at low latency
However, Flink still had many shortcomings in its state handling, its Chandy-Lamport implementation and other areas, so Alibaba started a project named Blink to address them.
Blink combines open source Flink with Alibaba’s improvements and consists of two main parts:
1. Blink Runtime
Companies that use Flink differ widely in storage, scheduling and low-level optimization. Alibaba’s Blink likewise contains many customized runtime optimizations that are not identical to the Apache Flink community version. We call this part Blink Runtime.
2. Flink SQL
Native Flink offers only the relatively low-level DataStream API, so users have to design and write a lot of code, and the DataStream API itself has some design flaws. For the convenience of users, the Alibaba team designed Flink SQL for stream computing and contributed it back to the community. We call it Flink SQL rather than Blink SQL mainly because, at the SQL user API level, Blink is fully aligned with the community version of Flink. In addition, most features of Apache Flink SQL were contributed by Alibaba, so Flink SQL and Blink SQL are essentially the same thing.
Blink Runtime core optimizations explained
1. Deployment and model optimization
The optimizations include the following:
1) Solving the large-scale deployment problem. A Flink cluster originally had only one JobMaster managing all jobs. As the number of jobs grew, the single master could not take on more jobs and became a bottleneck. We therefore restructured the architecture so that each job has its own master.
2) In early Flink, one TaskManager managed many tasks, and a problem in a single task could crash the TaskManager and affect other jobs. We strengthened job isolation by giving each job its own TaskManager.
3) Introducing a ResourceManager. The ResourceManager communicates with the JobMaster to adjust resources dynamically in real time, so that the cluster deployment stays optimal.
4) We applied these optimizations not only to YARN clusters but also to Mesos and standalone deployments.
With this work, Flink can be applied to large-scale cluster deployments.
2. Incremental checkpoint
Real-time computing has to take checkpoints continuously to preserve the computation state. Flink’s early checkpoint design had a defect: at each checkpoint it read all the old state data, merged it with the new data, and wrote the full result to disk. As the state grows, the amount of data read and written at each checkpoint becomes huge. As a result, the checkpoint interval of a job had to be set to more than one minute. The larger the checkpoint interval, the more computation has to be replayed on failover and the more severe the resulting data delay.
To reduce the checkpoint interval, we proposed the incremental checkpoint design: at each checkpoint, only the state changes since the previous checkpoint are stored. Because the data from earlier checkpoints has already been saved, a later checkpoint only needs to store what has changed. The amount of data written at each checkpoint is therefore small, and a checkpoint can complete within a few seconds. This greatly reduces the latency that a failover can cause.
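The difference between a full and an incremental checkpoint can be illustrated with a small Python sketch. This is a toy model only, not Blink’s or Flink’s implementation: the class `IncrementalState`, its `dirty` set and the `restore` helper are hypothetical names introduced here, and deletions of keys are ignored for brevity.

```python
class IncrementalState:
    """Toy keyed state that tracks which keys changed since the last checkpoint."""

    def __init__(self):
        self.state = {}     # live keyed state
        self.dirty = set()  # keys written since the last checkpoint

    def put(self, key, value):
        self.state[key] = value
        self.dirty.add(key)

    def full_checkpoint(self):
        # Early design: copy the whole state every time.
        self.dirty.clear()
        return dict(self.state)

    def incremental_checkpoint(self):
        # Incremental design: copy only the keys that changed.
        delta = {k: self.state[k] for k in self.dirty}
        self.dirty.clear()
        return delta


def restore(base, deltas):
    """Rebuild the state from one full checkpoint plus later deltas, in order."""
    state = dict(base)
    for delta in deltas:
        state.update(delta)
    return state


s = IncrementalState()
for i in range(1000):
    s.put(f"user{i}", i)
base = s.full_checkpoint()          # 1000 entries written
s.put("user7", 700)
s.put("user1000", 1)
delta = s.incremental_checkpoint()  # only 2 entries written
```

In this sketch the second checkpoint writes two entries instead of 1002, and `restore(base, [delta])` still reproduces the live state, which is why the checkpoint interval can be shortened.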
3. Asynchronous I/O
Much of the time we have to keep data in external storage, so it must be read over the network during computation. The traditional approach is synchronous I/O: the next request is not sent until the previous one has returned. This wastes CPU resources, because the CPU spends most of its time waiting for network requests to return. Synchronous I/O therefore cannot make full use of the CPU, which significantly reduces the throughput per CPU. To improve throughput, we designed an asynchronous I/O (Async I/O) framework for reading data, which allows data to be read asynchronously with multiple threads.
After each request is sent, the next request is issued right away without waiting for the response. When a response comes back from the external store, the computing system invokes a callback to process the data. If the computation does not need to preserve order, each result is processed as soon as it returns. If the user needs the order preserved, we buffer the results that arrive early and emit them in order once all the results ahead of them have arrived. With Async I/O, throughput can increase by dozens or even hundreds of times depending on the configured buffer size, which greatly improves per-CPU utilization and overall computing performance.
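The two emission modes described above can be sketched with Python’s `asyncio`. This is an illustration under stated assumptions rather than the Blink framework itself: `lookup` is a hypothetical stand-in for an external store whose responses come back out of order, and `emit_unordered` and `emit_ordered` are names made up for this sketch.

```python
import asyncio


async def lookup(key):
    # Hypothetical external store: larger keys answer faster,
    # so responses tend to come back out of request order.
    await asyncio.sleep(max(0.0, 0.01 * (5 - key)))
    return key, key * 10


async def emit_unordered(keys):
    # Issue every request up front, then handle each response as it arrives.
    tasks = [asyncio.create_task(lookup(k)) for k in keys]
    out = []
    for fut in asyncio.as_completed(tasks):
        out.append(await fut)
    return out


async def emit_ordered(keys):
    # Same concurrency, but early responses wait in a buffer
    # until every response ahead of them has been emitted.
    async def indexed(i, k):
        return i, await lookup(k)

    tasks = [asyncio.create_task(indexed(i, k)) for i, k in enumerate(keys)]
    buffer, next_index, out = {}, 0, []
    for fut in asyncio.as_completed(tasks):
        i, result = await fut
        buffer[i] = result
        while next_index in buffer:
            out.append(buffer.pop(next_index))
            next_index += 1
    return out
```

Calling `asyncio.run(emit_ordered([1, 2, 3, 4]))` returns the results in request order, while `emit_unordered` returns the same results in whatever order the responses arrive. In both cases the requests overlap instead of running one after another, which is where the throughput gain comes from.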
It’s worth noting that all of the Blink Runtime optimizations described above have been contributed to the Apache Flink community.
Flink SQL core features explained
1. Alibaba completed 80% of the R&D work of Apache Flink SQL
1) SQL is a very general declarative language that lets users describe what a job needs easily.
2) SQL has a mature optimization framework, so users only need to focus on business logic and not on state management, performance tuning or other complex design. This greatly lowers the barrier to entry.
3) SQL is easy to understand and suits people from different fields. SQL users often do not need much programming background, and everyone from product designers to developers can pick it up quickly.
4) The SQL API is very stable. Users’ jobs do not need to be modified when the system is upgraded or even when the computing engine is replaced.
5) Some scenarios require streaming updates plus batch validation. With SQL, batch and stream computation share a unified query: one query, the same result.
2. Stream vs. batch
To design streaming SQL that is consistent with batch processing, we first need to understand how stream processing differs from batch processing. The core difference is that the data in a stream is infinite, while the data in a batch is finite. This essential distinction leads to three more specific differences:
1) Stream processing keeps producing results and never ends, while batch processing usually returns one final result and then ends. For example, to compute the total transaction amount of Singles’ Day with batch computing, you wait until all of the day’s transactions are complete, then sum the amount spent by all buyers and get one final number. Stream computing instead tracks transactions in real time, computing and updating the result continuously.
2) Stream computing needs checkpoints and retained state so that computation can resume quickly after a failover. Batch computing does not need to retain state, because its input data is usually persisted.
3) Streaming data is updated constantly. For example, a buyer’s total spending keeps changing, whereas the batch data is the total for a completed day, which is fixed and will not change. Stream processing is an early observation of the final result, so results emitted early usually have to be corrected through retraction, while batch computing does not need this.
3. Query Configuration
None of the differences above concerns the user’s business logic, which means they do not show up as differences in the SQL itself. We regard them as attributes of a job. To describe properties that are unique to stream computing, such as when to emit results and how to retain state, we designed Query Configuration, which users can set. It consists of two parts:
1. Latency SLA
Defines the delay from data generation to presentation; for the Double 11 large screen, for example, it is at the second level. Users can configure different SLAs according to their needs. Our SQL system optimizes according to the SLA so as to achieve the best system performance while meeting the user’s requirements.
2. State Retention/TTL
A stream computation never stops, but the state it accumulates does not need to be kept forever; keeping it for too long wastes storage and severely hurts performance. So we allow users to set a reasonable TTL (expiration time) to get better computing performance.
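The effect of a state TTL can be shown with a minimal Python sketch. It is not the Flink SQL configuration API: `TtlState` and its methods are hypothetical, the TTL is counted from the last write, and the current time is passed in explicitly so the behavior is deterministic.

```python
class TtlState:
    """Toy keyed state whose entries expire ttl_seconds after their last write."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.data = {}  # key -> (value, time of last write)

    def put(self, key, value, now):
        self.data[key] = (value, now)

    def get(self, key, now):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, written_at = entry
        if now - written_at > self.ttl:
            del self.data[key]  # expired: drop it lazily on access
            return None
        return value

    def expire(self, now):
        # Periodic cleanup: remove every expired entry, return how many were dropped.
        stale = [k for k, (_, t) in self.data.items() if now - t > self.ttl]
        for k in stale:
            del self.data[k]
        return len(stale)


state = TtlState(ttl_seconds=60)
state.put("buyer_a", 100, now=0)
state.put("buyer_b", 200, now=50)
dropped = state.expire(now=100)  # buyer_a is older than 60 s and is removed
```

A shorter TTL keeps the state small and fast, at the cost that a key arriving after its state expired is treated as new, so the TTL has to be chosen to match the business logic.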
We use Query Configuration to describe some of the properties that differ between streams and batches. Next we need to consider how to design streaming SQL.
4. Dynamic-table
On the left of the figure is the input stream. We build a dynamic table from the stream by applying each record to it, and then emit the changes to the table as a changelog. After these two transformations, the output stream contains the same data as the input stream, which shows that introducing dynamic tables loses neither semantics nor data.
With the concept of dynamic tables, we can apply traditional SQL to streams. It is worth mentioning that a dynamic table is a logical concept and does not need to be materialized in storage. Let’s look at another example:
As shown in the figure, a continuous query runs over the input stream. We treat the stream as a dynamic table; the continuous query produces a new dynamic table from it, and the new dynamic table can be converted back into a stream if needed. Here the streams on the left and the right differ because the continuous query performs an aggregation. In short, dynamic tables give us the ability to run continuous SQL queries on streams.
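The stream, dynamic table and changelog relationship can be sketched in a few lines of Python. This is a conceptual illustration, not Flink’s internals: the function names, the `("+", ...)` and `("-", ...)` changelog encoding and the sample data are all assumptions made for this example. The continuous query modeled here is a running sum of spending grouped by user.

```python
def continuous_sum(stream):
    """Apply a stream of (user, amount) records to a dynamic table of running totals.

    Returns the final table and the changelog of row changes it went through.
    """
    table = {}      # dynamic table: user -> total amount
    changelog = []  # ("+", user, total) adds a row, ("-", user, total) removes one
    for user, amount in stream:
        if user in table:
            changelog.append(("-", user, table[user]))  # old row is no longer valid
        table[user] = table.get(user, 0) + amount
        changelog.append(("+", user, table[user]))
    return table, changelog


def apply_changelog(changelog):
    """Rebuild a table downstream purely from the changelog stream."""
    rows = {}
    for op, user, total in changelog:
        if op == "+":
            rows[user] = total
        elif rows.get(user) == total:
            del rows[user]
    return rows


stream = [("alice", 10), ("bob", 5), ("alice", 7)]
table, changelog = continuous_sum(stream)
```

Replaying `changelog` with `apply_changelog` yields the same rows as `table`, which mirrors the point that converting between streams and dynamic tables loses no data. The table here exists only as in-memory operator state, in line with the remark that a dynamic table does not need to be materialized.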
5. Stream SQL is unnecessary
From the discussion above, we found that with dynamic tables we did not need to create any new semantics for streaming SQL. We therefore conclude that a separate streaming SQL is unnecessary. ANSI SQL is fully capable of describing the semantics of stream SQL, and keeping the standard semantics of ANSI SQL was one of our basic principles in building Flink SQL.
6. ANSI SQL feature implementation
On this theoretical foundation, we implemented the ANSI SQL features that stream computing needs, including DML, DDL, UDF/UDTF/UDAF, joins, retraction and window aggregation. Beyond these features, we also did a lot of query optimization, so that Flink SQL can serve users’ varied queries while delivering excellent query performance. A few of them are described below:
1) JOIN
Streams and dynamic tables are dual to each other. A SQL join that looks like a join of tables is actually a join of streams.
For example, an inner join works as follows. Data can arrive from either input stream. When a record arrives on one side, it is first stored in that side’s state, and then the other side’s state is looked up by the join key. If a match exists, the joined result is output; if not, nothing is output until matching data arrives from the other side.
In short, the two streams keep two states. Data that arrives on one side is saved and waits for data from the other side, and the inner join produces a result once both sides have arrived. Besides joins between two streams, we also introduced joins between a stream and an external table. Our machine learning platform stores a large amount of data in HBase, and querying HBase is in effect a join with an external table. There are usually two modes for joining external tables:
A) Lookup. The external table is queried as soon as the stream data arrives, and the result is returned.
B) Snapshot. When the stream data arrives, the snapshot version information is sent to the external storage service along with the query, and the external table store returns the result for that version.
It is worth mentioning that the stream-to-external-table join we designed introduces no new syntax and follows the SQL:2011 standard. The same query also works for batch computing.
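The two-state inner join between streams described in this section can be sketched as follows. This is a simplified model, not Flink’s join operator: `StreamInnerJoin` and its method names are hypothetical, state is kept in plain dictionaries, and state expiration is left out.

```python
from collections import defaultdict


class StreamInnerJoin:
    """Toy streaming inner join that keeps one keyed state per input side."""

    def __init__(self):
        self.left_state = defaultdict(list)   # join key -> rows seen on the left
        self.right_state = defaultdict(list)  # join key -> rows seen on the right

    def on_left(self, key, row):
        # Store the row first, then probe the opposite side's state.
        self.left_state[key].append(row)
        return [(key, row, r) for r in self.right_state.get(key, [])]

    def on_right(self, key, row):
        self.right_state[key].append(row)
        return [(key, l, row) for l in self.left_state.get(key, [])]


join = StreamInnerJoin()
first = join.on_left("order-1", "order created")  # no match yet, nothing emitted
second = join.on_right("order-1", "payment ok")   # matches the stored left row
```

Here `first` is empty because the right side has not arrived yet, and `second` contains the joined row. Because every row stays in state indefinitely in this sketch, a real job would pair it with a state TTL.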
2) Retraction
Retraction is an important concept in stream computing. Here is an example: computing word frequencies.
The computation first counts how often each English word occurs, and then counts how many distinct words occur at each frequency. For example, suppose the input initially contains only the three words “Hello World Bark”, each appearing once. The result is that three words have a frequency of 1 and no word has any other frequency, so the result table has a single row, “1: 3”. When the input is updated with another Hello, the frequency of “Hello” becomes 2, so we insert a new row “2: 1” into the result table.
One word now appears twice, so “2: 1” is correct, but the count of words that appear once is now wrong: it should be two, not three. The essence of the problem is that the output of a stream computation is an early observation of the result. As data keeps arriving, the result will inevitably change, so we must retract the previous result and then emit the updated one; otherwise the result will be wrong. In the example above, when the frequency of Hello changes from 1 to 2, we not only need to insert the “2: 1” row into the result table but also need to retract the “1: 3” row and replace it with “1: 2”.
It is worth noting that when to retract and when not to is decided entirely by the SQL query optimizer; the user does not need to be aware of it and only needs to describe the business logic in SQL. As shown in the figure, the first scenario does not need retraction and the second does, and that decision is made by the optimization framework, not the user. This is a good illustration of the benefit of using SQL and the optimization framework that naturally comes with it.
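The word-frequency example can be traced with a short Python sketch that emits explicit retract and insert messages. It is an illustration only: `FrequencyOfFrequencies` and the `("retract", ...)` and `("insert", ...)` message format are assumptions of this sketch, not Flink SQL’s internal encoding.

```python
from collections import Counter


class FrequencyOfFrequencies:
    """Counts how many distinct words occur at each frequency, with retraction."""

    def __init__(self):
        self.word_count = Counter()  # word -> how often it has occurred
        self.freq_count = Counter()  # frequency -> number of words at that frequency

    def add(self, word):
        changes = []
        old = self.word_count[word]
        if old > 0:
            # The row for the old frequency is now wrong: retract it,
            # then re-insert it with the corrected count if any words remain.
            changes.append(("retract", old, self.freq_count[old]))
            self.freq_count[old] -= 1
            if self.freq_count[old] > 0:
                changes.append(("insert", old, self.freq_count[old]))
        new = old + 1
        self.word_count[word] = new
        if self.freq_count[new] > 0:
            changes.append(("retract", new, self.freq_count[new]))
        self.freq_count[new] += 1
        changes.append(("insert", new, self.freq_count[new]))
        return changes

    def result(self):
        return {f: n for f, n in self.freq_count.items() if n > 0}


table = FrequencyOfFrequencies()
for w in ["Hello", "World", "Bark"]:
    table.add(w)
changes = table.add("Hello")
```

After the three initial words the result is one row saying three words have frequency 1. The second `Hello` produces a retraction of that row, a corrected row saying two words have frequency 1, and a new row saying one word has frequency 2. Without the retraction, the stale row would remain downstream.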
3) Window aggregation
Window aggregation is an important capability of Flink SQL. In this example, we compute aggregate statistics over each hour of data. In addition to this tumbling window, we also support sliding windows and session windows. User-defined windows will be supported in the future.
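As a rough illustration of what an hourly tumbling window computes, the following Python sketch assigns each event to the window that contains its timestamp and sums per window. The function `tumble_sum` and the sample events are hypothetical, and the sketch processes a finished list, whereas a streaming engine emits each window’s result once that window closes.

```python
from collections import defaultdict


def tumble_sum(events, size_seconds=3600):
    """Sum amounts per tumbling window; events are (timestamp_seconds, amount)."""
    windows = defaultdict(int)
    for ts, amount in events:
        window_start = ts - ts % size_seconds  # windows are fixed-size and never overlap
        windows[window_start] += amount
    return dict(sorted(windows.items()))


events = [(0, 5), (1800, 7), (3600, 1), (7199, 2), (7200, 4)]
hourly = tumble_sum(events)
```

Each event belongs to exactly one window here. A sliding window would assign an event to several overlapping windows, and a session window would close after a gap with no events.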
4) Query Optimization
In addition to adding new features, we also did a lot of query optimization, such as micro-batching. Without micro-batching, processing each record involves several I/O reads and writes. With micro-batching, we can process thousands of records with only a few I/O operations. We have also done a lot of work on filter, join and aggregate pushdown and on TopN optimization. The following example explains the TopN optimization:
As shown in the figure above, we want the top three cities by sales. There are two ways to implement the user’s query underneath:
A) One way is to re-sort all the stored cities every time a new record arrives and then take the first three. This design re-sorts every city for every new record, which inevitably wastes a lot of computing resources.
B) Our query optimizer automatically recognizes the query and optimizes the computation. During execution, only the top three cities need to be kept up to date, which greatly reduces the computational complexity and improves performance.
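The two approaches above can be contrasted in a small Python sketch. It is not the optimizer’s actual plan: `TopNCities`, `naive_top` and the sample sales are hypothetical, and the incremental version assumes sale amounts are positive, so a city’s total only ever grows.

```python
def naive_top(sales, n=3):
    # Approach A: re-sort every city on each update.
    return sorted(sales, key=lambda c: -sales[c])[:n]


class TopNCities:
    """Approach B: keep only the current top-n list up to date."""

    def __init__(self, n=3):
        self.n = n
        self.sales = {}  # city -> running total (state)
        self.top = []    # current top-n cities, best first

    def add_sale(self, city, amount):
        self.sales[city] = self.sales.get(city, 0) + amount
        if city not in self.top:
            if len(self.top) < self.n:
                self.top.append(city)
            elif self.sales[city] > self.sales[self.top[-1]]:
                self.top[-1] = city  # displace the weakest top-n entry
            else:
                return list(self.top)  # top-n unaffected, no sorting needed
        self.top.sort(key=lambda c: -self.sales[c])  # sorts at most n items
        return list(self.top)


ranking = TopNCities()
for city, amount in [("Hangzhou", 100), ("Beijing", 80), ("Shanghai", 120),
                     ("Shenzhen", 50), ("Shenzhen", 60)]:
    top3 = ranking.add_sale(city, amount)
```

The incremental version compares each update against the weakest entry of the top list and sorts at most three items, while the naive version sorts every city on every record. Both produce the same ranking for this input.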
Alibaba real-time computing application
We developed two computing platforms based on stream computing SQL.
1. Alibaba Cloud stream computing development platform
The first is Alibaba’s StreamCompute platform, which lets users write and debug SQL within the platform. Once a job has been debugged, users can publish it directly through the platform and deploy it on an Alibaba Cloud cluster, and then operate and monitor it online after deployment. The platform thus brings together everything real-time computing requires, integrating development, debugging, deployment, and operation and maintenance, which greatly speeds up development and release. It is worth mentioning that the vast majority of Alibaba Group’s real-time computing jobs during Double 11 in 2017 were published through this platform. Since September this year, we have also opened the platform to external enterprises through Alibaba Cloud, on both public and private cloud, so that they can use Alibaba’s real-time computing capabilities.
2. Porsche, Alibaba’s real-time machine learning platform
To make it easier for algorithm engineers to develop machine learning tasks, we designed and implemented Porsche, an online machine learning platform aimed at algorithm engineers that supports visual, self-service development and operations, built on Flink SQL and HBase. As shown in the figure above, in the Porsche platform’s IDE the user drags components onto a canvas, configures their properties, and defines the complete computation DAG. The DAG is translated into SQL and finally submitted to Blink for execution. It is also worth mentioning that the Porsche platform supports TensorFlow, and that it performed impressively during this year’s Double 11. The platform spares algorithm engineers the cost of learning SQL and is open only to internal users for now.
Double 11 real-time calculation summary
The figure above shows Alibaba’s real-time computing architecture. At the bottom are thousands of physical machines; above them are the uniformly deployed resource management and storage layers, then Blink Runtime and Flink SQL. Users submit jobs through the StreamCompute and Porsche platforms, and hundreds of engineers within Alibaba now run nearly a thousand Flink SQL jobs on it. That is the current state of real-time computing at Alibaba.
With the help of real-time computing, Alibaba’s Double 11 achieved an outstanding result of 168.2 billion yuan. The contribution of real-time computing shows mainly in the following points:
1. This Double 11 saw the largest concurrency in Internet history. Real-time aggregation of hundreds of thousands of transactions and payments per second was all handled by Blink
2. Displaying the 10 billion mark at 3 minutes and 01 seconds not only required high throughput from the database but also tested the speed of real-time computing
3. The algorithm platform helped the algorithm engineers achieve good search and recommendation results, which drove overall GMV growth
In short, real-time computing not only meets the diverse needs within Alibaba but also lifts GMV. We hope to bring Blink’s real-time computing capabilities to enterprises outside Alibaba through StreamCompute so that they can benefit too. That’s all for this talk. Thank you.