Today’s talk focuses on the big data platform that Qiniu has built over the past year. The platform already carries the company’s core business operations. We will introduce the product starting from a concrete scenario, covering the challenges we ran into during the design process and how we solved them, and we welcome you to discuss these issues with us.

Scenario and product

For operations staff, day-to-day business metrics such as the fluctuation of daily visits recorded in the logs and the distribution of online errors are not directly visible. Collecting these data, processing and analyzing them in a unified way, and visualizing the results is in fact a fairly complex and hard-to-implement process. This is the O&M log analysis scenario mentioned above. We will walk through how our product addresses this scenario below, using nginx-log as an example to describe our Pandora product.

Data access: Pandora LogKit configuration and running

The first step in any data analysis is data access. Pandora provides LogKit, a data access tool that helps users get data into the Pandora platform. To begin, you download LogKit, configure it, and run it (Figure 1).

Figure 1

LogKit supports multiple data sources, such as nginx-log and Kafka, and feeds the collected data into our data processing platform. Looking at Figure 1 in detail: first we examine the log format, including the name of the log format; then we specify the path and format of the stored logs; finally, we fill in the configuration file, set the required options, and specify where the data should be stored. If the data needs to go to a message queue, the corresponding key also has to be configured. Once LogKit is running, the data is collected into our platform.

Log retrieval

Figure 2

Figure 2 shows an intuitive visual interface that supports drag and drop. On the left side of the page you can see “Data Source” and “Log Search”. After LogKit is configured, all data is put into the “Data Source”. The right side of the page displays the name, format, and other information for each field in the data source.

Figure 3

Figure 3 shows the “Log Search” page. Through log search we can clearly see the business logic: fill in a query condition in the search box to run a full-text search, or, when you need to look at all requests in a past time window whose response time exceeded 3 s, the “Log Search” page can query those as well and display them clearly. Figure 3 only shows a full-text search; you can also view a histogram of the distribution of the matching data on the same page.

Log aggregation

Figure 4

As shown in Figure 4, data fed into a data source can be aggregated at per-minute granularity with a short piece of SQL. Many things can be aggregated, for example the number of requests from a certain IP address. When the aggregation completes, the results flow back into the data source. In other words, each round of computation writes its output back to the data source for the next step of analysis and processing. This cycle of computation and backflow can be cascaded repeatedly, which makes fairly complex data processing possible.
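
As a concrete illustration, a per-minute aggregation of this kind might look like the sketch below. The table and field names are assumptions made for illustration rather than taken from the product, and the query is simply embedded in a Go constant, since Golang is the language the platform itself is written in.

```go
package main

import "fmt"

// aggregateRequestsByIP is a hypothetical aggregation of the kind described
// above: it counts requests per client IP over the aggregation window.
// The table and field names are assumptions for illustration only.
const aggregateRequestsByIP = `
SELECT ip, COUNT(*) AS request_count
FROM nginx_log
GROUP BY ip
`

func main() {
	// In the real product this SQL would be submitted through the Pandora
	// console; here we just print it.
	fmt.Println(aggregateRequestsByIP)
}
```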

Data backflow to the user's platform

Figure 5

The backflow of data to the data source described above is one way of processing. Another is for users to build their own HTTP service and have the data flow back into their own system through an HTTP interface. In this way, users can accumulate the analysis results on their own platform; the corresponding configuration is made on the operation page shown in Figure 5.
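
A minimal sketch of what such a user-side receiving service could look like, assuming a plain HTTP endpoint; the path, port, and the way the payload is handled are illustrative, not an interface the product requires.

```go
package main

import (
	"io"
	"log"
	"net/http"
)

// A minimal HTTP service that accepts exported data. The /pandora/export path
// and the plain logging of the body are illustrative; a real service would
// parse the payload and store it in its own system.
func main() {
	http.HandleFunc("/pandora/export", func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "read error", http.StatusBadRequest)
			return
		}
		log.Printf("received %d bytes of exported data", len(body))
		w.WriteHeader(http.StatusOK)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```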

Real-time data display and monitoring

Figure 6

Figure 6 shows our monitoring page. After the monitoring service is enabled, a Grafana page needs to be configured; a basic page configuration is provided in our official documentation, and users can download and import it directly.

Figure 7

Figure 7 shows the result of analyzing the nginx logs. The orange box in the upper left corner shows the total number of visits, the green bar chart in the upper right corner shows the number of requests and the response times over a past period, and the pie chart in the lower right corner shows the proportion of visits from different users. The style and placement of these charts can be configured.

Architecture design

Figure 8

Figure 8 illustrates Pandora’s business architecture. Data can be imported into our platform through the Portal, LogKit, the SDK, or the API, and enters a message queue. Data in the message queue can be computed over repeatedly, flowing between computation tasks and the message queue, or it can be exported directly. The exported data is then handled by downstream systems (log search, time series data, and so on) to generate reports. This is the entire flow of the data.

Pipeline design goals and technology selection

Every system sets design goals and the corresponding problems to solve at the start of its design. First, the system must support fast data access with high throughput and low latency. Second, as a cloud service, it must support concurrent access by a massive number of users and massive message queues, and it should provide both real-time and offline computing frameworks to meet different computing needs. Finally, it must offer visual operation to satisfy users’ operational requirements. With the design goals set, we planned the technology selection. We needed a high-throughput storage system, and Qiniu’s own storage system clearly fits that need best; we also needed a powerful and flexible big data processing engine; and the product had to be developed and iterated quickly. Based on these requirements, the choices came easily: Kafka meets our need for massive message queues, Spark serves as the computing engine, and for the programming language we chose Golang, in which our team has deep experience. With these technologies chosen, we began to build the system.

Figure 9

Figure 9 shows the overall architecture of our Pipeline, which is responsible for data access and processing in Pandora. Data is imported through LogKit into the data access layer (apiserver). Data passing through apiserver enters the message queue, where it is read and written back by the computing engine, and is finally exported to downstream systems (LogDB, TSDB, HTTP, Qiniu Cloud Storage). Today we will focus on the data flow indicated by the green arrow and discuss the key points along it in detail. Several factors determine the efficiency of the system along this flow, such as stability and performance. So I will follow the data from the user to the message queue, through the computation, back to the message queue, and finally out through export.

Data access layer

Figure 10

Figure 10 shows the data access layer. Data is imported through apiserver, and a dispatcher manages the metadata of users’ message queues, including how data is written into them. LogKit appears here not because data flows through apiserver into LogKit and then into the message queue, but because LogKit can collect many kinds of data; here we use it to collect the system’s own audit logs and monitoring information, since it is easy to manage and configure.

Containerization

Figure 11

In the initial design of the system, capacity expansion was a problem that troubled us. Internal users were coming on board quickly, so we had to expand capacity at least once or twice a week, which was a heavy burden on operations. We therefore adopted a container-based solution: since the entire data access layer is stateless, we containerized it and run it on our container cloud product. As shown in Figure 11, apiserver and LogKit are deployed together in each pod. Through monitoring data, the information from every container in the cluster is aggregated into the scheduler, which holds the load and total resource information of the entire cluster and can dynamically scale the cluster out or in based on that information.

Data write optimization

Figure 12

Figure 12 shows how data writes were optimized. The first-generation write path was serial: after data was imported it was parsed line by line, and only after all of it was parsed was it written to the message queue. The processing efficiency of this approach was very low. We therefore took advantage of Go and introduced a line channel: data streams into the channel continuously, and multiple parsers downstream of the channel parse it in parallel. In other words, the channel turns the processing into a concurrent pipeline, which improves CPU utilization, reduces user-perceived response latency, and greatly improves performance.
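
A minimal sketch of this pattern in Go: lines stream into a channel and several parser goroutines consume it in parallel. The parser, the channel sizes, and the stand-in for the message-queue writer are simplifications for illustration.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// parseLine stands in for the real log parser; here it just splits a line into fields.
func parseLine(line string) []string {
	return strings.Fields(line)
}

func main() {
	lines := make(chan string, 1024) // the "line channel"
	parsed := make(chan []string, 1024)

	// Multiple parsers read from the channel and parse lines in parallel.
	const numParsers = 4
	var wg sync.WaitGroup
	for i := 0; i < numParsers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range lines {
				parsed <- parseLine(line)
			}
		}()
	}

	// A consumer that stands in for writing the parsed data to the message queue.
	done := make(chan struct{})
	go func() {
		for fields := range parsed {
			fmt.Println(fields)
		}
		close(done)
	}()

	// Producer: stream lines into the channel as they arrive.
	for _, line := range []string{"GET / 200 12ms", "POST /upload 500 3s"} {
		lines <- line
	}
	close(lines)
	wg.Wait()
	close(parsed)
	<-done
}
```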

Computation

Figure 13

As shown in Figure 13, our computation is implemented on top of Spark and exposes a relatively simple SQL interface that shields users from the low-level details.

Export optimization

Figure 14

Data flows into the system and is computed on or stored there, but to be useful the processed data has to flow on into downstream systems, so the export process connects upstream and downstream. Figure 14 shows the original architecture of the export service. At that time tasks were not split at a fine granularity, and a single server could not handle a large user’s task, so latency rose during peak periods. Because of this, we launched a new version after a month of development.

Figure 15

The improved architecture is shown in Figure 15. At the top of the diagram is the master, which controls the scheduling of all tasks. All tasks are forwarded to the master through the scheduler; the master evaluates the load on each machine and schedules tasks according to the machine’s own state (CPU usage, network bandwidth, task execution). In addition, we split tasks at a finer granularity.

The scheduling design considers three things: it is resource oriented, it makes full use of heterogeneous machines, and it supports automatic adjustment. Being resource oriented and using heterogeneous machines fully matters because our machines come in many specifications and task intensities vary; we want to use each machine’s resources fully so that machine resources are neither insufficient nor wasted when processing tasks. Automatic adjustment means that when users suddenly increase or decrease, we can automatically rebalance the distribution of tasks. The ultimate purpose of all three is to make full use of resources.

Task allocation

Figure 16

Figure 16 is a process diagram of task allocation. Assume that the initial tasks (T1-T7) are distributed relatively evenly across three machines and two new tasks (T8-T9) arrive; we find the relatively idle machines (S1 or S2) and assign the two new tasks to them first. This is the adjustment made when the cluster is already relatively balanced.
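
A minimal sketch of this “assign new tasks to the least-loaded server” step, with load reduced to a single number purely for illustration; the real scheduler weighs CPU usage, network bandwidth, and task execution state.

```go
package main

import "fmt"

// Server models a machine with a simplified scalar load.
type Server struct {
	Name  string
	Tasks []string
	Load  float64
}

// assign places each new task on the currently least-loaded server, assuming
// each task adds one unit of load (an illustration only).
func assign(servers []*Server, tasks []string) {
	for _, t := range tasks {
		least := servers[0]
		for _, s := range servers[1:] {
			if s.Load < least.Load {
				least = s
			}
		}
		least.Tasks = append(least.Tasks, t)
		least.Load += 1.0
	}
}

func main() {
	servers := []*Server{
		{Name: "S1", Tasks: []string{"T1", "T2"}, Load: 2},
		{Name: "S2", Tasks: []string{"T3", "T4"}, Load: 2},
		{Name: "S3", Tasks: []string{"T5", "T6", "T7"}, Load: 3},
	}
	assign(servers, []string{"T8", "T9"})
	for _, s := range servers {
		fmt.Println(s.Name, s.Tasks)
	}
}
```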

Automatic adjustment

Figure 17

Figure 18

Of course, imbalance also occurs (Figures 17-18) and requires automatic adjustment. For example, if a user deletes many tasks, S1 and S2 may become idle compared with S3. Each server reports a heartbeat to the master containing its resource usage and task distribution, and based on these reports the master shifts tasks toward the more idle machines to restore a relatively balanced state.
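
Continuing the simplified model from the previous sketch, the rebalancing step might look like the following: move tasks from the most-loaded server to the least-loaded one until the gap falls under a threshold. The threshold and the one-unit-per-task load are assumptions for illustration.

```go
package main

import "fmt"

// Server reuses the simplified model from the previous sketch.
type Server struct {
	Name  string
	Tasks []string
	Load  float64
}

// rebalance moves tasks from the most-loaded to the least-loaded server until
// the load gap is within maxGap. Each task counts as one unit of load, which
// is an assumption made purely for illustration.
func rebalance(servers []*Server, maxGap float64) {
	for {
		busiest, idlest := servers[0], servers[0]
		for _, s := range servers {
			if s.Load > busiest.Load {
				busiest = s
			}
			if s.Load < idlest.Load {
				idlest = s
			}
		}
		if busiest.Load-idlest.Load <= maxGap || len(busiest.Tasks) == 0 {
			return
		}
		// Move one task from the busiest server to the idlest one.
		n := len(busiest.Tasks)
		task := busiest.Tasks[n-1]
		busiest.Tasks = busiest.Tasks[:n-1]
		busiest.Load -= 1.0
		idlest.Tasks = append(idlest.Tasks, task)
		idlest.Load += 1.0
	}
}

func main() {
	servers := []*Server{
		{Name: "S1", Tasks: []string{"T1"}, Load: 1},
		{Name: "S2", Tasks: []string{"T2"}, Load: 1},
		{Name: "S3", Tasks: []string{"T3", "T4", "T5", "T6"}, Load: 4},
	}
	rebalance(servers, 1.0)
	for _, s := range servers {
		fmt.Println(s.Name, s.Tasks)
	}
}
```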

Horizontal scaling

Figure 19

Figure 19 shows the problem that calls for horizontal scaling. All machines are busy: the first twelve tasks are already distributed across the three machines, so when a new task (T13) arrives there is no idle capacity left to handle it, and the cluster needs to be expanded.

Figure 20

As shown in Figure 20, when the first three machines are all busy we add Server4. Once S4 starts it reports its heartbeat to the master; the master detects S4, re-evaluates the overall resource distribution and usage, assigns T13 to the relatively idle S4, and may even move tasks waiting on S1, S2, and S3 over to S4.

Resource isolation

Figure 21

Automatically adjusting tasks and sharing processing pressure across machines is important, but it is not enough. For some special tasks, how do we ensure that a sudden surge in one user’s traffic does not affect other, smaller users? Or that compressing data before exporting it to cloud storage (a CPU-intensive process) does not affect other tasks? To solve these problems we introduced resource isolation (Figure 21): machines and tasks are isolated into scheduling groups (a scheduling group is a group of similar machines or tasks). Because the groups are physically isolated, they do not affect one another, and resources are still used fully.
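
A minimal sketch of the scheduling-group idea, assuming a group simply restricts which servers a task may be placed on; the group names and the round-robin placement are illustrative, not the product’s actual behavior.

```go
package main

import "fmt"

// SchedulingGroup ties a set of tasks to a dedicated set of servers so that
// heavy or CPU-intensive workloads cannot spill over onto other users'
// machines. Group and server names are illustrative only.
type SchedulingGroup struct {
	Name    string
	Servers []string
}

// pickServer chooses a server for a task, but only from the task's own group,
// using a trivial round-robin by task index for illustration.
func pickServer(group SchedulingGroup, taskIndex int) string {
	return group.Servers[taskIndex%len(group.Servers)]
}

func main() {
	groups := map[string]SchedulingGroup{
		"large-user": {Name: "large-user", Servers: []string{"S1", "S2"}},
		"compress":   {Name: "compress", Servers: []string{"S3"}},
		"default":    {Name: "default", Servers: []string{"S4", "S5"}},
	}
	// A compression task can only land on its own group's machines.
	fmt.Println(pickServer(groups["compress"], 0)) // S3
	fmt.Println(pickServer(groups["default"], 1))  // S5
}
```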

Master high availability

Figure 22

Figure 23

To sum up, the system is in a one-to-many arrangement (one master with multiple servers), which raises the question of how to keep the service highly available in the face of a single point of failure. The core of our design is shown in Figures 22 and 23. At the bottom of the figures is a ZooKeeper cluster; we simulate a lock by creating an ephemeral node, and multiple machines compete for the lock at the same time. If master1 loses the lock, master2 preempts it and takes over the scheduling and cluster management work. This is the idea behind master high availability.
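
A minimal sketch of this lock-grabbing idea, using the github.com/go-zookeeper/zk client as the ZooKeeper library (an assumption; the article does not name the client the platform uses), with an ephemeral node standing in for the lock and illustrative addresses and paths.

```go
package main

import (
	"log"
	"time"

	"github.com/go-zookeeper/zk"
)

// lockPath is a hypothetical path; whoever creates this ephemeral node first
// acts as the master. The node disappears when that session dies, so a
// standby master can take over.
const lockPath = "/pandora-master-lock"

func runMaster() {
	// Placeholder for the real scheduling and cluster-management loop.
	select {}
}

func main() {
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 5*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	for {
		_, err := conn.Create(lockPath, []byte("master"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
		if err == nil {
			log.Println("acquired the lock, acting as master")
			runMaster()
			return
		}
		if err != zk.ErrNodeExists {
			log.Fatal(err)
		}
		// Another master holds the lock: wait for the node to go away, then retry.
		exists, _, ch, werr := conn.ExistsW(lockPath)
		if werr != nil {
			log.Fatal(werr)
		}
		if exists {
			<-ch
		}
	}
}
```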

Server high availability

Figure 24

For server high availability we take a similar approach. The master is treated as a highly available node, and each server reports a heartbeat to it containing the machine’s liveness and the execution state of its tasks. As shown in Figure 24, once the master senses that S3 is down, it moves S3’s two tasks (T5-T6) away, judges S1 and S2 to be suitable candidates, and reassigns the two tasks to them, achieving server high availability.
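
A minimal sketch of heartbeat-based failure detection on the master side, assuming a fixed timeout; the timeout value and data structures are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// heartbeats records the last time each server reported in; the real
// heartbeat also carries resource usage and task execution state.
type heartbeats map[string]time.Time

// deadServers returns the servers whose last heartbeat is older than timeout.
func deadServers(hb heartbeats, now time.Time, timeout time.Duration) []string {
	var dead []string
	for server, last := range hb {
		if now.Sub(last) > timeout {
			dead = append(dead, server)
		}
	}
	return dead
}

func main() {
	now := time.Now()
	hb := heartbeats{
		"S1": now.Add(-2 * time.Second),
		"S2": now.Add(-1 * time.Second),
		"S3": now.Add(-30 * time.Second), // S3 has stopped reporting
	}
	for _, s := range deadServers(hb, now, 10*time.Second) {
		fmt.Printf("%s looks down; its tasks should be reassigned\n", s)
	}
}
```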

System-level scaling

Figure 25

Figure 26

As mentioned at the beginning, our entire message queue is implemented with Kafka, and a single Kafka cluster has a ceiling. Initially we ran Kafka as a single cluster (Figure 25); we then scaled it by splitting that single cluster into multiple clusters, keeping each one relatively small, which brought a significant performance improvement (Figure 26). The information reported by the three Kafka clusters is aggregated into our scheduler, which assigns newly created user tasks and new data sources to an appropriate Kafka cluster based on its load or the number of message queues it holds.

Optimization of upstream and downstream protocols

Figure 27

In practice, performance between upstream and downstream could still be poor. Initially we used JSON for data transmission between them, but log search exposed the problem that JSON was too expensive on the network, so we switched to Protobuf. Figure 27 compares serialization and deserialization results for JSON versus Protobuf. As the figure shows, Protobuf takes less time, especially for deserialization, where CPU consumption drops by nearly an order of magnitude. This improves both the utilization of the cluster’s computing resources and the effective network bandwidth, improving overall efficiency several times over.

Pipeline processing

Figure 28

As for pipelining, the first design was serial: the export service pulled data from the message queue, processed it, pushed it out, and then repeated the cycle. The processing step is fast, but pull and push are relatively slow, so the overall efficiency was very low. Moreover, each operation takes a different amount of time, so the network traffic seen on the monitoring charts fluctuated up and down and utilization dropped. In view of this, we optimized the pipeline by parallelizing the stages (Figure 28), and the result shows that pull and push are much more efficient this way.
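
A minimal sketch of this parallelization in Go: pull, process, and push are decoupled with channels so the slow pull and push stages overlap instead of running strictly in sequence. The stage functions and timings below are stand-ins for the real message-queue and downstream clients.

```go
package main

import (
	"fmt"
	"time"
)

// pull, process, and push stand in for reading from the message queue,
// transforming a batch, and writing to the downstream system.
func pull(i int) string       { time.Sleep(30 * time.Millisecond); return fmt.Sprintf("batch-%d", i) }
func process(b string) string { time.Sleep(5 * time.Millisecond); return b + ":processed" }
func push(b string)           { time.Sleep(30 * time.Millisecond); fmt.Println("pushed", b) }

func main() {
	pulled := make(chan string, 4)
	processed := make(chan string, 4)

	// Stage 1: keep pulling batches from the queue.
	go func() {
		for i := 0; i < 5; i++ {
			pulled <- pull(i)
		}
		close(pulled)
	}()

	// Stage 2: process batches as they arrive.
	go func() {
		for b := range pulled {
			processed <- process(b)
		}
		close(processed)
	}()

	// Stage 3: push results downstream; the slow pull and push now overlap.
	for b := range processed {
		push(b)
	}
}
```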

Golang GC

Our language of choice is Golang, which is garbage collected. In many situations the system was spending about a tenth of its time on garbage collection instead of real work, so we made improvements at the code level. One is using sync.Pool to reduce the frequency of garbage collection; the other is reusing objects as much as possible so that each GC has less to collect. After we upgraded Golang to 1.8 and looked at GC time again, it had improved by nearly two orders of magnitude. These are the code-level optimizations.
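
A minimal sketch of the sync.Pool pattern described above, reusing buffers so that fewer short-lived allocations reach the garbage collector; the buffer type and the workload are illustrative.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers instead of allocating a fresh one per
// request, so fewer short-lived objects reach the garbage collector.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// handle simulates processing one payload with a pooled buffer.
func handle(payload string) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()            // clear any previous contents
	defer bufPool.Put(buf) // return the buffer for reuse
	buf.WriteString(payload)
	fmt.Printf("processed %d bytes\n", buf.Len())
}

func main() {
	for i := 0; i < 3; i++ {
		handle("some log line")
	}
}
```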

Finite resource assumption

Finally, we describe an optimization of how we think about resources: establishing a finite resource assumption. Some time ago, because so much data was being onboarded, we had to handle operations reactively; if a customer suddenly came on board, the system could easily be overwhelmed, and we would scramble to add machines or adjust and optimize the scheduling, which is not a sustainable way to work. So we now make a finite resource assumption up front: we evaluate, from the start, what we can do with a finite amount of resources. For example, we estimate in advance how many users a given amount of bandwidth can support. This estimate must be based on data, and on top of it we do resource estimation and cluster capacity planning. From the estimates we set a water level threshold, and once the water level is exceeded we consider expansion. We also need to communicate our current processing capacity clearly to customers, so that the whole cluster and service stay in a relatively healthy state.
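
A tiny sketch of the water level check; the capacity figures and the 0.7 threshold are arbitrary illustrations, not the real planning numbers.

```go
package main

import "fmt"

// needExpansion reports whether current usage has crossed the water level,
// expressed as a fraction of planned capacity. The threshold is an arbitrary
// example, not the real planning figure.
func needExpansion(currentUsage, plannedCapacity, waterLevel float64) bool {
	return currentUsage/plannedCapacity > waterLevel
}

func main() {
	// E.g. bandwidth: 7.5 units used out of a planned 10, with a 0.7 water level.
	if needExpansion(7.5, 10, 0.7) {
		fmt.Println("water level exceeded: plan a capacity expansion")
	}
}
```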

Results

We have covered the implementation of our architecture and its optimizations. The current results are as follows: we support trillions of data points, process hundreds of terabytes of data every day, and serve a large number of users. The system maintains very low latency and high processing efficiency. Because operations are automated, our labor costs are greatly reduced, and our goal is to keep writing code without being distracted by operations work. As for availability, we have reached three nines (99.9%) so far.