Background

Meituan's log collection system is responsible for collecting all of Meituan's business logs, providing offline data to the Hadoop platform and real-time data streams to the Storm platform. It is designed and built on Flume.

“Flume-based Meituan Log Collection System” is presented in two parts, covering the architecture design of Meituan's log collection system and practical experience with it.

The first part, Architecture and Design, focuses on the overall architecture of the log collection system and the reasoning behind it.

The second part, Improvement and Optimization, focuses on the functional modifications and optimizations made to Flume to address problems encountered during actual deployment and use.

1 Introduction to the log collection system

Log collection is the cornerstone of big data.

Many companies' business platforms generate large amounts of log data every day. Collecting that business log data for use by offline and online analytics systems is what log collection systems are designed to do. High availability, high reliability, and scalability are the basic requirements of a log collection system.

Commonly used open source log collection systems include Flume and Scribe. Flume, originally developed by Cloudera and now an Apache project, is a highly available, highly reliable, distributed system for massive log collection, aggregation, and transmission. Scribe is Facebook's open source log collection system, which provides a simple, scalable, and highly fault-tolerant solution for distributed log collection and unified processing.

2 Comparison of common open source log collection systems

The following compares Flume and Scribe, the two common open source log collection systems, along several dimensions. For Flume, Apache Flume-NG is used as the reference. For the comparison, we divide a typical log collection system into three layers: the Agent layer, the Collector layer, and the Store layer.

| Item | Flume-NG | Scribe |
| --- | --- | --- |
| Implementation language | Java | C/C++ |
| Fault tolerance | Provided both between Agent and Collector and between Collector and Store; three levels of reliability are guaranteed | Provided between Agent and Collector and between Collector and Store |
| Load balancing | LoadBalance and Failover modes between Agent and Collector and between Collector and Store | None |
| Scalability | Good | Good |
| Agent richness | Provides a variety of sources, including Avro/Thrift sockets, text, tail, etc. | Mainly a Thrift port |
| Store richness | Can write directly to HDFS, text, console, TCP; supports compressed text and sequence files when writing to HDFS | Buffer, network, file (HDFS, text), etc. |
| Code structure | Well-structured framework with distinct modules; easy to develop against | Simple code |

3 Architecture of Meituan log collection system

Meituan's log collection system is responsible for collecting all of Meituan's business logs, providing offline data to the Hadoop platform and real-time data streams to the Storm platform. It is designed and built on Flume, and currently collects and processes on the order of terabytes of log data every day.

Below is the overall framework of Meituan's log collection system.

Meituan log collection system architecture

  • A. The whole system is divided into three layers: Agent layer, Collector layer and Store layer. One process is deployed on each machine at the Agent layer to collect logs of the single machine. The Collector layer is deployed on the central server and is responsible for receiving logs sent by the Agent layer and writing logs to the corresponding Store layer according to routing rules. The Store layer is responsible for providing permanent or temporary log storage services, or directing log flows to other servers.
  • B. From Agent to Collector, a LoadBalance policy is used to spread all logs evenly across all Collectors, both balancing load and handling the failure of any single Collector.
  • C. The Collector layer has three main targets: SinkHdfs, SinkKafka and SinkBypass. It provides offline data to Hdfs and real-time log flow to Kafka and Bypass respectively. SinkHdfs is divided into SinkHdfs_b, SinkHdfs_m, and SinkHdfs_s according to the log volume to improve the performance of writing to Hdfs. For details, see the following section.
  • D. For stores, Hdfs stores all logs permanently. Kafka stores the latest 7-day log and provides a real-time log stream to Storm. The Bypass provides real-time log flows to other servers and applications.
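As an illustration of point B, stock Flume-NG expresses this Agent-to-Collector policy with a load-balancing sink group. The component names, host names, and ports below are hypothetical; Meituan's actual configuration is not shown in the article:

```properties
# Agent-side sink group: spread events across two Collectors over Avro
agent.sinks = k1 k2
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = collector01.example.com
agent.sinks.k1.port = 4545
agent.sinks.k1.channel = ch1
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = collector02.example.com
agent.sinks.k2.port = 4545
agent.sinks.k2.channel = ch1

agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = k1 k2
agent.sinkgroups.g1.processor.type = load_balance
agent.sinkgroups.g1.processor.selector = round_robin
# back off a failed Collector so traffic fails over to the healthy ones
agent.sinkgroups.g1.processor.backoff = true
```

With `backoff = true`, a Collector that fails is temporarily taken out of the rotation, which is how a single Collector failure is absorbed without manual intervention.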

The following figure shows the module decomposition of Meituan's log collection system, explaining in detail the relationships among Source, Channel, and Sink in the Agent, Collector, and Bypass.

Meituan log collection system architecture

  • A. Module naming rules: all Sources start with SRC, all Channels start with CH, and all Sinks start with Sink;
  • B. Channels uniformly use the DualChannel developed by Meituan (the reasons are detailed later); NullChannel is used for logs that are filtered out (also explained later);
  • C. The Avro interface is used uniformly for communication between modules;

4 Architectural design considerations

The following is a detailed analysis of the above architectures in terms of availability, reliability, extensibility, and compatibility.

4.1 Availability

For a log collection system, availability refers to the total time the system runs without failure during a fixed period. To improve availability, single points of failure must be eliminated and the system's redundancy increased. Let's look at the availability considerations in Meituan's log collection system.

4.1.1 The Agent dies

The Agent can die in two ways: the machine freezes or the Agent process dies.

In the case of a machine crash, the process that generates logs dies as well, so no new logs are produced and no collection service needs to be provided.

In the case of Agent process death, availability does drop. There are three ways to improve it. First, all Agents run under supervise: if the process dies, it is restarted immediately to resume service. Second, all Agents are monitored for liveness, and an alarm is raised as soon as a dead Agent is found. Finally, for very important logs, it is recommended that the application write them directly to disk and that the Agent use a spooldir source to pick up the new log files.
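The spooldir approach can be expressed with Flume's standard spooling-directory source. The path and component names below are hypothetical:

```properties
# Agent picks up completed log files the application wrote to local disk
agent.sources = spool1
agent.sources.spool1.type = spooldir
agent.sources.spool1.spoolDir = /var/log/app/flume-spool
agent.sources.spool1.channels = ch1
```

Because the files survive on disk, an Agent restart simply resumes from the spool directory rather than losing in-flight logs.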

4.1.2 The Collector dies

The Collectors on the central servers provide identical, interchangeable service, and Agents access them using LoadBalance and retry mechanisms. When a Collector becomes unavailable, the Agent's retry policy sends the data to other available Collectors, so the service as a whole is unaffected.

4.1.3 Hdfs Stops Normally

We provide a switch in the Collector's HdfsSink that makes the Collector stop writing to Hdfs and cache all events in the FileChannel instead, so that a planned Hdfs shutdown does not lose logs.

4.1.4 Hdfs is Abnormally Stopped or Inaccessible

If Hdfs stops abnormally or becomes inaccessible, the Collector can no longer write to it. Because we use DualChannel, the Collector caches incoming events in the FileChannel, persists them to disk, and continues to serve. Once Hdfs resumes service, the events cached in the FileChannel are sent on to Hdfs. This mechanism is similar to Scribe's and provides good fault tolerance.

4.1.5 The Collector Becomes Slow or the Agent/Collector network becomes slow

If the Collector processes events slowly (for example, because machine load is too high) or the network between Agent and Collector is slow, sending from Agent to Collector may lag. In this case we likewise use DualChannel, on the Agent side: the Agent caches received events in the FileChannel, persists them to disk, and continues to serve. When the Collector recovers, the events cached in the FileChannel are sent on to it.

4.1.6 Hdfs becomes slow

When Hadoop is running many tasks with heavy read and write activity, Hdfs reads and writes slow down. This is very common, given the daily and weekly peak usage periods.

We use DualChannel to handle slow Hdfs as well. When Hdfs writes are fast, all events flow through the MemChannel only, minimizing disk IO for high performance. When Hdfs writes are slow, all events flow through the FileChannel only, which provides a large cache for the backlog.

4.2 Reliability

For the log collection system, reliability refers to Flume ensuring the reliable transfer of events during data flow transmission.

For Flume, all events are stored in Agent’s Channel and then sent to the next Agent in the data stream or the final storage service. When will events in an Agent’s Channel be deleted? If and only if they are saved to the next Agent’s Channel or to the final storage service. This is the basic single-hop messaging semantics that Flume provides for point-to-point reliability assurance in data flows.

So how does Flume achieve the above basic messaging semantics?

First, transactions are exchanged between Agents. Flume uses transactions to guarantee the reliable delivery of events: Source and Sink operations are each wrapped in a transaction provided by the Channel that holds the events. This ensures that events are transferred reliably point to point within a data flow. In a multi-hop data flow, as shown in the figure below, the upstream Sink and the downstream Source are both included in a transaction, guaranteeing that data moves reliably from one Channel to the next.
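This single-hop semantics can be sketched in a few lines of Python. This is a simplified model, not Flume's actual implementation; the class and function names are ours:

```python
import collections

class Channel:
    """Minimal channel with transactional take: an event leaves the
    channel only when the transaction that took it commits."""
    def __init__(self):
        self._queue = collections.deque()
        self._in_flight = []

    def put(self, event):
        self._queue.append(event)

    def take(self):
        event = self._queue.popleft()
        self._in_flight.append(event)
        return event

    def commit(self):
        self._in_flight.clear()   # events are now safely downstream

    def rollback(self):
        # return un-acknowledged events to the channel for redelivery
        while self._in_flight:
            self._queue.appendleft(self._in_flight.pop())

def deliver(src_channel, dst_channel, send_ok=True):
    """One hop: take from the upstream channel, put to the downstream
    one, and commit only if the downstream put succeeded."""
    event = src_channel.take()
    try:
        if not send_ok:
            raise IOError("downstream unavailable")
        dst_channel.put(event)
        src_channel.commit()
    except IOError:
        src_channel.rollback()    # event stays upstream, will be retried
```

The key property is the "if and only if" above: an event is removed from the upstream Channel only after the downstream side has accepted it; any failure rolls the event back for a retry.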

Meituan log collection system architecture

Second, the persistence of Channels in the data flow. Flume's MemoryChannel can lose data (when an Agent dies), while the FileChannel is persistent, using a write-ahead-log mechanism similar to MySQL's to ensure that data is not lost.

4.3 Scalability

In log collection systems, scalability refers to the system’s ability to scale linearly. When the log volume increases, the system can simply increase the number of machines to achieve linear expansion.

For a Flume-based log collection system, each layer of the design must be able to scale its service linearly. The scalability of each layer is explained below.

4.3.1 Agent layer

For the Agent layer, one Agent is deployed on each machine, which can be horizontally extended without limitation. On the one hand, the log collection capability of the Agent is limited by the performance of the machine. Normally, one Agent can provide sufficient services for a single machine. On the other hand, if you have a large number of machines, you may be limited by the services provided by the back-end Collector, but there is a Load Balance mechanism from Agent to Collector that allows the Collector to scale linearly to improve its capabilities.

4.3.2 The Collector layer

For the Collector layer, there is a Load Balance mechanism from Agent to Collector, and the Collector provides undifferentiated services, so it can be linearly scaled. Its performance is largely limited by the capabilities provided by the Store layer.

4.3.3 Store layer

For the Store layer, Hdfs and Kafka are distributed systems that can scale linearly. The Bypass is a temporary application and only corresponds to a certain type of logs. The performance of the Bypass is not the bottleneck.

4.4 Channel selection

Flume 1.4.0 provides MemoryChannel and FileChannel to choose from. Their advantages and disadvantages are as follows:

  • MemoryChannel: all events are kept in memory. The advantage is high throughput; the disadvantages are limited capacity and loss of the in-memory data if the Agent dies.
  • FileChannel: all events are persisted in files. The advantages are large capacity and recovery of data after a crash; the disadvantage is lower speed.

These two channels have complementary strengths and weaknesses, each suited to different scenarios. For most applications, however, we want a channel that offers both high throughput and a large cache. With this in mind, we developed DualChannel.

  • DualChannel: developed on top of MemoryChannel and FileChannel. When the number of events queued in the Channel is below a threshold, all events are stored in the MemoryChannel and the Sink reads from the MemoryChannel. When the backlog exceeds the threshold, all events are automatically routed to the FileChannel and the Sink reads from the FileChannel. This way we benefit from the MemoryChannel's high throughput when the system is running normally, and from the FileChannel's large cache when something goes wrong.
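The switching logic can be sketched as follows. This is a simplified model, not Meituan's actual DualChannel code; in particular, the `disk` deque here merely stands in for a real FileChannel:

```python
import collections

class DualChannel:
    """Sketch: route puts to an in-memory queue while the backlog is
    below a threshold, spill to a (simulated) file-backed queue above it."""
    def __init__(self, threshold):
        self.threshold = threshold
        self.memory = collections.deque()   # stands in for MemoryChannel
        self.disk = collections.deque()     # stands in for FileChannel

    def backlog(self):
        return len(self.memory) + len(self.disk)

    def put(self, event):
        if self.backlog() < self.threshold:
            self.memory.append(event)       # fast path: no disk IO
        else:
            self.disk.append(event)         # slow path: large on-disk cache

    def take(self):
        # drain the in-memory queue first, then the file-backed backlog
        if self.memory:
            return self.memory.popleft()
        return self.disk.popleft()
```

Under normal load the backlog stays under the threshold and everything moves through memory; when a downstream slowdown lets events pile up, new events spill to the file side until the Sink catches up.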

4.5 Compatibility with Scribe

From the beginning of the design, we required a category for each type of log, and the Flume Agent provides both AvroSource and ScribeSource services. This keeps the system compatible with the previous Scribe deployment and reduces migration costs for the business.
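A dual-interface source section might look like the following, using stock Flume-NG component names (Meituan's modified build may differ; the ports and names here are hypothetical):

```properties
# One Agent exposes both Avro (for Flume clients) and Scribe (for legacy clients)
agent.sources = avroSrc scribeSrc
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4545
agent.sources.avroSrc.channels = ch1
agent.sources.scribeSrc.type = org.apache.flume.source.scribe.ScribeSource
agent.sources.scribeSrc.port = 1463
agent.sources.scribeSrc.channels = ch1
```

Legacy Scribe clients keep sending to the Thrift port unchanged while new clients use Avro, which is what keeps migration costs low.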

4.6 Permission Control

In the current log collection system, we use only the simplest permission control: only specified categories may be written to the storage system. The current permission control is therefore category filtering.

If the permission control is placed on the Agent side, the advantage is that garbage data can be better controlled in the system flow. However, the disadvantage is that it is difficult to modify the configuration. Every time you add a log, you need to restart or reload the configuration of the Agent.

If the permission control is placed on the Collector side, the advantage is that configuration changes and loading are easy. The disadvantage is that some unregistered data may be transferred between Agent/Collector.

Considering that the log transmission between Agent and Collector is not a system bottleneck and log collection is an internal system, the collector-side control is selected.

4.7 Providing real-time streams

Some of Meituan’s services, such as real-time recommendation and anti-crawler service, need to deal with real-time data flow. So we want Flume to export a live stream to Kafka/Storm.

A very important requirement is that the real-time stream must not be slowed down by the speed of other Sinks. To that end, we isolate it in its own Channel inside the Collector, and the DualChannel's large capacity ensures that log processing is not blocked by a slow Sink.

5 System Monitoring

Monitoring is an essential part of a large, complex system. Well-designed monitoring detects abnormal situations promptly: with alerts pushed to a mobile phone, one can know at any time whether the system is working normally. For Meituan's log collection system, we built multi-dimensional monitoring to guard against unknown anomalies.

5.1 Sending speed, congestion, and Hdfs writing speed

By sending these metrics to Zabbix, we can chart the send rate, the backlog, and the Hdfs write rate. For unexpected congestion, we raise an alarm and track down the cause.

The rate at which the Flume Collector's HdfsSink writes data to Hdfs:

Meituan log collection system architecture

Here is a screenshot of the number of events backed up in the Flume Collector's FileChannel:

Meituan log collection system architecture

5.2 Monitoring Flume's Write Status to Hdfs

Flume writes files to Hdfs as TMP files first. For important logs, we check every 15 minutes whether each Collector has generated TMP files for them. For any Collector or log that is not producing TMP files as expected, we check for exceptions. This lets us detect problems in Flume or in the logs promptly.
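The 15-minute check can be sketched like this. The helper and the file-name pattern are our assumptions, not Meituan's actual tooling:

```python
import fnmatch

def missing_tmp(listing, collectors, suffix=".tmp"):
    """Return collectors with no in-progress TMP file in the given
    Hdfs directory listing (e.g. file names parsed from `hdfs dfs -ls`).
    Assumes each collector's name appears in the files it writes."""
    have_tmp = {c for c in collectors
                if any(fnmatch.fnmatch(name, "*%s*%s" % (c, suffix))
                       for name in listing)}
    return sorted(set(collectors) - have_tmp)
```

Any collector returned by this check is either down or no longer receiving that log, which is exactly the condition that warrants investigation.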

5.3 Monitoring log Size Anomalies

For important logs, we monitor the log volume every hour, comparing it against the same period in the previous week, and send a reminder when there is a large fluctuation. This alarm has proven effective at catching abnormal logs: several times it caught anomalies in logs sent by application teams, and timely feedback helped them fix problems in their own systems quickly.
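Such an hourly volume check amounts to a simple relative-deviation test. The function name, reference period, and 50% threshold below are our assumptions for illustration:

```python
def size_anomaly(current_bytes, reference_bytes, tolerance=0.5):
    """Flag an hourly log volume that deviates from the same hour of a
    reference period (e.g. the previous week) by more than `tolerance`."""
    if reference_bytes == 0:
        # no reference data: any traffic at all is worth a look
        return current_bytes > 0
    deviation = abs(current_bytes - reference_bytes) / float(reference_bytes)
    return deviation > tolerance
```

A small, steady drift passes silently, while a sudden surge or drop (an application bug, a stalled Agent) trips the reminder.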

The discussion above shows that the Flume-based Meituan log collection system is a distributed service with high availability, high reliability, and scalability.

 

Source:

Meituan Technology DJU Alex

