I. Foreword
You can use Filebeat + Kafka + Logstash + Elasticsearch to collect log data into Elasticsearch (ES) and then visualize and analyze it with Kibana.
This article describes the concrete implementation.
II. Background Information
Kafka is a distributed, high-throughput, scalable message queue service that is widely used in big data scenarios such as log collection, monitoring data aggregation, streaming data processing, and online/offline analysis, and it has become an indispensable part of the big data ecosystem. In scenarios that require near-real-time retrieval of large volumes of logs, Filebeat collects the log data and uses Kafka as its output. Kafka receives the data from Filebeat in real time, and Logstash consumes it as its input. Because the format or content of this data may not meet your requirements, you can use Logstash filter plugins to process it. Finally, the cleaned data is written to ES for distributed retrieval and is analyzed and visualized with Kibana.
The simplified processing flow is: Filebeat → Kafka → Logstash → Elasticsearch → Kibana.
III. Operation Process
- Preparation:
  - Prepare the environment and create the corresponding services.
  - Install Filebeat.
- Configure Filebeat: set the Filebeat input to the log files to be collected (Nginx logs in this article) and the output to Kafka, so that the log data is collected into a specific Kafka topic.
- Configure the Logstash pipeline: set the Logstash input to Kafka and the output to ES, so that the data in the Kafka topic is consumed and transferred to ES.
- Check the log consumption status: check the consumption status of the log data in Kafka to verify that it is being collected successfully.
- Filter log data with Kibana: on the Discover page of the Kibana console, filter the log data collected through Kafka.
IV. Preparation
CentOS 7.6; at least 8 GB of memory is recommended.
1. Docker environment
Run the following command:
# execute on docker node
# Tencent Cloud Docker Hub image
# export REGISTRY_MIRROR="https://mirror.ccs.tencentyun.com"
# DaoCloud mirror
# export REGISTRY_MIRROR="http://f1361db2.m.daocloud.io"
# AliYun Docker Hub image
export REGISTRY_MIRROR=https://registry.cn-hangzhou.aliyuncs.com
# install Docker
# Reference documentation is below
# https://docs.docker.com/install/linux/docker-ce/centos/
# https://docs.docker.com/install/linux/linux-postinstall/
# Uninstall the old version
yum remove -y docker \
docker-client \
docker-client-latest \
docker-ce-cli \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine
# configure yum Repository
yum install -y yum-utils \
device-mapper-persistent-data \
lvm2
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# install and start Docker
yum install -y docker-ce-19.03.11 docker-ce-cli-19.03.11 containerd.io-1.2.13
mkdir /etc/docker || true
cat > /etc/docker/daemon.json <<EOF
{
  "registry-mirrors": ["${REGISTRY_MIRROR}"],
  "exec-opts": ["native.cgroupdriver=systemd"],
  "log-driver": "json-file",
  "log-opts": {
    "max-size": "100m"
  },
  "storage-driver": "overlay2",
  "storage-opts": [
    "overlay2.override_kernel_check=true"
  ]
}
EOF
mkdir -p /etc/systemd/system/docker.service.d
# Restart Docker
systemctl daemon-reload
systemctl enable docker
systemctl restart docker
# disable firewall
systemctl stop firewalld
systemctl disable firewalld
# disable SELinux
setenforce 0
sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config
# disable swap
swapoff -a
yes | cp /etc/fstab /etc/fstab_bak
cat /etc/fstab_bak |grep -v swap > /etc/fstab
Verify docker info:
[root@vm-1]# docker info
Client:
Debug Mode: false
Server:
Containers: 16
Running: 11
Paused: 0
Stopped: 5
Images: 22
Server Version: 19.03.11
Storage Driver: overlay2
Backing Filesystem: xfs
Supports d_type: true
Native Overlay Diff: true
Logging Driver: json-file
Cgroup Driver: systemd
Plugins:
Volume: local
Network: bridge host ipvlan macvlan null overlay
Log: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslog
Swarm: inactive
Runtimes: runc
Default Runtime: runc
Init Binary: docker-init
containerd version: 7ad184331fa3e55e52b890ea95e65ba581ae3429
runc version: dc9208a3303feef5b3839f4323d9beb36df0a9dd
init version: fec3683
Security Options:
 seccomp
  Profile: default
Kernel Version: 3.10.0-1127.el7.x86_64
Operating System: CentOS Linux 7 (Core)
OSType: linux
Architecture: x86_64
CPUs: 4
Total Memory: 11.58GiB
Name: vm-autotest-server
ID: KQ5B:KAG5:LLB5:CUD4:NQZX:4GHL:5XLY:FM7X:KRJ5:X3WK:42GV:QLON
Docker Root Dir: /var/lib/docker
Debug Mode: false
Registry: https://index.docker.io/v1/
Labels:
Experimental: false
Insecure Registries:
172.16.62.179:5000
127.0.0.0/8
Registry Mirrors:
https://registry.cn-hangzhou.aliyuncs.com/
Live Restore Enabled: false
2. Docker Compose environment
Docker Compose is a tool for defining and running multi-container Docker applications. With Compose, you configure your application's services in a YAML file and then deploy all of them with a single command.
# download Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# change the file's permissions to executable
chmod +x /usr/local/bin/docker-compose
# verify information
docker-compose --version
3. Version preparation
Component | Version | Deployment method |
---|---|---|
elasticsearch | 7.6.2 | Docker Compose |
logstash | 7.6.2 | Docker Compose |
kibana | 7.6.2 | Docker Compose |
zookeeper | latest | Docker Compose |
kafka | latest | Docker Compose |
filebeat | 7.4.2 | binary |
4. Environment initialization
Run the following command:
# the kernel parameter must be set, otherwise ES will fail to start because vm.max_map_count is too low
# change the setting
sysctl -w vm.max_map_count=262144
# make effective immediately
sysctl -p
# create the Logstash directory and copy the configuration file logstash.conf into it
mkdir -p /mydata/logstash
# create the elasticsearch/data directory and grant permissions, otherwise ES will fail to start due to lack of permissions
mkdir -p /mydata/elasticsearch/data/
chmod 777 /mydata/elasticsearch/data/
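The sysctl -w change above does not survive a reboot. As an optional extra step (a minimal sketch, not part of the original procedure), the setting can be persisted in /etc/sysctl.conf:
# optional: persist the kernel parameter so it survives reboots
echo "vm.max_map_count=262144" >> /etc/sysctl.conf
sysctl -p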
5. Service installation
The docker-compose.yml file contains the following:
version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.6.2
    container_name: elasticsearch
    user: root
    environment:
      - "cluster.name=elasticsearch" # set the cluster name to elasticsearch
      - "discovery.type=single-node" # start in single-node mode
    volumes:
      - /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins # mount the plugin directory
      - /mydata/elasticsearch/data:/usr/share/elasticsearch/data # mount the data directory
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - elastic
  logstash:
    image: logstash:7.6.2
    container_name: logstash
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf # mount the Logstash configuration file
    depends_on:
      - elasticsearch # Logstash starts after Elasticsearch has started
    links:
      - elasticsearch:es # use the es domain name to access Elasticsearch
    ports:
      - "5044:5044"
    networks:
      - elastic
  kibana:
    image: kibana:7.6.2
    container_name: kibana
    links:
      - elasticsearch:es # use the es domain name to access Elasticsearch
    depends_on:
      - elasticsearch # Kibana starts after Elasticsearch has started
    environment:
      - "elasticsearch.hosts=http://es:9200" # address for accessing Elasticsearch
    volumes:
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - "5601:5601"
    networks:
      - elastic
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    volumes:
      - /mydata/zookeeper/data:/data
      - /mydata/zookeeper/log:/datalog
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    networks:
      - elastic
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /mydata/kafka:/kafka
      - /etc/localtime:/etc/localtime:ro
    links:
      - zookeeper
    ports:
      - "9092:9092"
    networks:
      - elastic
    environment:
      - KAFKA_LISTENERS=INTERNAL://kafka:9092,OUT://kafka:29092
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUT://kafka:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=OUT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_MESSAGE_MAX_BYTES=2000000
      - KAFKA_CREATE_TOPICS=logs:1:1
networks:
  elastic:
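Optionally, after uploading docker-compose.yml to the server, it can be syntax-checked before anything is started; docker-compose config parses the file and prints the resolved configuration (or an error if the YAML is invalid):
# optional: validate docker-compose.yml before starting the services
docker-compose -f docker-compose.yml config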
Upload the file to the Linux server and start all the services with docker-compose up:
[root@vm-1]# docker-compose -f docker-compose.yml up -d
[root@vm-1]# docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------
elasticsearch   /usr/local/bin/docker-entr ...   Up   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
kafka           start-kafka.sh                   Up   0.0.0.0:9092->9092/tcp
kibana          /usr/local/bin/dumb-init - ...   Up   0.0.0.0:5601->5601/tcp
logstash        /usr/local/bin/docker-entr ...   Up   0.0.0.0:5044->5044/tcp, 9600/tcp
zookeeper       /bin/sh -c /usr/sbin/sshd  ...   Up   0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
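If any container is not in the Up state, its logs usually show why (for example, Elasticsearch failing because of directory permissions or vm.max_map_count). A quick way to check, using the service names from the compose file above:
# follow the logs of a single service, e.g. Elasticsearch
docker-compose -f docker-compose.yml logs -f elasticsearch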
Install the Filebeat client:
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.4.2-linux-x86_64.tar.gz
tar xzvf filebeat-7.4.2-linux-x86_64.tar.gz
cd filebeat-7.4.2-linux-x86_64
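To confirm that the download and architecture are correct, the binary can report its own version:
# optional: verify the Filebeat binary
./filebeat version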
6. Service Settings
After all dependent services have started, the following services need some additional setup.
# Elasticsearch needs the Chinese analyzer IKAnalyzer installed, then a restart.
docker exec -it elasticsearch /bin/bash
# run this command inside the container
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zip
docker restart elasticsearch
# Logstash needs the json_lines plugin installed, then a restart.
docker exec -it logstash /bin/bash
logstash-plugin install logstash-codec-json_lines
docker restart logstash
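To confirm that the IK plugin was installed correctly after the restart, the standard _analyze API can be called with the ik_max_word analyzer; the sample text below is arbitrary and only for illustration:
# optional: verify the IK analyzer (run on the Docker host)
curl -X POST "http://127.0.0.1:9200/_analyze" -H 'Content-Type: application/json' -d '{"analyzer": "ik_max_word", "text": "中文分词测试"}'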
V. Configure Filebeat
Modify the contents of filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.dashboards.enabled: false
setup.kibana:
  host: "http://kafka:5601"
output.kafka:
  hosts: ["kafka:9092"]
  topic: 'logs'
  codec.json:
    pretty: false
Parameter Description:
Parameter | Description |
---|---|
type | Input type. log indicates that the input source is log files. |
enabled | Whether this input configuration takes effect: true to enable, false to disable. |
paths | Paths of the log files to monitor. Multiple paths can be listed, one per line. |
hosts | The Kafka broker access point(s). |
topic | The Kafka topic to which logs are written. Specify a topic that has already been created. |
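Before continuing, the configuration file can optionally be validated; Filebeat's built-in test config subcommand only parses the file and reports syntax problems:
# optional: validate filebeat.yml
./filebeat test config -c filebeat.yml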
Note: Add the IP address of the Kafka server to the client's /etc/hosts file (in practice, Filebeat can be rolled out and configured across clients with Ansible).
[root@vm-1]# cat /etc/hosts
172.16.62.179 kafka
# start Filebeat on the client
[root@vm-1]# ./filebeat &
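Once Filebeat is running, a couple of quick checks can help confirm the end-to-end path (a sketch; the sample log entry below is made up purely for illustration): test output verifies connectivity to the Kafka brokers configured under output.kafka, and appending a line to a monitored file should produce a new message in the logs topic.
# optional: confirm the Filebeat process is alive
ps -ef | grep filebeat
# optional: test the connection to the configured Kafka output
./filebeat test output -c filebeat.yml
# optional: append a fake entry to a monitored log to generate traffic (illustrative only)
echo '127.0.0.1 - - [28/Sep/2021:10:00:00 +0800] "GET /test HTTP/1.1" 200 0 "-" "curl"' >> /var/log/nginx/access.log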
For more configurations, see:
- Filebeat log input configuration, official documentation: www.elastic.co/guide/en/be…
- Filebeat Kafka output configuration, official documentation: www.elastic.co/guide/en/be…
VI. Configure the Logstash Pipeline
Modify the contents of logstash.conf:
input {
  ## source: Beats
  # beats {
  #   # port to listen on
  #   port => "5044"
  # }
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ["logs"]
    group_id => "logstash"
    codec => json
  }
}
# parsing/filter plugins; multiple filters can be used
# filter {
#   grok {
#     match => { "message" => "%{COMBINEDAPACHELOG}" }
#   }
#   geoip {
#     source => "clientip"
#   }
# }
output {
  # output to Elasticsearch
  elasticsearch {
    hosts => ["http://es:9200"]
    #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    index => "logs-%{+YYYY.MM.dd}"
  }
}
Input Parameter description:
Parameter | Description |
---|---|
bootstrap_servers | Message queue Kafka instance access point |
group_id | Specify the name of the created Consumer Group. |
topics | Specifies the name of the Topic that has been created, which must be the same as the Topic name configured in Filebeat. |
codec | Set to json so that JSON-formatted fields are parsed, making analysis in Kibana easier. |
Output Parameter description:
Parameter | Description |
---|---|
hosts | Access address of ES, in the form http://<ES internal address>:9200. |
user | User name accessing ES. Default is elastic. |
password | Password to access ES. |
index | Index name. logs-%{+YYYY.MM.dd} means the index name has the prefix logs- and a date suffix, for example logs-2021.09.28. |
Note: The most critical part of Logstash is the filter plugin, so take the time to debug the filter configuration.
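For debugging, the pipeline configuration can be syntax-checked without restarting the service; a minimal sketch, assuming the container paths from the compose file above (--path.data points at a scratch directory so the check does not clash with the running instance):
# optional: check the pipeline configuration inside the logstash container
docker exec -it logstash bin/logstash --config.test_and_exit -f /usr/share/logstash/pipeline/logstash.conf --path.data /tmp/logstash-test
# optional: watch the pipeline logs for filter/output errors
docker logs -f logstash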
For more configurations, see:
- Logstash kafka input configuration, official documentation: www.elastic.co/guide/en/lo…
- Logstash grok filter configuration, official documentation: www.elastic.co/guide/en/lo…
- Logstash elasticsearch output configuration, official documentation: www.elastic.co/guide/en/lo…
VII. Check the Kafka Log Consumption Status
The command is as follows:
# Enter container
docker exec -it kafka bash
# Kafka is installed in /opt/kafka by default
cd /opt/kafka
# to query consumption data, you must specify a consumer group; list the groups first
bash-5.1# bin/kafka-consumer-groups.sh --bootstrap-server 172.16.62.179:9092 --list
logstash
# list the topics
bash-5.1# bin/kafka-topics.sh --list --zookeeper 172.16.62.179:2181
__consumer_offsets
logs
# check the consumption progress
bash-5.1# bin/kafka-consumer-groups.sh --describe --bootstrap-server 172.16.62.179:9092 --group logstash
GROUP     TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                       HOST         CLIENT-ID
logstash  logs   0          107335          107335          0    logstash-0-c6d82a1c-0f14-4372-b49f-8cd476f54d90   /172.19.0.2  logstash-0
# parameter description:
# --describe            show detailed information
# --bootstrap-server    the Kafka connection address
# --group               the consumer group to query
Field Description:
TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|
Topic name | Partition ID | Offset consumed so far | Log end offset (total messages) | Number of messages not yet consumed | Consumer ID | Host IP | Client ID |
As shown above, the logs topic contains 107,335 messages in total and the lag is 0; in other words, all messages have been consumed and there is no backlog.
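To spot-check the raw messages that Filebeat wrote to the topic, the console consumer bundled with Kafka can be used from inside the Kafka container (same /opt/kafka directory as above); --max-messages limits the output:
# optional: print a few raw messages from the logs topic
bin/kafka-console-consumer.sh --bootstrap-server 172.16.62.179:9092 --topic logs --from-beginning --max-messages 5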
VIII. Check the Data in ES
Check whether ES has received the log data written by the Logstash elasticsearch output plugin.
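One simple way to check is to list the matching indices over the REST API; the exact index name carries the current date because of the logs-%{+YYYY.MM.dd} pattern configured above:
# list the logs-* indices and their document counts (run on the Docker host)
curl "http://127.0.0.1:9200/_cat/indices/logs-*?v"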
IX. Filter Log Data with Kibana
1. Create an index pattern
Click "Connect to your Elasticsearch index".
Fill in the index name from ES (regular-expression matching is supported), enter the index pattern (logs-* is used in this article), and click "Next step".
Select "@timestamp" as the time filter field, then click "Create index pattern":
After creation:
2. View logs
In the left navigation bar, click Discover.
From the drop-down list on the left side of the page, select the created index pattern (logs-*). In the upper-right corner of the page, select a time range and view the log data collected by Filebeat within that period.
X. Summary
In real enterprise projects, ELK is a mature and widely used technical solution. Logstash is heavier and consumes more resources than Filebeat, so the lightweight Filebeat is recommended on the collection nodes rather than running Logstash there directly. Before logs enter the ELK stack, Kafka is usually placed in front, serving as a queue and buffer on the one hand and as a unified entry channel on the other.
Source code address:
- Github.com/zuozewei/bl…