With more than 400 million monthly active users, the Weibo video platform must handle high concurrency and heavy traffic: it has to protect users' content production and consumption experience on Weibo while also supporting rapid business iteration with correctness, stability, and high availability. This talk focuses on the architecture of Weibo's large-scale offline video processing system and shares first-hand experience in large-scale distributed system design, performance optimization, and high-availability engineering.
By Huo Donghai
Edited by LiveVideoStack
Hi, everyone. I am Huo Donghai, an architect in the R&D department of the Sina Weibo video and Weibo platform teams. I joined Weibo in 2017. Recently I have focused on improving the video platform's technology stack to improve user experience, and I led the construction of the Weibo Streaming Video Engine (SVE), a system that supports parallel transcoding of videos under high concurrency and greatly improves transcoding efficiency.
1. Background
Weibo is characterized by high concurrency and heavy traffic, with more than 400 million monthly active users. It is also an open platform that supports sharing from many third parties, so millions of shared videos must be processed every day.
Weibo's video services can be roughly divided into two business forms: vertical short-video sharing, as shown in the left picture, and slightly longer horizontal short-video playback, as shown in the right picture.
Weibo video also has some special scenarios. For example, the video tab on the Weibo PC site jumps to Kuran.com, a sharing site for 5- to 15-minute short variety videos. As shown in the picture, below are videos shared to Weibo from Youku, iQIYI, Tencent Video, and other video websites.
The Weibo video team faces extremely complex business scenarios, and in all of them we need to solve the video processing problem. As shown in the figure, we have Weibo video, Kuran video, paid video, Weibo Story, Miaopai, and video sharing websites connected through the open platform. Many business parties plug into Weibo at the top, so a service scheduling center (the service scheduling layer) is introduced to schedule these upper-layer services.
The scheduling layer also handles data synchronization: all videos are presented on Weibo as posts, so we constantly need to interact with our own systems. Another function of the business scheduling layer is video content analysis. The next layer down is file storage and the media library. File storage covers file upload, file persistence, and related problems; the media library stores the source information of video objects, such as the URLs of each resolution, video width and height, user ID, and post content. The lowest layer is the transcoding service, and what we focus on here is how to think about the transcoding service in the Weibo scenario.
2. Architecture and challenges of the Weibo video transcoding service
2.1 Traditional architecture of video processing system
Before we talk about the problems Weibo faces, let's look at the traditional architecture of video processing systems. Suppose a user has a 1080p, 5 Mbps video to upload from a PC or mobile phone. In a traditional architecture, the file is first sent to the file upload service, which writes it to the underlying storage. Once the file has landed in storage, the upload service notifies the transcoding service that the file needs to be transcoded. The transcoding service then dispatches the transcoding task, through the scheduler, to a transcoding server in the corresponding transcoding cluster. The actual transcoding machine downloads the source file from storage, converts it into the target formats, and stores the results back in storage.
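As a rough sketch of the flow just described (the interfaces and names here are hypothetical, not Weibo's actual code), the transcoding worker's job boils down to download, transcode, and store back:

```java
// Minimal sketch of the traditional pipeline described above; all names are hypothetical.
public class TraditionalPipeline {
    interface Storage { byte[] download(String key); void upload(String key, byte[] data); }
    interface Transcoder { byte[] transcode(byte[] source, String preset); } // e.g. wraps an ffmpeg call

    private final Storage storage;
    private final Transcoder transcoder;

    TraditionalPipeline(Storage storage, Transcoder transcoder) {
        this.storage = storage;
        this.transcoder = transcoder;
    }

    /** Invoked after the upload service has put the source file into storage. */
    void onUploadFinished(String sourceKey) {
        byte[] source = storage.download(sourceKey);            // 1. worker pulls the source
        byte[] output = transcoder.transcode(source, "720p");   // 2. converts to a target preset
        storage.upload(sourceKey + "_720p", output);            // 3. writes the result back
    }
}
```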
2.2 Weibo video transcoding service – complex business
Weibo video involves very complex business requirements: different businesses need different watermarks, some users have special requirements for their videos, other teams need the transcoding service to validate optimized algorithms online, and the transcoding service itself also provides frame extraction and other basic services. Bringing all of this together quickly and easily to support business needs is a big challenge for us.
2.3 Weibo video transcoding service – acceleration and optimization
In addition, to optimize the basic video experience, we introduced parallel uploading to improve the upload success rate, along with related features such as resumable uploads. We also implemented parallel transcoding, similar to the segment (fragment) transcoding offered by cloud vendors. We even made it possible to transcode while uploading: the video is split into fragments on the user's phone and uploaded in parallel, transcoding runs in the background while the upload is still in progress and can finish roughly when the upload does, so the merged video can be published right away. This greatly improves the experience of uploading videos to Weibo.
An example of acceleration optimization
The first is sequential uploading. Today a sequential upload usually slices the file into binary chunks, but because the chunks are uploaded one after another the whole pipeline has a long delay. With parallel uploading, for example, two chunks are uploaded at the same time, which is roughly twice as fast as sequential uploading.
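A minimal sketch of the parallel-upload idea, assuming a hypothetical chunk-upload API (the real Weibo upload protocol is not shown in the talk): chunks go up concurrently and the server merges them once every chunk has been acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Minimal sketch: upload fixed-size chunks of a file in parallel, then ask the
// server to merge them once every chunk has been acknowledged.
public class ParallelUploader {
    private final ExecutorService pool = Executors.newFixedThreadPool(2); // two concurrent uploads, as in the example

    public void upload(List<byte[]> chunks, UploadClient client) throws Exception {
        List<Future<String>> acks = new ArrayList<>();
        for (int i = 0; i < chunks.size(); i++) {
            final int index = i;
            final byte[] chunk = chunks.get(i);
            acks.add(pool.submit(() -> client.uploadChunk(index, chunk))); // returns a chunk id
        }
        List<String> chunkIds = new ArrayList<>();
        for (Future<String> ack : acks) {
            chunkIds.add(ack.get()); // wait for every chunk; a failed chunk can simply be retried (resumable upload)
        }
        client.mergeChunks(chunkIds); // server-side merge completes the upload
    }

    /** Hypothetical client interface; stands in for the real upload API. */
    public interface UploadClient {
        String uploadChunk(int index, byte[] data) throws Exception;
        void mergeChunks(List<String> chunkIds) throws Exception;
    }
}
```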
With parallel transcoding alone, the video is still uploaded as binary fragments and merged before transcoding; during transcoding, the merged video is split into segments of different lengths, each segment is transcoded in parallel, and the results are merged afterwards, so latency is reduced by increasing parallelism. In the transcode-while-uploading mode, each fragment is transcoded as soon as it has been uploaded and stored, so the client and the server work in parallel; finally, the server merges the source fragments and the transcoded fragments separately.
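A minimal sketch of the transcode-while-uploading idea, again with hypothetical names: each fragment is handed to a transcoding worker as soon as it lands in storage, so by the time the last fragment arrives most of the work is already done and only the merge remains.

```java
import java.util.*;
import java.util.concurrent.*;

// Minimal sketch of transcode-while-uploading: every fragment is transcoded as soon
// as it arrives, and the merge step waits only for fragments still in flight.
public class TranscodeWhileUploading {
    private final ExecutorService workers = Executors.newFixedThreadPool(8);
    private final Map<Integer, Future<String>> transcoded = new ConcurrentHashMap<>();

    /** Called by the upload service each time a fragment lands in storage. */
    public void onFragmentStored(int index, String fragmentKey) {
        transcoded.put(index, workers.submit(() -> transcodeFragment(fragmentKey)));
    }

    /** Called once the client reports that all fragments have been uploaded. */
    public String onUploadComplete(int fragmentCount) throws Exception {
        List<String> outputs = new ArrayList<>();
        for (int i = 0; i < fragmentCount; i++) {
            outputs.add(transcoded.get(i).get());   // most fragments are already done by now
        }
        return mergeFragments(outputs);             // server-side merge of the transcoded fragments
    }

    private String transcodeFragment(String key) { /* e.g. invoke a transcoder on the fragment */ return key + "_t"; }
    private String mergeFragments(List<String> keys) { return String.join("+", keys); }
}
```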
2.4 Challenges of the Weibo video processing system
We face the dual challenge of diverse business requirements and basic service optimization. In addition, the Weibo business is highly real-time, which requires us to complete every link quickly, including both the time it takes us to implement code and the time it takes a new business to go live. We must build a video transcoding service with low latency, high concurrency, high availability, and high performance.
Transcoding itself is computationally heavy and needs a large cluster to support it, so another challenge is managing a large fleet of machines. Because we optimize with fragment transcoding and transcode-while-uploading, cutting a video into ten pieces multiplies the number of transcoding tasks by ten, which leads to a sharp increase in task volume and requires more fine-grained scheduling. Slicing also brings more complex task dependencies: we have to manage the dependencies across slicing, parallel fragment transcoding, and merging. And the more steps a process has, the higher its failure rate, so the system must be more robust to keep failures down.
Today we mainly talk about how to build a low-latency, high-concurrency, high-availability, high-performance system, and I will explain it from the following aspects. The first is a highly flexible configuration generation system, which essentially pulls business-specific logic out of the main system and into configuration, so that the main system can focus on basic services and performance optimization. The second is a DAG-based logical organization framework that uses a workflow engine to organize dependencies between tasks. Finally, we will talk about the important role a high-availability, high-performance task scheduler plays in the system.
3. Architecture design of the Weibo video transcoding service
3.1 Mullinson (Wood Forest)
Highly flexible configuration generation system
For flexible configuration we built a system named Mullinson ("Wood Forest"). It is a rule engine based on a tree structure: each configuration result is a tree, and multiple trees form a forest, hence the name. Mullinson supports flexible configuration generation. In some Weibo business scenarios the product side only cares about speed and does not care about the video output attributes; in that case we can connect a new input service directly to an existing output configuration to complete business access, which greatly improves the efficiency of onboarding new services. Here is a simple example.
As shown in the figure, suppose we already have the native Weibo video ingestion service, and a newly onboarded business with hundreds of millions of videos wants its output to be the same as native video. We do not need to implement a new path from that business to the transcoding output; we only need to connect the nodes, and the new business's input videos get the same output as native Weibo videos. Onboarding is just a configuration click in the back office. The figure on the right shows the output configuration of Weibo video.
In more complex scenarios, the output configurations of native video, instant video, and VIP video are set up as shown in the figure, and the system produces different outputs for different clients. This is how we strip business logic out of the main system in complex scenarios.
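As a rough illustration of the tree-structured configuration idea (the node and preset names below are made up; Mullinson's real rule format is not shown in the talk), onboarding a new business can be as simple as pointing its input at an existing output tree:

```java
import java.util.*;

// Minimal sketch of a tree-structured output configuration with hypothetical node names.
public class OutputConfigTree {
    static class Node {
        final String name;                       // e.g. a business, a client type, or an output preset
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
        Node child(String childName) { Node c = new Node(childName); children.add(c); return c; }
    }

    // Collect the leaf presets of a tree: these are the transcoding outputs to produce.
    static List<String> resolveOutputs(Node root) {
        List<String> outputs = new ArrayList<>();
        if (root.children.isEmpty()) { outputs.add(root.name); return outputs; }
        for (Node child : root.children) outputs.addAll(resolveOutputs(child));
        return outputs;
    }

    public static void main(String[] args) {
        Node nativeVideo = new Node("native-video");
        Node mobile = nativeVideo.child("mobile-client");
        mobile.child("480p");
        mobile.child("720p");
        nativeVideo.child("pc-client").child("1080p");

        // Onboarding a new business is just pointing its input at an existing tree.
        Map<String, Node> configForest = new HashMap<>();
        configForest.put("native-video", nativeVideo);
        configForest.put("new-partner-video", nativeVideo);   // reuse the same outputs, no code change

        System.out.println(resolveOutputs(configForest.get("new-partner-video"))); // [480p, 720p, 1080p]
    }
}
```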
3.2 DAG
Logical organization framework based on DAG
We implemented a workflow engine framework to support the business. First, the idea behind the framework. We develop in Java, so take Java as the example. For a basic upload system, the code is simply download, transcode, upload. On top of that code we want to implement fragment transcoding, transcode-while-uploading, and other complex flows. The easiest way is to copy the normal upload code and modify it, as shown on the right: the flow becomes download, slice, upload the slices, then download a slice, transcode it, upload it, repeated for every slice so that multiple machines can work in parallel, and finally merge the slices. Because the flow is complex, we wanted to use a directed acyclic graph to connect and organize it, solidify the basic services, and assemble different flows through scripts. Then, whether we do download-transcode-upload or fragment upload, we just wire the steps together, and the code between any two steps does not need to change. We solidify the parts that can be solidified, break the code into closures that can be executed independently, and let the DAG execute those closures by managing the relationships between them. That is the thinking behind our DAG framework.
This is an illustration of our transcoding service. In the figure, the Center part is the central dispatch service, the Runner part is the transcoding task service, and videoTrans is the DAG script that organizes the relationships between tasks. Our scripts are implemented in Groovy. The framework, named Olympiadane, builds the dependency chains through the Groovy engine, and groups (shown in white) are independently schedulable units. A Task first passes through the scheduler, which distributes it to an executor according to the current situation. The executor runs the Tasks in dependency order, in this case downloading, transcoding, and uploading; other machines work the same way. This decouples the execution flow from the business: to onboard a new service we just implement another Task and add its dependencies to the script. The Job in the figure is also generated by the script. This is our DAG architecture.
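A minimal sketch of the DAG idea in plain Java (Olympiadane's real API and the Groovy scripts are not shown here, so all names are hypothetical): each task declares what it depends on, and it runs as soon as those dependencies complete.

```java
import java.util.*;
import java.util.concurrent.*;

// Minimal sketch of DAG-organized tasks: each node declares its dependencies and
// runs as soon as everything it depends on has completed. Nodes are listed in
// dependency order, which keeps the wiring code short.
public class DagSketch {
    record TaskNode(String name, List<String> dependsOn, Runnable body) {}

    static void run(List<TaskNode> nodes) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<String, CompletableFuture<Void>> done = new HashMap<>();
        for (TaskNode node : nodes) {
            List<CompletableFuture<Void>> deps = new ArrayList<>();
            for (String dep : node.dependsOn()) deps.add(done.get(dep));
            done.put(node.name(),
                    CompletableFuture.allOf(deps.toArray(new CompletableFuture[0]))
                            .thenRunAsync(node.body(), pool));   // schedule once all dependencies finish
        }
        CompletableFuture.allOf(done.values().toArray(new CompletableFuture[0])).get();
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        run(List.of(
                new TaskNode("download",  List.of(),            () -> System.out.println("download")),
                new TaskNode("transcode", List.of("download"),  () -> System.out.println("transcode")),
                new TaskNode("upload",    List.of("transcode"), () -> System.out.println("upload"))
        ));
    }
}
```

Rewiring the list of TaskNodes (for example adding slice, per-fragment transcode, and merge nodes) changes the flow without touching the task implementations, which is the decoupling the script layer provides.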
With the DAG framework in place, new services can be onboarded quickly through scripts. Scripts change often but the underlying features do not, so features can be shared between scripts and tested on their own, which gives us independent components that can be assembled freely. These components can be tested independently, are easy to extend and deploy, and perform well.
This figure describes a DAG flow. The fragment transcoding process mentioned above starts with downloading the source, slicing it, and uploading the slices.
For example, three machines work at the same time on downloading fragments, transcoding them, and uploading the results. When the parallel stage ends, a new dependency kicks in, as shown in the figure: all transcoded fragments are downloaded and merged, and the outputs for different resolutions are produced in parallel on different machines. This is how we optimize the transcoding service through DAG organization.
In this picture, the gray parts turn green as tasks complete, which shows that the whole flow is observable, an advantage of the DAG approach: we can see exactly how far a job has progressed and locate problems more quickly. In addition, since features are split out, we can add aspects around individual features, for example to measure their time consumption, so we know which type of business is slow, which also helps us observe the stability of the system.
In terms of DAG optimization, we compile the scripts to bytecode so they execute quickly. In addition, we introduced Protostuff so that task state can be serialized and stored quickly.
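For reference, a minimal sketch of typical Protostuff usage (the TaskContext fields here are hypothetical; the talk does not describe what is actually serialized):

```java
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

// Minimal sketch of Protostuff runtime serialization: no .proto file is needed,
// the schema is derived from the class at runtime and the output is compact binary.
public class TaskContextCodec {
    public static class TaskContext {
        public String jobId;        // hypothetical fields
        public String fragmentKey;
        public int retryCount;
    }

    private static final Schema<TaskContext> SCHEMA = RuntimeSchema.getSchema(TaskContext.class);

    public static byte[] serialize(TaskContext ctx) {
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            return ProtostuffIOUtil.toByteArray(ctx, SCHEMA, buffer);
        } finally {
            buffer.clear();
        }
    }

    public static TaskContext deserialize(byte[] bytes) {
        TaskContext ctx = SCHEMA.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, ctx, SCHEMA);
        return ctx;
    }
}
```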
3.3 The scheduler
High availability, high performance task scheduler
With business logic pulled out through Mullinson and execution dependencies pulled out through the DAG framework, we need a good scheduling system underneath. Because of slicing, tasks are scheduled on the order of ten thousand times per second, and the scheduling is more fine-grained, which places higher performance demands on the underlying components than coarse-grained scheduling would. Another design goal was to keep scheduling overhead below five percent, that is, low system loss. These are very demanding requirements: we want ninety-nine percent of tasks to be dispatched to the right machine within 10 ms, and we want scheduling to be near-optimal, that is, to accurately assign tasks to idle machines.
We also compared centralized and decentralized schedulers while designing ours. A centralized scheduler is more accurate: queue and resource information lives in central storage, which is also friendlier to monitoring. However, it depends heavily on that shared resource; since the queues live in it, every read and write goes through it, which adds some dependence and some performance loss. A decentralized scheduler is more scalable but schedules less accurately. In the end we chose the centralized mode.
The picture above shows the scheduling process. On the left is the scheduler and on the right is the executor. Executors register with the scheduler via heartbeats, and the heartbeat interval is configurable; after registration the machine information is put into the machine queue. The central resource holds a task priority queue, where different kinds of tasks can be mapped to different priorities, and a machine idle-priority queue, where how idle a machine is maps to its priority. When dispatching, we take the highest-priority task and the executor with the highest idle priority, send the task to that machine, and put the task into the execution queue; the important role of the execution queue is discussed later. When execution completes, a callback removes the task from the execution queue. Task scheduling is therefore completed through three queues. Since the queues create a dependence on the central resource, we shard (hash) the resources: different machines can use different shards, and a task can be assigned as long as its resources are available.
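A minimal sketch of the three-queue idea (types and fields are hypothetical): a task priority queue, a machine idle-priority queue, and an execution queue that tracks in-flight tasks until their completion callback arrives.

```java
import java.util.concurrent.*;

// Minimal sketch: pair the most urgent task with the most idle machine, and track
// the task in an execution queue until its completion callback removes it.
public class QueueScheduler {
    record Task(String id, int priority) {}
    record Machine(String host, int idleScore) {}   // higher idleScore = more idle

    private final PriorityBlockingQueue<Task> taskQueue =
            new PriorityBlockingQueue<>(128, (a, b) -> Integer.compare(b.priority(), a.priority()));
    private final PriorityBlockingQueue<Machine> machineQueue =
            new PriorityBlockingQueue<>(128, (a, b) -> Integer.compare(b.idleScore(), a.idleScore()));
    private final ConcurrentMap<String, Task> executionQueue = new ConcurrentHashMap<>();

    public void submit(Task task) { taskQueue.offer(task); }
    public void heartbeat(Machine machine) { machineQueue.offer(machine); } // registration / idle update

    /** One round of the dispatch loop. */
    public void dispatchOnce() throws InterruptedException {
        Task task = taskQueue.take();
        Machine machine = machineQueue.take();
        executionQueue.put(task.id(), task);        // tracked until the completion callback arrives
        sendToExecutor(machine, task);
    }

    /** Completion callback from the executor. */
    public void onCompleted(String taskId) { executionQueue.remove(taskId); }

    private void sendToExecutor(Machine machine, Task task) {
        System.out.printf("dispatch %s -> %s%n", task.id(), machine.host());
    }
}
```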
But there is a problem. Heartbeats arrive with a delay, so if an executor's real state differs from the state stored in the resource, a task may be dispatched to a machine that cannot actually run it. To solve this we designed "double dispatch with a lock". We still select machines from the queue as described above, but in addition to the best machine from the idle-priority queue we also pick a random machine and dispatch to both. Before executing, each executor calls back to the scheduler to claim the task, and only the one that wins the lock runs it; when the "optimal" machine cannot execute, the other machine completes the task. With this we reach the goal of dispatching 99 percent of tasks within 10 milliseconds. Like normal dispatch, double dispatch with a lock also uses the hashed resources. At the same time, a WatchDog checks whether tasks in the execution queue finish within the expected time; if not, the scheduler is triggered to dispatch them again. This effectively keeps the failure rate from rising.
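A minimal sketch of double dispatch with a lock plus the WatchDog, under the same hypothetical naming: the task is sent to two candidate executors, only the first to claim it runs it, and anything that lingers in the execution queue past a timeout is handed back for re-dispatch.

```java
import java.util.concurrent.*;
import java.util.function.Consumer;

// Minimal sketch of "double dispatch with a lock" plus a WatchDog.
public class DoubleDispatch {
    private final ConcurrentMap<String, String> taskOwner = new ConcurrentHashMap<>();    // taskId -> winning executor
    private final ConcurrentMap<String, Long> executionQueue = new ConcurrentHashMap<>(); // taskId -> dispatch time
    private final ScheduledExecutorService watchDog = Executors.newSingleThreadScheduledExecutor();

    public void dispatch(String taskId, String bestExecutor, String randomExecutor) {
        executionQueue.put(taskId, System.currentTimeMillis());
        notifyExecutor(bestExecutor, taskId);
        notifyExecutor(randomExecutor, taskId);     // second, randomly chosen candidate
    }

    /** Called by an executor before it starts running: only the first caller wins the lock. */
    public boolean tryClaim(String taskId, String executor) {
        return taskOwner.putIfAbsent(taskId, executor) == null;
    }

    /** Completion callback removes the task from the execution queue. */
    public void onCompleted(String taskId) { executionQueue.remove(taskId); }

    /** WatchDog: anything older than the timeout is handed back to the scheduler for re-dispatch. */
    public void startWatchDog(long timeoutMs, Consumer<String> redispatch) {
        watchDog.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            executionQueue.forEach((taskId, startedAt) -> {
                if (now - startedAt > timeoutMs) redispatch.accept(taskId);
            });
        }, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
    }

    private void notifyExecutor(String executor, String taskId) {
        System.out.printf("send %s to %s%n", taskId, executor);
    }
}
```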
With this design, our scheduler achieves millisecond-level dispatching. Weibo traffic can spike suddenly, so we also designed for horizontal scaling, supporting elastic scale-out and scale-in. The WatchDog mechanism also lets us automatically remove machines that go down. In practice we scale the cluster up and down every day: a machine that is about to be removed stops accepting new tasks and finishes its existing ones first, and any tasks that are still unfinished are reassigned and completed through the WatchDog mechanism. With all of this we achieved four-nines availability.
3.4 Deployment
For deployment, we run two independently deployed sets of the transcoding service in two IDCs, each with its own domain name. The advantage is that we can shift traffic between the two data centers at will, and if either has a problem we can switch over. The two are not one-to-one redundant, though: the primary data center is a large cluster, while the other is small, perhaps a tenth of its size. They can also be used separately; for example, transcoding outputs that do not block users' posting can be handled by the small data center. When something "catastrophic" happens in the large data center, traffic can be cut over to the small one. The small data center of course cannot absorb that much traffic, but the scheduler's queues can accumulate a backlog and drain it slowly. This way we make full use of the machines without heavy redundancy.
4. Summary
To summarize: through these optimizations and improvements, our overall transcoding speed increased fivefold, and the standard deviation of cluster utilization dropped by 50 percent. The standard deviation here describes how machine CPU utilization swings up and down when tasks are assigned unreasonably, without regard to how idle each machine is; good scheduling greatly reduces it. In addition, the efficiency of our business support improved several times over. We decoupled the system in several ways, putting things that change often and things that rarely change in different places.
Throughout the architecture design and development of the service we used many forms of parallelism, including machine-level, process-level, thread-level, CPU-core-level, and algorithm-level parallelism, to squeeze the most value out of the machines. Our goal was a low-latency, highly available system, and techniques such as parallel upload, parallel transcoding, and transcode-while-uploading all draw on divide-and-conquer, recursion, and greedy ideas. For high availability we relied on high cohesion and low coupling, and on techniques such as separating the dynamic from the static, automatic fault tolerance, and multi-site deployment. Finally, I want to share this: when you are doing architecture or optimization work and do not know where to start, you can simply apply these common ideas, high cohesion and low coupling, trading space for time, and so on, to your system, and you may well get the result you want. Architecture optimization is not that complicated or profound. What we need to think about is whether these methods summarized by our predecessors can work on our own systems; when they produce satisfactory results, those ideas become our own.