This article is shared by Xu Shuai, head of the Bigo computing platform team, and introduces the construction of the Bigo real-time computing platform. It covers:
- Development history of the Bigo real-time computing platform
- Platform features and improvements
- Typical business scenarios
- Efficiency improvements brought by Flink
- Summary and outlook
I. Development history of the Bigo real-time computing platform
Today, I would like to share the construction process of the Bigo real-time computing platform, some of the problems we solved along the way, and the optimizations and improvements we made. The first part is the platform's development history.
Let's start with a brief introduction to Bigo's business. The company has three major apps: Live, Likee and Imo. Live offers live streaming to users around the world. Likee is a short-video creation and sharing app, very similar to Kuaishou and Douyin. Imo is a global free communication tool. All three products revolve around users, so our business focuses on improving conversion and retention. The real-time computing platform serves these businesses as a foundational platform, so its construction also centers on these business scenarios and on providing end-to-end solutions for them.
The evolution of Bigo real-time computing can be divided into three phases.
- Before 2018, there were very few real-time jobs, and we used Spark Streaming for some real-time business scenarios.
- From 2018 to 2019, with the rise of Flink, the consensus was that Flink was the best real-time computing engine, and we started using it. Development was fragmented, though: each line of business built its own Flink cluster for its own use.
- Starting in 2019, we unified all Flink-based businesses onto the Bigo real-time computing platform. After two years of construction, all real-time computing scenarios now run on the Bigo platform.
As shown below, this is the current state of Bigo's real-time computing platform. On the data source side, the data consists mainly of user behavior logs from the apps and clients, plus some user information in MySQL.
This information passes through message queues and is eventually collected into our platform. The main message queue is Kafka, with Pulsar increasingly being used, while MySQL logs enter the real-time computing platform mainly through BDP. Inside the platform, the bottom layer uses the common Hadoop ecosystem for dynamic resource management. The engine layer above has been unified on Flink, where we have done some development and optimization of our own. On top of this, we have built our internal BigoFlow management platform as a one-stop platform for development, operations and monitoring; users can develop, debug and monitor jobs on BigoFlow. Finally, for data storage, we connect to Hive, ClickHouse, HBase and so on.
II. Features and improvements of the Bigo real-time computing platform
Let's look at the features of the Bigo computing platform and the improvements we have made. As a growing company, our focus in building the platform has been to make it as easy as possible for business teams to use, so that the business can develop and scale. We want to provide a one-stop platform for development, operations and monitoring.
First of all, development on BigoFlow is very convenient. Our development features and improvements include:
- Powerful SQL editor.
- Graphical topology adjustment and configuration.
- One-click multi-cluster deployment.
- Unified version management, converging versions as much as possible.
In addition, we have made many improvements in operation and maintenance:
- Perfect savepoint management mechanism.
- Logs are automatically collected to ES, with built-in error checking rules.
- The task history is saved for easy comparison and problem tracking.
Finally, for monitoring, our features are as follows:
- Monitoring is automatically added, and users do not need to manually configure it.
- Automatically analyzes resource usage and recommends appropriate resource allocation for users.
Our metadata is stored in three main places: Kafka, Hive and ClickHouse. We can currently access the metadata of all of these storage systems, which greatly simplifies things for users and reduces the cost of use.
- Once a Kafka topic has been onboarded, its metadata can be imported once and reused everywhere, with no DDL required.
- Flink and Hive are also fully connected, so users can work with Hive tables without DDL (see the sketch after this list).
- ClickHouse is connected as well and automatically tracks Kafka topics.
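As a rough illustration of the no-DDL experience, the sketch below registers a Hive Metastore as a Flink catalog so that existing Hive tables can be queried directly. The catalog name, default database, configuration path and table name are hypothetical placeholders, not Bigo's actual setup.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveMetadataNoDdl {
    public static void main(String[] args) {
        // Batch mode is enough for this small metadata demo.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register the existing Hive Metastore as a catalog. The catalog name,
        // default database and hive-conf path below are placeholders.
        HiveCatalog hive = new HiveCatalog("hive", "default", "/opt/hive/conf");
        tEnv.registerCatalog("hive", hive);
        tEnv.useCatalog("hive");

        // Existing Hive tables are now visible to Flink SQL without any CREATE TABLE DDL.
        tEnv.executeSql("SHOW TABLES").print();
        tEnv.executeSql("SELECT * FROM user_events LIMIT 10").print();
    }
}
```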
What we offer is not just a platform, but end-to-end solutions for common scenarios. In the ETL scenario, our solution includes:
- Universal event-tracking data is ingested fully automatically.
- Users do not need to develop any code.
- Data is stored in Hive.
- Metadata is updated automatically.
In the monitoring scenario, our solution includes:
- Automatic data source switchover.
- The monitoring rules remain unchanged.
- The results are automatically stored in Prometheus.
The third scenario is ABTest. Traditional ABTest is offline and only produces results a day later. We have therefore converted ABTest to real-time output and greatly improved its efficiency through stream-batch integration.
Our improvements to Flink are mainly reflected in the following aspects:
- First, at the connector level, we have customized many connectors and integrated with all the systems used within the company.
- Second, for data formats, we have very complete support for Json, Protobuf and Baina; users can consume these formats directly without writing their own parsers.
- Third, we are ahead of the community in Hive usage, because all of the company's data lands directly in Hive. This includes streaming reads, EventTime support, dimension table partition filtering, Parquet complex type support, and more.
- Fourth, we have also made optimizations at the State level, including SSD support and RocksDB tuning (see the sketch after this list).
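The article does not detail these internal changes, but as a minimal sketch of what SSD-backed RocksDB state can look like using the community configuration knobs of recent Flink versions (the SSD mount paths are hypothetical, and in production these keys would typically live in flink-conf.yaml rather than job code):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LargeStateJobSetup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Spread RocksDB working directories across local SSD mounts (placeholder paths).
        conf.setString("state.backend.rocksdb.localdir", "/ssd1/rocksdb,/ssd2/rocksdb");
        // Let Flink manage RocksDB memory so large-state jobs stay within container limits.
        conf.setString("state.backend.rocksdb.memory.managed", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Incremental checkpoints keep checkpoint sizes manageable when state is large.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.enableCheckpointing(60_000);

        // ... define sources, state-heavy keyed operators and sinks here ...
    }
}
```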
III. Typical business scenarios at Bigo
The traditional data warehouse pipeline went from Kafka to Flume, then to Hive, and finally to ClickHouse. Of course, most ClickHouse data was imported from Hive, with some written directly from Kafka.
This pipeline is quite old and has the following problems:
- First, it is unstable: whenever Flume hits an exception, data loss and duplication are common.
- Second, it scales poorly; it is hard to scale out in the face of sudden traffic spikes.
- Third, business logic is not easy to adjust.
So after introducing Flink, we did a lot of work to replace the original Flume-to-Hive process. Today all ETL goes from Kafka through Flink, and all event-tracking data lands in the Hive offline data warehouse as historical storage, so no data is lost. At the same time, because many jobs need real-time analysis, another link goes directly from Flink into the ClickHouse real-time data warehouse.
In this process we made some core changes, in three areas. First, for user access, our changes include:
- Keep it as simple as possible.
- Universal event-tracking data is connected automatically.
- Metadata is connected through, with no DDL required.
In addition, for Flink itself, our changes include:
- Parquet write optimization.
- Concurrency tuning.
- SSD support for large-state jobs.
- RocksDB optimization for better memory control.
Finally, on the data sink side, we did a lot of customization to support both Hive and ClickHouse.
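For illustration only, here is a minimal sketch of the Kafka-to-Flink-to-Hive leg of this pipeline using the community Kafka and Hive connectors. The topic, schema, brokers, catalog path and target table are hypothetical, and the target Hive table is assumed to already exist, partitioned by day and hour, with partition-commit properties set on the table.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveEtl {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Streaming writes to Hive commit files on checkpoints.
        tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "1 min");

        tEnv.registerCatalog("hive", new HiveCatalog("hive", "ods", "/opt/hive/conf"));

        // Kafka source for event-tracking data (topic, schema and brokers are placeholders).
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE user_events (" +
            "  uid BIGINT, event STRING, ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'user_events'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'scan.startup.mode' = 'latest-offset'," +
            "  'format' = 'json'" +
            ")");

        // The target Hive table is assumed to be partitioned by (dt, hr) and configured
        // with 'sink.partition-commit.trigger' = 'partition-time', so a partition is
        // committed once the watermark passes its end.
        tEnv.executeSql(
            "INSERT INTO hive.ods.user_events_hive " +
            "SELECT uid, event, ts, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') " +
            "FROM user_events").await();
    }
}
```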
IV. Efficiency improvements Flink brings to the business
Here are some changes we made in the ABTest scenario. In the old setup, after all the data lands in Hive, an offline computation starts, possibly going through countless workflows before finally producing a large wide table. The table may have many dimensions, recording the results of the grouped experiments, and data analysts use it to determine which experiments performed better.
While this structure is simple, the process is too long, results arrive late, and adding dimensions is not easy. The main problem lies with Spark: countless workflows have to be executed, one workflow cannot be scheduled until the previous one finishes, and offline resources are not well guaranteed. The biggest issue was that one day's ABTest results could not be produced until the afternoon of the next day; data analysts often complained that they could not work in the morning and could only start analyzing shortly before leaving in the afternoon.
So we started using Flink's real-time computing power to solve the timeliness problem. Unlike the Spark tasks, which have to wait for upstream results, Flink consumes directly from Kafka, and we can basically get results in the morning. However, because the final output has many dimensions, possibly hundreds, the State became very large and we frequently ran into OOM.
The first step of the transformation was therefore a compromise. Instead of joining all dimensions in a single Flink job, we split the work into several jobs, each computing a subset of the dimensions, then joined them via HBase and imported the joined results into ClickHouse.
During this transformation we found a problem. The logic may need frequent adjustment, and after each change we need to check the result, which requires a 1-day window. If we read historical data directly, Kafka has to retain data for a long time, and reading history forces Kafka to go to disk, which puts a lot of pressure on it. If we do not read historical data, then because the window can only trigger at midnight, a logic change made today cannot be verified until a day later, making debugging iterations very slow.
As mentioned earlier, all of our data was already in Hive. We were on Flink 1.9 at the time, so we implemented streaming reads from Hive ourselves, and since all of this data is triggered by EventTime, we added EventTime support on Hive as well. For the sake of stream-batch unification, we did not use Spark here, because using Spark for job verification would have meant maintaining two sets of logic.
On Flink, we use stream-batch integration for offline data backfill and offline job verification, while the real-time path is used for daily production.
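As a rough sketch of this stream-batch idea, and assuming the tables already exist in the Hive Metastore, the same SQL can be executed in streaming mode for daily production or in batch mode for backfill and verification simply by switching the environment settings. All names below are placeholders, not Bigo's actual jobs.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class AbtestStreamBatch {

    // One piece of SQL, two execution modes: streaming for the daily production run,
    // batch for offline backfill and result verification over the data in Hive.
    static void run(boolean batchMode) {
        EnvironmentSettings settings = batchMode
                ? EnvironmentSettings.inBatchMode()
                : EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Catalog, database, table and column names are placeholders.
        tEnv.registerCatalog("hive", new HiveCatalog("hive", "abtest", "/opt/hive/conf"));
        tEnv.useCatalog("hive");

        tEnv.executeSql(
            "SELECT experiment_group, COUNT(DISTINCT uid) AS uv " +
            "FROM abtest_events " +
            "GROUP BY experiment_group").print();
    }

    public static void main(String[] args) {
        run(false);  // real-time production
        // run(true); // offline backfill / verification with identical logic
    }
}
```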
As mentioned earlier, the split-job approach was a compromise: it relied on HBase and did not fully exploit Flink's capabilities. So we did a second round of transformation to completely remove the dependency on HBase.
After the second iteration, we can now hold day-level windows directly in Flink, and this unified stream-batch solution is already online. We compute the entire large wide table directly in Flink and write the results straight to ClickHouse once the daily window fires, so results are basically available in the early morning.
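A minimal sketch of such a day-level window job follows, with hypothetical topic, field and table names. The print connector stands in for Bigo's in-house ClickHouse sinker, which is not described in detail in the article.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AbtestDailyWindow {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Event-time source of experiment events (topic, schema and brokers are placeholders).
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE abtest_events (" +
            "  uid BIGINT, experiment_group STRING, ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '10' MINUTE" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'abtest_events'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'format' = 'json'" +
            ")");

        // The print connector stands in for the in-house ClickHouse sinker here.
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE abtest_wide_table (" +
            "  dt TIMESTAMP(3), experiment_group STRING, uv BIGINT" +
            ") WITH ('connector' = 'print')");

        // Day-level tumbling window: the result for a day is emitted once the watermark
        // passes the end of that day, then written out in one shot.
        tEnv.executeSql(
            "INSERT INTO abtest_wide_table " +
            "SELECT TUMBLE_START(ts, INTERVAL '1' DAY), experiment_group, COUNT(DISTINCT uid) " +
            "FROM abtest_events " +
            "GROUP BY experiment_group, TUMBLE(ts, INTERVAL '1' DAY)").await();
    }
}
```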
During the whole process, our optimization of Flink includes:
- SSD support for State.
- Streaming reads of Hive with EventTime support (see the sketch after this list).
- Partition-based loading for Hive dimension table joins.
- A complete ClickHouse sinker.
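Bigo implemented streaming Hive reads and EventTime support in-house on Flink 1.9, ahead of the community. The sketch below approximates the same capability with the options the community Hive connector later added (Flink 1.12+), using placeholder names and paths.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveStreamingRead {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // On older Flink versions the /*+ OPTIONS(...) */ hint must be enabled explicitly.
        tEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);

        tEnv.registerCatalog("hive", new HiveCatalog("hive", "ods", "/opt/hive/conf"));
        tEnv.useCatalog("hive");

        // Streaming read of a partitioned Hive table: new partitions are picked up as
        // they are committed, starting from the given partition offset. For dimension
        // tables, 'streaming-source.partition.include' = 'latest' keeps only the newest
        // partition loaded for lookup joins.
        tEnv.executeSql(
            "SELECT * FROM user_events_hive " +
            "/*+ OPTIONS(" +
            "  'streaming-source.enable' = 'true'," +
            "  'streaming-source.monitor-interval' = '1 min'," +
            "  'streaming-source.consume-start-offset' = '2021-01-01') */")
            .print();
    }
}
```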
After these optimizations, our hour-level tasks were no longer delayed, and the completion time of day-level tasks moved up from the afternoon to before the workday starts, which greatly accelerated iteration.
V. Summary and outlook
To summarize the current state of real-time computing at Bigo: first, it is very close to the business; second, it integrates seamlessly with all of the ecosystem components used in the company, so users essentially need no extra development; third, the real-time data warehouse has begun to take shape. Finally, compared with the big tech companies, our scenarios are not yet rich enough: for some typical real-time use cases, many businesses have not truly switched to real-time because their requirements are not that demanding yet.
Our development plan has two major parts.
- The first is to expand into more business scenarios, including real-time machine learning, advertising, risk control and real-time reporting. In these areas we need to push the idea of real-time computing harder and connect it with the business.
- The other part is Flink itself, where we have many things to do internally, such as support for large Hive dimension table joins, automated resource configuration, CGroup isolation, and more. That is what we plan to do going forward.