In the era of cloud native and containerization, container log collection is a seemingly insignificant but important issue that cannot be ignored. For container log collection, we commonly use FileBeat and FluentD, which have their own advantages and disadvantages. Compared with Fluentd based on Ruby, considering customizability, we generally choose Filbeat of Golang technology stack as the main log collection agent by default. Compared with the traditional log collection method, containerized single node will run more services and the load will have a shorter life cycle, which is more likely to cause pressure on the log collection agent. Although FileBeat is lightweight and high performance, if you do not understand its mechanism and properly configure FileBeat, The actual production environment may also bring us unexpected troubles and problems.

The overall architecture

The log collection function is not complicated, the main function is to find the configured log file, then read and process, send to the corresponding backend such as ElasticSearch, Kafka, etc. The fileBeat website has a diagram like this:

filebeat.inputs:
- type: log
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/*.log
Copy the code

Different harvester goroutine collected log data will be sent to a global queue queue, queue implementation has two kinds: based on memory, and disk queue, queue based on disk or in the alpha stage at present, filebeat enabled by default is based on the memory cache queue. Whenever the data cached in the queue reaches a certain size or exceeds the specified time (1s by default), it is consumed by the registered client from the queue and sent to the configured backend. You can configure kafka, ElasticSearch, and Redis clients.

While all of this seems simple enough, in practice, we need to consider more questions, such as:

  • How are log files found and collected by Filbebeat?
  • How does FileBeat ensure that log collection is sent to remote storage without losing a single piece of data?
  • If FileBeat fails, how can the next collection ensure that all logs are not collected again from the last state?
  • How can I solve the problem that fileBeat occupies too much memory or CPU?
  • How does FileBeat support Docker and Kubernetes? How to configure log collection in container?
  • If you want to send logs collected by FileBeat to the back-end storage, how can you customize the development if the native storage does not support it?

All of these require a deeper understanding of FileBeat. Let’s follow the source code of FileBeat to explore the implementation mechanism.

How is a log collected

The source of FileBeat belongs to the Beats project, which is designed to collect all kinds of data. Therefore, Beats abstracts a libbeat library, based on which we can quickly develop and implement a collection tool. Besides FileBeat, Other official projects like MetricBeat and PacketBeat are also in beats. If we take a look at the code, we can see that libbeat already implements memqueue, several output log sending clients, and data filtering processor, while FileBeat only implements log file reading and other log-related logic.

From a code implementation point of view, FileBeat can be divided into the following modules:

  • Input: Find the configured log file and start Harvester
  • Harvester: Reads the file and sends it to Spooler
  • Spooler: Cache log data until it can be sent to Publisher
  • Publisher: Send the log to the backend with the registrar informed
  • Registrar: Record the status of the log file being collected

1. Locate the log file

For log file collection and lifecycle management, FileBeat abstracts a Crawler structure, which is created according to the configuration after FileBeat is started, and then iterates through and runs each input:

for _, inputConfig := range c.inputConfigs {
	err := c.startInput(pipeline, inputConfig, r.GetStates())
}
Copy the code

In the logic of each input run, the matching log file is first obtained according to the configuration. It should be noted that the matching mode is not regular, but Linux Glob rules. There are some differences between regular and regular.

matches, err := filepath.Glob(path)
Copy the code

After all matched log files are retrieved, complex filtering is performed. For example, if exclude_files is configured, the files are ignored and their status is queried. If the last modification of the file is longer than ignore_older, the file is not collected.

2. Read the log file

After the harvester Goroutine matches the harvester log file, FileBeat starts the Harvester Goroutine for each file and sends the harvester Goroutine to the memqueue. In the (H *Harvester) Run() method, we can see an infinite loop with some omitted logic like this:

for {
	message, err := h.reader.Next()
	iferr ! = nil { switch err {case ErrFileTruncate:
			logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
			h.state.Offset = 0
			filesTruncated.Add(1)
		case ErrRemoved:
			logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
		case ErrRenamed:
			logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
		case ErrClosed:
			logp.Info("Reader was closed: %s. Closing.", h.state.Source)
		case io.EOF:
			logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
		case ErrInactive:
			logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
		default:
			logp.Err("Read line error: %v; File: %v", err, h.state.Source)
		}
		return nil
	}
	...
	if! h.sendEvent(data, forwarder) {return nil
	}
}
Copy the code

As you can see, the reader.next () method keeps reading the log and, if no exception is returned, sends the log data to the cache queue. The Harvester Goroutine will exit the harvester Goroutine, stop collecting the file, and close the file handle. Filebeat defaults the close_inactive parameter to 5min to prevent the filebeat from occupying too many file handles for collecting log files. If the log file has not been modified within 5min, the above code will enter the ErrInactive case. And then the Harvester Goroutine will be shut down. Another thing to note in this scenario is that if a file is removed from the harvester Goroutine, the disk space occupied by the file will be retained until the harvester Goroutine ends because fileBeat holds the file handle.

3. Cache the queue

When memQueue is initialized, FileBeat creates BufferingEventLoop or DirectEventLoop depending on whether min_event is greater than 1. By default, BufferingEventLoop is buffered.

type bufferingEventLoop struct {
	broker *Broker

	buf        *batchBuffer
	flushList  flushList
	eventCount int

	minEvents    int
	maxEvents    int
	flushTimeout time.Duration

	// active broker API channels
	events    chan pushRequest
	get       chan getRequest
	pubCancel chan producerCancelRequest

	// ack handling
	acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
	schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked pendingACKs chanList // ordered list of active batches to be send to the ackloop ackSeq uint // ack batch sequence number to validate ordering // buffer flush timer state timer *time.Timer idleC <-chan  time.Time }Copy the code

BufferingEventLoop is a structure that implements a Broker with various channels and is primarily used to send logs to consumers for consumption. The BufferingEventLoop’s Run method, which is also an infinite loop, can be thought of as a scheduling center for log events.

for {
	select {
	case <-broker.done:
		return
	case req := <-l.events: // producer pushing new event
		l.handleInsert(&req)
	case req := <-l.get: // consumer asking for next batch
		l.handleConsumer(&req)
	case count := <-l.acks:
		l.handleACK(count)
	case <-l.idleC:
		l.idleC = nil
		l.timer.Stop()
		if l.buf.length() > 0 {
			l.flushBuffer()
		}
	}
}
Copy the code

Harvester Goroutine receives log data and sends it to the Events Chan pushRequest channel in the bufferingEventLoop. If req := < -l.vents is triggered, the handleInsert method adds the data to the bufferingEventLoop’s BUF, the memQueue that actually caches the log data. The flushBuffer() method is called if the buF length exceeds the configured maximum or if the timer in bufferingEventLoop triggers case < -l.idlec. FlushBuffer () flushbuffers req := < -l.set case and executes the handleConsumer method.

	req.resp <- getResponse{ackChan, events}
Copy the code

Here we get the Consumer’s Response channel and send data to this channel. At this point, the consumer is triggered to consume the memqueue. Therefore, memqueue is not continuously consumed by consumers, but is consumed when memqueue notifies consumers, which we can understand as a kind of pulse sending.

4. Consumption queues

In fact, when FileBeat is initialized, an eventConsumer is created and attempts to retrieve log data from the Broker in the loop infinite loop method.

for {
	if! paused && c.out ! = nil && consumer ! = nil && batch == nil { out = c.out.workQueue queueBatch, err := consumer.Get(c.out.batchSize) ... batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) } ... select {case <-c.done:
		return
	case sig := <-c.sig:
		handleSignal(sig)
	case out <- batch:
		batch = nil
	}
}
Copy the code

The consumer gets the log data from the Broker and sends it to the Out channel to be sent by the Output client.

select {
case c.broker.requests <- getRequest{sz: sz, resp: c.resp}:
case <-c.done:
	return nil, io.EOF
}

// if request has been send, we do have to wait for a response
resp := <-c.resp
return &batch{
	consumer: c,
	events:   resp.buf,
	ack:      resp.ack,
	state:    batchActive,
}, nil

Copy the code

GetRequest has the following structure:

type getRequest struct {
	sz   int              // request sz events from the broker
	resp chan getResponse // channel to send response to
}
Copy the code

Structure of getResponse:

type getResponse struct {
	ack *ackChan
	buf []publisher.Event
}
Copy the code

GetResponse contains the log data, and getRequest contains a channel that is sent to the consumer. In the above handleConsumer method of the bufferingEventLoop buffer queue, the parameter received is getRequest, which contains the getResponse channel of the consumer request. If handleConsumer does not send data, the consumer.get method blocks in the SELECT until flushBuffer, when the consumer’s getResponse channel receives the log data.

5. Send logs

When beats is created, a clientWorker will be created. The run method of clientWorker will continuously read log data from the channel sent by consumer, and then call client.Publish to send logs in batches.

func (w *clientWorker) run(a) {
	for! w.closed.Load() {for batch := range w.qu {
			iferr := w.client.Publish(batch); err ! =nil {
				return}}}}Copy the code

The libbeats library includes kafka, ElasticSearch, and Logstash clients, all of which implement the client interface:

type Client interface {
	Close() error
	Publish(publisher.Batch) error
	String() string
}
Copy the code

The most important thing, of course, is to implement the Publish interface and then send out the logs.

In fact, the design of the circulation of log data among various channels in FileBeat is quite complicated and tedious. The author also studied for a long time and drew a long architecture diagram before he could understand the logic. Here is a simplified diagram for your reference:

How to keep at least once

Filebeat maintains a registry file on a local disk that maintains the state of all log files that have been collected. In fact, an ACK event is returned whenever log data is successfully sent to the back end. Filebeat starts an independent Registry coroutine to listen for this event. Upon receiving the ACK event, the State of the log file will be updated to the Registry file. Offset in the State represents the Offset of the file read. So FileBeat guarantees that the log data before Offset is recorded will be received by the backend log store. The State structure is as follows:

type State struct {
	Id          string            `json:"-"` / /local unique id to make comparison more efficient
	Finished    bool              `json:"-"` // harvester state
	Fileinfo    os.FileInfo       `json:"-"` // the file info
	Source      string            `json:"source"`
	Offset      int64             `json:"offset"`
	Timestamp   time.Time         `json:"timestamp"`
	TTL         time.Duration     `json:"ttl"`
	Type        string            `json:"type"`
	Meta        map[string]string `json:"meta"`
	FileStateOS file.StateOS
}
Copy the code

The data recorded in the Registry file looks roughly like this:

[{"source":"/tmp/aa.log"."offset": 48."timestamp":"The 2019-07-03 T13:54:01. 298995 + 08:00"."ttl":-1,"type":"log"."meta":null,"FileStateOS": {"inode": 7048952,"device": 16777220}}]Copy the code

Because files can be renamed or moved, FileBeat marks each log file by inode and device number. If FileBeat is restarted abnormally, every harvester startup will read the Registry file and continue collecting from the state recorded last time, ensuring that you do not send all log files again from scratch. Of course, if fileBeat hangs before ack is returned during log sending, the Registry file will definitely not be updated to the latest state, so that the next collection time, this part of the log will be sent repeatedly, so this means that FileBeat can only guarantee at least once. There is no guarantee of non-repeat sending. Another unusual situation is that in Linux, if the old file is removed, the new file is created immediately. It is very likely that they have the same inode. Because FileBeat marks the offset of file record collection according to inode, the State of the removed file is actually recorded in Registry. The new file collection starts at the old file Offset, thus missing log data. To minimize inodes being reused and prevent registry files from growing larger over time, it is recommended to remove file states that have not been updated or deleted for a long time from Registry using clean_INACTIVE and clean_remove configurations.

Also, we can see that in harvester reading logs, the state of Registry is updated to handle some exception scenarios. For example, if a log file is cleared, FileBeat will return the ErrFileTruncate exception in the Next reader. Next method, set the inode flag file’s Offset to 0, end the harvester, and restart the new harvester, although the file remains the same. But the Offset in Registry is 0, and the collection starts from scratch.

In particular, if you deploy FileBeat using a container, you need to mount the Registry file to the host; otherwise, the registry file will be lost after the container restarts and FileBeat will start collecting log files again.

Filebeat automatically reload updates

Currently, FileBeat supports Reload input and Module configurations, but the reload mechanism only has periodic update. After reload.enable is enabled in the configuration, you can also configure reload.period to indicate the interval for automatic reload configuration. When FileBeat starts, it creates a coroutine just for reload. For harvester harvesters running, FileBeat adds them to a global Runner list. At intervals, it triggers a diff for configuration files. If harvesters need to be stopped, it adds them to the stopRunner list and shuts them down. The new Runner joins the startRunner list and starts the new Runner.

Filebeat support for Kubernetes

The official documentation of FileBeat provides the deployment mode based on Daemonset under Kubernetes. The most important configuration is as follows:

    - type: docker
      containers.ids:
      - "*"
      processors:
        - add_kubernetes_metadata:
            in_cluster: true
Copy the code

That is, set input to docker type. Because all of the container to the standard output log by default in the node/var/lib/docker/containers / < containerId > / * – json. The log path, so essentially collected is such a log file. Different from traditional deployment, if the service is deployed on Kubernetes, we can view and retrieve logs not only by nodes and services, but also by podName, containerName, etc. So we need to mark each log to add kubernetes meta information before sending to the back end. Filebeat: kubernetes watch: kubernetes Watch: kubernetes Watch: Kubernetes Watch: Kubernetes Watch: Kubernetes Watch The latest events of the POD belonging to this node are then synchronized to the local cache. Destruction of nodes in the event of a container to create, / var/lib/directory under the docker/containers/there will be a change, filebeat containerId are extracted based on path, according to containerId found in the local cache pod information, Data such as podName and label can be retrieved and added to the meta information fields of the log. Filebeat also has a beta feature called AutoDiscover, which is designed to centralize the management of FileBeat profiles scattered across different nodes. Currently, Kubernetes is also supported as a provider, which essentially listens for Kubernetes events and then collects docker standard output files. The general structure is as follows:

However, in the actual production environment, it is not enough to collect only the standard output logs of the container. We often need to collect the custom log directory mounted by the container, control the log collection mode of each service, and more customization functions.

On the light boat container cloud, we have developed an agent that listens to Kubernetes events and automatically generates fileBeat configuration. Through CRD, it supports the functions of customizing the container internal log directory, customizing fields, and supporting multi-line reading. At the same time, it can manage all kinds of log configuration in Kubernetes, and automatically complete the generation and update of log configuration in various scenarios without user awareness of POD creation, destruction and migration.

Performance analysis and tuning

The Beats series is lightweight, and while FileBeat written in Golang does have a much better memory footprint than JVA-BASED Logstash and others, it’s not that simple. Normal fileBeat startup usually takes up only 3 or 40MB of memory, but occasionally we will find that the Memory usage of FileBeat on some nodes exceeds the pod limit (usually set to 200MB) and trigger OOM repeatedly. Harvesters are created to harvest logs because of the number of containers running in a containerized environment, especially on bare metal machines. If FileBeat is not properly configured, there is a high probability that memory will increase dramatically. Of course, fileBeat memory occupies a large part of memqueue. All collected logs will be sent to memqueue aggregation first, and then sent out through output. Filebeat’s default configuration is 4096 events in the memqueue cache, which can be set using queue.memm. events. By default, the maximum event size of a log is 10MB, which can be set by using max_bytes. 4096 * 10MB = 40GB, as you can imagine, in extreme scenarios fileBeat takes up at least 40GB of memory. Especially when multiline is configured, if multiline is incorrectly configured and a single event collects thousands of logs, the memqueue may occupy a large amount of memory, resulting in memory explosion. Therefore, only by properly configuring the matching rules for log files, limiting the size of single-line logs, and configuring the number of memqueue caches based on actual conditions can the memory usage of FileBeat be avoided.

How to extend FileBeat

In general, FileBeat can meet most of the log collection requirements, but it still needs to be customized in some special scenarios. Of course, The design of FileBeat also provides good scalability. Beats currently only provides several types of output clients such as ElasticSearch, Kafka and Logstash. If we want fileBeat to send directly to other backends, we need to customize and develop our own output. Similarly, if you need to filter logs or add meta information, you can also customize the Processor plug-in. Whether it’s adding output or writing a processor, FileBeat provides the same general idea. Generally speaking, there are three ways:

  1. Fork FileBeat directly, developing on existing source code. Output or Processor provide interfaces like Run, Stop, etc. You just need to implement these interfaces and register the corresponding plug-in initialization methods in the init method. Of course, since the init method in Golang is called when the package is imported, you need to manually import fileBeat in the code that initializes it.
  2. Copy fileBeat’s main.go, import our own plugin library, and recompile. It’s essentially not that different from method 1.
  3. Filebeat also provides a plugin mechanism based on Golang Plugin. You need to compile your own plugin into a.so shared link library, and then specify the path of the library through -plugin in fileBeat startup parameter. Golang plugin is not yet mature and stable, and a plugin needs to rely on the same libbeat library and compile with the same Golang version.