What is ELK
ELK is an acronym for ElasticSearch, Logstash, and Kibana. Together they form a complete log collection and presentation solution provided by Elastic.
Logstash is responsible for log processing, such as filtering and formatting. ElasticSearch stores the logs, thanks to its powerful full-text search capability. Kibana is responsible for the front-end presentation.
The ELK architecture is shown below:
Filebeat is added to collect logs from the different clients and pass them to Logstash for unified processing.
Setting up ELK
Since ELK is actually three products, you can install them one by one.
Here I chose to install ELK with Docker.
With Docker you can also pull the three images separately and run them individually, but this time the ELK all-in-one image is downloaded and installed directly.
So, first of all, make sure you already have a working Docker environment; if not, set one up before continuing.
Pull the image
Once you have the Docker environment, run the following command on the server:
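Assuming the widely used `sebp/elk` all-in-one image (the specific image name is an assumption here), the pull would look like:

docker pull sebp/elk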
This command downloads the ELK all-in-one image from the Docker registry, more than 2 GB in total. If the download is too slow, you can switch the registry to a closer mirror. Once the download is complete, view the image:
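Listing the local images confirms the download:

docker images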
Logstash configuration
Create beats-input.conf in /usr/config/logstash for log input:
input {
  beats {
    port => 5044
  }
}
Create a new output.conf for output from Logstash to ElasticSearch:
output {
  elasticsearch {
    hosts => ["localhost"]
    manage_template => false
    index => "%{[@metadata][beat]}"
  }
}
Here `index` is the name of the index written to ElasticSearch; `%{[@metadata][beat]}` resolves to the name of the Beat that sent the event (in our case, filebeat).
Run the container
After you have the image, you can start it directly:
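A minimal sketch, assuming the `sebp/elk` image: the ElasticSearch host ports are remapped to 9201/9301 as placeholders, and the container paths (`/var/lib/elasticsearch` for data, `/etc/logstash/conf.d` for the Logstash pipeline config) follow that image's layout.

# Sketch only: image name, remapped ports, and container paths are assumptions.
docker run -d --name elk \
  -p 5601:5601 -p 9201:9200 -p 9301:9300 -p 5044:5044 \
  -v /var/data/elk:/var/lib/elasticsearch \
  -v /usr/config/logstash:/etc/logstash/conf.d \
  sebp/elk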
`-d` runs the container in the background. ElasticSearch's default ports are 9200 and 9300; since I already have three ElasticSearch instances running on my machine, I remapped those ports. `-v` maps `host_file_or_directory:container_file_or_directory`; here the container's ElasticSearch data is mounted to `/var/data/elk` on the host so that data is not lost when the container restarts, and the Logstash configuration is mounted from the host directory `/usr/config/logstash`. `--name` gives the container a name, which makes it easier to operate on later.

If you have ever set up ElasticSearch by hand, you will know how many errors can come up along the way; building ELK with Docker avoids them. Check the container with `docker ps`, enter it with `docker exec -it elk /bin/bash`, and restart it with `docker restart elk`.

View Kibana
Open http://my_host:5601/ in a browser and you will see the Kibana interface. At this point ElasticSearch has no data yet; we need to install Filebeat to collect data into ELK.

Install Filebeat
Filebeat is used to collect log data and report it to Logstash or ElasticSearch. Download Filebeat on the server whose logs you want to collect and decompress it.
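A sketch of the download and extraction; the version number below is only an example, substitute the release that matches your ELK stack:

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.4-linux-x86_64.tar.gz
tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz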
Go to the Filebeat directory and modify filebeat.yml:
filebeat.prospectors:
- type: log
  # This input only takes effect when enabled is set to true
  enabled: true
  # Paths of the logs to collect
  paths:
    - /var/log/*.log
  # Add a tag that can be used later
  tags: ["my_tag"]
  # The ElasticSearch type
  document_type: my_type

setup.kibana:
  # Kibana's IP and port, i.e. kibana:5601
  host: ""

output.logstash:
  # Logstash's IP and port, i.e. logstash:5044
  hosts: [""]
  # Must be set to true, otherwise it does not take effect
  enabled: true
If you want Filebeat to send data directly to ElasticSearch, configure output.elasticsearch instead of output.logstash.
Run Filebeat
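From the Filebeat directory, a minimal foreground invocation with the configuration file above:

./filebeat -e -c filebeat.yml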
Now Filebeat sends the logs under the configured paths to Logstash; inside ELK, Logstash processes the data and forwards it to ElasticSearch. But since we want to analyze the data through ELK, the data imported into ElasticSearch should be in JSON format.
This is what a single log line looked like before:
2019-10-22 10:44:03.441 INFO rmjk.interceptors.IPInterceptor Line:248 - {"clientType":"1","deCode":"0fbd93a286533d071","eaType":2,"eaid":191970823383420928,"ip":"xx.xx.xx.xx","model":"HONOR STF-AL10","osType":"9","path":"/applicationEnter","result":5,"session":"ef0a5c4bca424194b29e2ff31632ee5c","timestamp":1571712242326,"uid":"130605789659402240","v":"2.2.4"}
Logs in this format were hard to analyze after import. My first plan was to use grok in the Logstash filter to parse each line into JSON before importing it into ElasticSearch, but since the parameters in the log are not fixed, this proved too cumbersome, so I switched to Logback: format the log as JSON at the source and let Filebeat ship it.
Logback configuration
My project uses Spring Boot; add the dependency to the project:
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.2</version>
</dependency>
Then add logback.xml to the project's resources directory:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--
Notes:
1. Log levels and files
Logging is split by level, and the level matches the log file name; messages of different levels are written to different log files.
For example, ERROR messages go to log_error_xxx.log or log_error.log (the file currently being written), while log_error_xxx.log is the archived log.
Log files are rolled by date; within the same day, once a file reaches 2MB it is split and numbered 0, 1, 2... in order,
e.g. log-level-2013-12-21.0.log
The same applies to the other levels.
2. File paths
For development and testing, when running the project in Eclipse, look for the logs folder under the Eclipse installation path, using the relative path ../logs.
When deployed under Tomcat, the logs are written to Tomcat's logs folder.
3. Appenders
FILEERROR handles the ERROR level; files are named in the form log-error-xxx.log
FILEWARN handles the WARN level; files are named in the form log-warn-xxx.log
FILEINFO handles the INFO level; files are named in the form log-info-xxx.log
FILEDEBUG handles the DEBUG level; files are named in the form log-debug-xxx.log
stdout prints log messages to the console, for convenience during development and testing
-->
<contextName>service</contextName>
<property name="LOG_PATH" value="logs"/>
<!-- Set the system log directory -->
<property name="APPDIR" value="doctor"/>
<!-- File appender, rolled by date -->
<appender name="FILEERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Path and name of the log file currently being written -->
<file>${LOG_PATH}/${APPDIR}/log_error.log</file>
<!-- Rolling policy: roll by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- Path of the archived log files. For example, if today is 2013-12-21, the file currently being written is specified by the file element; it can be given a path different from fileNamePattern so that the active log file and the archived files live in different directories.
The 2013-12-21 log file is specified by fileNamePattern: %d{yyyy-MM-dd} sets the date format and %i the index -->
<fileNamePattern>${LOG_PATH}/${APPDIR}/error/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Besides rolling by date, the log file is also limited to 2MB; once it exceeds 2MB, a new file is started with the index beginning at 0,
e.g. log-error-2013-12-21.0.log -->
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>2MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- Append to the log file -->
<append>true</append>
<!-- Log file format -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
<!-- This file only records ERROR level messages -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>error</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- File appender, rolled by date -->
<appender name="FILEWARN" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Path and name of the log file currently being written -->
<file>${LOG_PATH}/${APPDIR}/log_warn.log</file>
<!-- Rolling policy: roll by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- Path of the archived log files. For example, if today is 2013-12-21, the file currently being written is specified by the file element; it can be given a path different from fileNamePattern so that the active log file and the archived files live in different directories.
The 2013-12-21 log file is specified by fileNamePattern: %d{yyyy-MM-dd} sets the date format and %i the index -->
<fileNamePattern>${LOG_PATH}/${APPDIR}/warn/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Besides rolling by date, the log file is also limited to 2MB; once it exceeds 2MB, a new file is started with the index beginning at 0,
e.g. log-error-2013-12-21.0.log -->
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>2MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- Append to the log file -->
<append>true</append>
<!-- Log file format -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
<!-- This file only records WARN level messages -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- File appender, rolled by date -->
<appender name="FILEINFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Path and name of the log file currently being written -->
<file>${LOG_PATH}/${APPDIR}/log_info.log</file>
<!-- Rolling policy: roll by date and by size -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- Path of the archived log files. For example, if today is 2013-12-21, the file currently being written is specified by the file element; it can be given a path different from fileNamePattern so that the active log file and the archived files live in different directories.
The 2013-12-21 log file is specified by fileNamePattern: %d{yyyy-MM-dd} sets the date format and %i the index -->
<fileNamePattern>${LOG_PATH}/${APPDIR}/info/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- Besides rolling by date, the log file is also limited to 2MB; once it exceeds 2MB, a new file is started with the index beginning at 0,
e.g. log-error-2013-12-21.0.log -->
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>2MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<!-- Append to the log file -->
<append>true</append>
<!-- Log file format -->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
<!-- This file only records INFO level messages -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="jsonLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Path and name of the log file currently being written -->
<file>${LOG_PATH}/${APPDIR}/log_IPInterceptor.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/${APPDIR}/log_IPInterceptor.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<jsonFactoryDecorator class="net.logstash.logback.decorate.CharacterEscapesJsonFactoryDecorator">
<escape>
<targetCharacterCode>10</targetCharacterCode>
<escapeSequence>\u2028</escapeSequence>
</escape>
</jsonFactoryDecorator>
<providers>
<pattern>
<pattern>
{
"timestamp":"%date{ISO8601}",
"uid":"%mdc{uid}",
"requestIp":"%mdc{ip}",
"id":"%mdc{id}",
"clientType":"%mdc{clientType}",
"v":"%mdc{v}",
"deCode":"%mdc{deCode}",
"dataId":"%mdc{dataId}",
"dataType":"%mdc{dataType}",
"vid":"%mdc{vid}",
"did":"%mdc{did}",
"cid":"%mdc{cid}",
"tagId":"%mdc{tagId}"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<!-- Colored logs -->
<!-- Conversion classes that colored logs depend on -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
<conversionRule conversionWord="wex"
converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
<conversionRule conversionWord="wEx"
converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
<!-- Colored log pattern -->
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- The encoder defaults to PatternLayoutEncoder -->
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf-8</charset>
</encoder>
<!-- This appender is for development use; only the lowest level is configured, and the console prints messages at or above this level -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>debug</level>
</filter>
</appender>
<!-- Logging level for a specific package in the project -->
<!-- rmjk.dao.mapper is the root package; everything logged under this package is at DEBUG level -->
<!-- Levels from high to low: FATAL > ERROR > WARN > INFO > DEBUG > TRACE -->
<logger name="rmjk.dao.mapper" level="DEBUG"/>
<logger name="rmjk.service" level="DEBUG"/>
<!-- Show logs -->
<logger name="org.springframework.jdbc.core" additivity="false" level="DEBUG">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILEINFO"/>
</logger>
<!-- Print JSON logs -->
<logger name="IPInterceptor" level="info" additivity="false">
<appender-ref ref="jsonLog"/>
</logger>
<!-- In production, set this level appropriately to avoid too many log files or hurting performance -->
<root level="INFO">
<appender-ref ref="FILEERROR"/>
<appender-ref ref="FILEWARN"/>
<appender-ref ref="FILEINFO"/>
<!-- In production, remove stdout and testfile -->
<appender-ref ref="STDOUT"/>
</root>
</configuration>
The key points are:
<logger name="IPInterceptor" level="info" additivity="false">
<appender-ref ref="jsonLog"/>
</logger>
Introduce the SLF4J logger in the class that needs to print the log:
private static final Logger LOG = LoggerFactory.getLogger("IPInterceptor");
Information to be printed is stored in the MDC:
MDC.put("ip", ipAddress);
MDC.put("path", servletPath);
MDC.put("uid", paramMap.get("uid") == null ? "" : paramMap.get("uid").toString());Copy the code
Now, when you call `LOG.info("msg")`, the printed content is written to the log in the following format:
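With the jsonLog appender above, each event is written as a single JSON object containing only the fields declared in the pattern; the line below is purely illustrative (the field values are made up):

{"timestamp":"2019-10-22 10:44:03,441","uid":"130605789659402240","requestIp":"xx.xx.xx.xx","id":"","clientType":"1","v":"2.2.4","deCode":"0fbd93a286533d071","dataId":"","dataType":"","vid":"","did":"","cid":"","tagId":""}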
Modify the Logstash configuration
Modify beats-input.conf in /usr/config/logstash:
input {
  beats {
    port => 5044
    codec => "json"
  }
}
Only `codec => "json"` is added, so that Logstash parses its input as JSON.
Since the configuration has changed, restart ELK:
docker restart elk
In this way, as our logs are generated, Filebeat ships them into ELK and we can analyze them through Kibana.

Recommendations are the biggest encouragement.