1. Background
Recently my company started using FileBeat, so I spent some time learning it. Filebeat is a lightweight log collection tool written in Go that can forward logs to Elasticsearch, Kafka, and other outputs, and official support for it is the most comprehensive. It performs well and is easy to deploy, which makes it an ideal tool for collecting log files. Filebeat was also developed to replace Logstash and uses far less memory. Of course it has drawbacks: the functionality FileBeat provides out of the box is relatively basic and often cannot meet our needs, so we frequently collect logs into Kafka and continue processing them with tools such as Flink.
2. How FileBeat works
Here is how FileBeat works. When a service writes to a log file, FileBeat's harvester module collects the new line or lines from that file, aggregates them, and sends them to an output such as Elasticsearch or Kafka. To preserve file state, the harvester records the offset it has read up to in a registry file; each input corresponds to a registry file, and if FileBeat is restarted it resumes from the offsets stored there. How does FileBeat guarantee that logs are delivered at least once? The registry file is used here too: after sending, if FileBeat receives no acknowledgement of success, it re-sends the events until delivery succeeds.
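To make the moving parts concrete, here is a minimal filebeat.yml sketch (not the configuration used later in this article; the input path is a placeholder) showing how an input, the registry location, and an output relate to the flow described above:

```yaml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/myapp/*.log        # placeholder path; each matched file is read by its own harvester

# Read offsets are persisted under the registry path (shown here with the 7.x default,
# relative to path.data), so a restart resumes where reading stopped.
filebeat.registry.path: registry

output.elasticsearch:
  hosts: ["elasticsearch1:9200"]    # batches are re-sent until the output acknowledges them
```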
3. Docker deployment
Considering Docker's advantages, such as a consistent runtime environment and easy migration, we deploy FileBeat with Docker. Here is an example docker-compose deployment.
3.1. Configure docker-compose file
version: "3"
services:
  elasticsearch:
    image: elasticsearch:7.11.1
    container_name: elasticsearch
    hostname: elasticsearch1
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - "elk-net"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.1.1
    environment:
      - SERVER_NAME=kibana
      - ELASTICSEARCH_URL=http://elasticsearch1:9200
      - XPACK_MONITORING_ENABLED=true
    ports:
      - "5601:5601"
    networks:
      - "elk-net"
    depends_on:
      - "elasticsearch"
  filebeat:
    image: docker.elastic.co/beats/filebeat:7.1.1
    volumes:
      - ./filebeat/logs/nginx:/var/log/nginx/
      - ./filebeat/logs/biz:/var/log/biz/
      - ./filebeat/logs/log4j:/var/log/log4j/
      - ./filebeat/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
    networks:
      - "elk-net"
    depends_on:
      - "elasticsearch"
      - "kibana"
networks:
  elk-net:
Log4j, Nginx, and business logs will be collected into ES and then displayed in Kibana.
3.2. Configure FileBeat
# Details are as follows:
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/*.log
    scan_frequency: 10s
    tail_files: true
    fields:
      index_name: "nginx_log"
  - type: log
    enabled: true
    # Merge stack-trace lines (lines starting with whitespace + "at"/"..." or with "Caused by:")
    # onto the end of the previous line
    multiline.type: pattern
    multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
    multiline.negate: false
    multiline.match: after
    paths:
      - /var/log/log4j/*.log
    fields:
      index_name: "log4j_log"
  - type: log
    enabled: true
    multiline.type: pattern
    multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
    multiline.negate: false
    multiline.match: after
    fields:
      index_name: "biz_log"
    scan_frequency: 10s
    pipeline: "extract-traceid-pipeline"
    paths:
      - /var/log/biz/*.log

# In 7.x, index lifecycle management must be disabled, otherwise the custom ES index names below will not take effect
setup.ilm.enabled: false
setup.template.name: "my-log"
setup.template.pattern: "my-*"
setup.template.enabled: true
setup.template.overwrite: false

# Output to ES
output.elasticsearch:
  #worker: 1
  #bulk_max_size: 1500
  hosts: ["elasticsearch1:9200"]
  index: "pb-%{[fields.index_name]}-*"
  indices:
    - index: "pb-nginx-%{+yyyy.MM.dd}"
      when.equals:
        fields.index_name: "nginx_log"
    - index: "pb-log4j-%{+yyyy.MM.dd}"
      when.equals:
        fields.index_name: "log4j_log"
    - index: "pb-biz-%{+yyyy.MM.dd}"
      when.equals:
        fields.index_name: "biz_log"
3.3. Write the pipeline to ES
curl -H "Content-Type: application/json" -XPUT 'http://127.0.0.1:9200/_ingest/pipeline/extract-traceid-pipeline' -d @extract-traceid-pipeline.json
If the request succeeds, ES returns an acknowledgement and the pipeline is created.
4. Key FileBeat parameters
4.1. How to distinguish different logs
- fields: attach extra fields to each event; values can be scalars, arrays, dictionaries, or any nested data. Combined with conditionals such as when.equals in the output, events can then be routed to different indices (see the sketch below).
- tags: add a tags field to the input, which can be used for grouping and filtering.
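For example, a single input can carry both the routing field and tags. A minimal sketch, reusing the nginx path from the compose file above (the tag values are made up for illustration):

```yaml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/*.log
    fields:
      index_name: "nginx_log"      # matched later by when.equals under output.elasticsearch.indices
    tags: ["nginx", "access"]      # hypothetical tags, handy for grouping and filtering in Kibana
```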
4.2. How to extract fields, using trace_id as an example
This can be done with grok syntax. The steps are as follows:
- Write the pipeline file
"description" : "extract-traceid-pipeline",
"processors" : [
{
"grok" :{
"field" : "message",
"patterns" : ["ERROR\\|%{DATA:trace_id}\\|"]
}
}
]
}
- Test the pattern, for example with an online grok debugger:
www.5axxw.com/tools/v2/gr…
- Write the pipeline to ES and reference it in the FileBeat configuration
curl -H "Content-Type: application/json" -XPUT 'http://127.0.0.1:9200/_ingest/pipeline/extract-traceid-pipeline' -d @extract-traceid-pipeline.json
- Verify the result: the pipeline is registered in ES and the extracted trace_id appears as its own field in the indexed documents.
4.3. How to collect Java exception stacks
Use FileBeat's multiline options so that stack-trace lines are merged into the preceding log event:
multiline.type: pattern
multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
multiline.negate: false
multiline.match: after
4.4. Why doesn't the pipeline take effect?
The official documentation puts pipeline under the output section, but with this version that configuration did not take effect in practice; the pipeline has to be specified on the input instead:
- type: log
  enabled: true
  multiline.type: pattern
  multiline.pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
  multiline.negate: false
  multiline.match: after
  fields:
    index_name: "biz_log"
  scan_frequency: 10s
  pipeline: "extract-traceid-pipeline"
  paths:
    - /var/log/biz/*.log
4.5. What if grok cannot handle complex business logs?
In that case the logs are typically output to Kafka and then processed by other middleware, such as Flink:
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
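Note that the topic selector above references fields.log_topic, which none of the inputs shown earlier set. A minimal sketch of an input that supplies it (the topic name is hypothetical):

```yaml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/biz/*.log
    fields:
      log_topic: "biz-log"   # hypothetical topic name, consumed by '%{[fields.log_topic]}' above
```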
4.6. How to improve collection efficiency?
- bulk_max_size: an option of output.elasticsearch that controls how many events FileBeat sends to Elasticsearch in a single Bulk API request. Each log line becomes one document (event), and by default only 50 events are sent per request, so for a file with hundreds of thousands of lines it is easy to work out how many bulk requests are needed to ingest it; raising this value reduces the number of requests.
- worker: also an output.elasticsearch option; it specifies how many workers FileBeat uses to send data to Elasticsearch, and it can be increased appropriately. For example, with hosts ["10.0.07:9200","10.0.08:9200","10.0.09:9200"], worker can be set to 3.
- harvester_buffer_size: the buffer size each harvester uses when reading a file. If we want to increase the read throughput of certain files, we can raise this value, and different files can be given different throughput by defining multiple inputs, each with its own setting (see the sketch below).
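Putting the three options together, a tuning sketch might look like this (the numbers are illustrative, not recommendations from this article):

```yaml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/biz/*.log
    harvester_buffer_size: 262144   # bytes per harvester read buffer (default 16384)

output.elasticsearch:
  hosts: ["elasticsearch1:9200"]
  worker: 3                         # e.g. roughly one worker per ES host
  bulk_max_size: 2048               # events per bulk request; larger batches mean fewer requests
```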
5. Reference documentation
- 1. Official FileBeat documentation
- 2. What is the relationship between Logstash and FileBeat?
- 3. Identify sources