This article describes SmartNews’s practice of using Flink to speed up Hive calendar production and seamlessly integrate Flink into Airflow and Hive dominated batch systems. The technical challenges encountered in the process and solutions are detailed for the community to share. The main contents are:
- Project background
- Definition of the problem
- Project objectives
- Technology selection
- Technical challenges
- Overall plan and challenge response
- Project results and prospects
- Afterword.
I. Project background
SmartNews is a machine learning-driven Internet company. Founded in 2012 in Tokyo, Japan, the company has offices in the United States and China. After more than 8 years of development, SmartNews has grown to be the no.1 news app in Japan and the fastest growing news app in the US, covering more than 150 countries around the world. As of early 2019, SmartNews has been downloaded more than 50 million times worldwide for iOS and Android.
SmartNews has built large datasets based on Airflow, Hive, EMR and other technology stacks over the last 9 years. As the volume of data grows, the processing time of these offline tables gradually lengthens. In addition, with the acceleration of iteration rhythm of business side, the real-time performance of tables is also put forward higher requirements. Therefore, SmartNews internally initiated Speedy Batch’s project to speed up the production efficiency of existing offline tables.
This sharing is an example of a Speedy Batch project that accelerates the practice of user actions tables.
User behavior logs reported by the APP and daily tables generated by Hive jobs are the source of many other tables. This job requires three hours to run, which in turn increases Latency for many downstream tables, significantly impacting the experience of users such as data scientists and product managers. So we need to speed up these operations to make the tables available earlier.
The company’s services are basically on the public cloud. The original server logs are uploaded to the cloud storage in the form of files and partitioned on a daily basis. The current job is scheduled to run on EMR using Airflow to generate Hive daily table and the data is stored in cloud storage.
Ii. Definition of the problem
1. The input
The news server uploads a raw log file every 30 seconds to the cloud storage directory of the corresponding date and hour.
2. The output
Raw logs are partitioned by day (DT) and action (action) after being processed by ETL. Action types of about 300, not fixed, often increase or decrease.
3. The user
The use of this table is extensive and multi-channel. There are queries from Hive as well as Presto, Jupyter, and Spark, and we’re not even sure that’s all there is to it.
Objective of the project
- Shorten the delay for actions tables from 3 hours to 30 minutes;
- Be transparent to downstream users. Transparency has two aspects:
- Function: users do not need to modify any code, do not feel completely
- Performance: Tables generated by a new project should not degrade performance when read downstream
Four, technology selection
Before this project, my colleague has made several rounds of improvement on this homework, but the effect is not very significant.
Solutions tried included adding resources and putting in more machines, but encountered cloud storage IOPS limitations: Each prefix supports a maximum of 3000 concurrent reads and writes. This problem is particularly obvious in the output stage, that is, when multiple reducer output to the same action subdirectory at the same time, this limitation is easy to be encountered. In addition, we also tried to pre-process by hour, and then merge into the daily table at dawn every day, but the merging process also took a lot of time, the overall delay was still around 2.5 hours, the effect was not significant.
Since server-side logs are uploaded to the cloud in near real time, the team proposed streaming processing, which eliminates the day-long, three-hour wait for batch jobs and instead spreads computations throughout the day, reducing the processing time at the end of the day. The team had a good background with Flink and Flink’s recent improvements to Hive, so they decided to adopt a Flink-based solution.
5. Technical challenges
The challenges are manifold.
1. Output the RC file format
The current Hive table file format is RCFile. In order to ensure transparency to users, we can only do in-place upgrade on the existing Hive table, that is, we need to reuse the current table, then Flink output file format should conform to RCFile format. A Hive table can have only one format.
RCFile belongs to bulk format (corresponding to Row format) and must be output at each checkpoint. If we checkpoint every 5 minutes, each action must output a file every 5 minutes, which greatly increases the number of resulting files and affects downstream read performance. Especially for low-frequency actions, the file count increases a hundredfold. We know about Flink’s file merge function, but it’s a merge of multiple sink data within a checkpoint, which doesn’t solve our problem, what we need is file merge across checkpoint.
The team considered exporting in row format (e.g. CSV) and then implementing a custom Hive SerDe compatible with RCFile and CSV. But we quickly abandoned this idea because it would have required the Hybrid SerDe to be implemented for each query scenario, such as Presto, Spark, and so on.
-
On the one hand, we can’t put so many resources into it;
-
On the other hand, that kind of solution is also user friendly, after all, users still need to install the custom SerDe.
We proposed to generate a table in a new format, but it was also rejected because it was not transparent to users.
2. The perceptibility and integrity of Partition
How can downstream jobs sense that the partition is ready on that day? The actions table is divided into two levels: partition, DT and action. Actions Belong to the Dynamic partition of Hive. The number of actions is large and unfixed. The current Airflow downstream job is waiting for the insert_Actions Hive task to complete. This is fine, because when insert_actions ends, all the partitions for the action are ready. However, for Flink jobs, there is no end signal, it can only submit partition after partition to Hive, such as DT =2021-05-29/action=refresh. Due to the large number of actions, the partition submission process can take several minutes, so we should not allow the Airflow job to be aware of a DT level partition, which would likely trigger downstream with only partial actions.
3. Read cloud storage files in streaming mode
The input to the project is constantly uploaded cloud storage files, not from MQ (Message Queue). Flink supports FileStreamingSource, which can stream in files, but that’s based on the timed list directory to find new files. However, this solution is not suitable for our scenario, because our directory is too large to complete the cloud storage list operation.
4. Exactly Once
Because of the importance of the Actions table, the user cannot accept any data loss or duplication, so the entire solution needs to be processed exactly once.
6. Overall plan and challenge response
1. Output RCFile and avoid small files
The solution we finally chose was a two-step process, with the first Flink job output in JSON (Row Format) and then another Flink job to convert JSON to RC format. This solves the problem that Flink cannot happily output RC files of the appropriate size.
Output json intermediate results, so we can control the size of the output file with Rolling Policy. We can save the output file across multiple checkpoints to be large enough, or long enough, and then export it to the cloud storage. Flink uses the Multi Part Upload (MPU) function of cloud storage. At each checkpoint, Flink also Upload data saved at the current checkpoint to the cloud storage, but the output is not a file, but a Part. Finally, when multiple parts reach the required size or time, the cloud storage interface can be invoked to merge them into a file. The merge operation is completed on the cloud storage, and the application end does not need to read the part again to merge the file and then upload the file. However, Bulk formats require global processing at a time. Therefore, they cannot be uploaded in segments and then merged. They must be uploaded at a time.
When the second job senses that a new JSON file has been uploaded, it loads it, converts it to RCFile, and then uploads it to the final path. The latency associated with this process is small, and a file can be kept within 10 seconds, which is acceptable.
2. Elegantly perceive input files
At the input end, Flink’s FileStreamingSource is not adopted, but the Event Notification of cloud storage is used to sense the generation of new files, and the file will be loaded after receiving the notification.
3. The perceptibility and integrity of Partition
On the output side, we print the DT level success file to make the downstream reliably aware of the ready of the calendar. We implement a custom StreamingFileWriter that outputs partitionCreated and partitionInactive signals, and by implementing a custom PartitionCommitter that determines the end of the calendar based on the above signals.
The mechanism is as follows: Each cloud storage writer sends a partitionCreated signal when writing an action, and sends a partitionInactive signal when writing an action. PartitionCommitter checks whether all partittions are inactive within a certain day. If so, all data of the day is processed, and success file of dt level is output. This document shall be sensed by Airflow to determine whether Flink has completed the calendar processing.
4. Exactly Once
The Event Notification of cloud storage provides At Least once guarantee. The Flink job deduplicates the file level and adopts the Exactly Once checkpoint setting. The cloud storage file output based on MPU mechanism is equivalent to supporting TRUNCate. Therefore, the cloud storage file output is equivalent to idempotent and therefore equivalent to end-to-end Exactly Once.
Vii. Project Achievements and prospects
The project has been online, and the delay is about 34 minutes, including 15 minutes of waiting for late documents.
-
The first Flink job takes about 8 minutes to checkpoint and output, and the json to RC job takes 12 minutes to complete all processing. We can continue to compress this time, but considering timeliness and cost, we choose the current state.
-
The conversion from JSON to RC takes longer than expected, because the last checkpoint of the upstream job outputs too many files, resulting in a longer overall time. This can be linearly reduced by increasing the concurrency of the job.
-
The number of files output is increased by about 50% compared with the number of files output by a batch job. This is the disadvantage of streaming over batch processing, which requires the output of a file when the time is up and the file size may not be as large as expected. Fortunately, an increase in file count of this magnitude does not significantly affect downstream performance.
-
We achieved complete transparency of the downstream, and did not receive any abnormal feedback from users before and after the launch.
This project allowed us to demonstrate in a production environment the use of Flink, a streaming processing framework, for seamless access to a batch system, with local improvements that the user does not feel. In the future, we will use the same techniques to speed up the production of more other Hive tables and broadly provide the production of more fine-grained Hive representations, such as the hour level. On the other hand, we will explore the use of Data Lake to manage batch stream data and realize the gradual convergence of the technology stack.
Eight, afterword.
With a completely different computing framework and the need to be completely consistent with batch systems, the team stumbled through a number of holes that were too small to list. So let’s pick a few representative questions for readers to consider:
-
To verify that the output of the new job is consistent with the original Hive output, we need to compare the output of the two. How to compare the consistency of two Hive tables efficiently? In particular, there are billions of data per day, each with hundreds of fields, and of course complex types (array, Map, array, etc.).
-
Must the checkpoint mode of both Flink jobs be Exactly Once? Which one can not, and which one must be?
-
The StreamFileWriter receives partitionCreated and partitionInactive signals only at checkpoint, So can we print it in its snapshotState() function to the downstream (which will be saved to state)?
-
Last question: Do you have a better plan for our reference?