Logstash is a free and open server-side data processing pipeline that collects data from multiple sources, transforms it, and sends it to your favorite "repository."
Logstash is an open source data collection engine with real-time pipelining capabilities. It dynamically unifies data from disparate sources and normalizes it into destinations of your choice, cleaning up and democratizing all your data for a variety of downstream analytics and visualization use cases.
Install Logstash
Install from a downloaded binary
Logstash binaries are available from www.elastic.co/cn/download.
Download the Logstash installation file (tar.gz, deb, zip, or rpm) for your host environment.
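For the tar.gz distribution, unpacking looks like this (the exact archive name is an assumption based on the 7.13.4 version used below):

```
# Unpack the downloaded tar.gz archive
# (archive name assumed for Logstash 7.13.4 on Linux x86_64)
tar -xzf logstash-7.13.4-linux-x86_64.tar.gz
```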
Start Logstash
Test from the console, with stdin as the input and stdout as the output:
```
cd logstash-7.13.4
./bin/logstash -e 'input { stdin {} } output { stdout {} }'
```
Now type anything into the console and press Enter: Logstash reads the line from the input stream (stdin) and prints the resulting event to the output stream (stdout).
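The printed event looks roughly like the following (an illustrative sketch; the exact fields, timestamp, and host name depend on your environment and Logstash version):

```
{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => 2021-08-01T07:10:27.273Z,
          "host" => "localhost"
}
```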
Specifying a configuration file
-f specifies the configuration file; --config.reload.automatic automatically reloads the configuration file when it changes.
```
bin/logstash -f first-pipeline.conf --config.reload.automatic
```
The configuration file consists of three parts: input, filter, and output.
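A minimal sketch of what first-pipeline.conf could look like (the mutate filter and the added field are illustrative assumptions, not from the original):

```
input {
  stdin {}
}

filter {
  # illustrative: add an extra field to every event
  mutate {
    add_field => { "source" => "console" }
  }
}

output {
  stdout { codec => rubydebug }
}
```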
Codec plugins
A codec plugin processes data in the input and output streams and changes the format of the data.
Common codecs include:
- json: reads content in JSON format and creates one event per element of a JSON array
- json_lines: reads newline-delimited JSON
- plain: reads plain text, with no delimiting between events
- multiline: merges multi-line messages into a single event (see the sketch below)
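For example, the multiline codec is often used to fold indented continuation lines (such as stack traces) into the preceding event. A minimal sketch, assuming continuation lines start with whitespace:

```
input {
  stdin {
    codec => multiline {
      # any line starting with whitespace belongs to the previous event
      pattern => "^\s"
      what => "previous"
    }
  }
}
```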
Send Kafka log messages to Logstash
The topics option specifies the topic to listen on, and the json filter converts the message payload to JSON format:
```
input {
  kafka {
    id => "my_plugin_id"
    bootstrap_servers => "localhost:9092"
    topics => ["logger-channel"]
    auto_offset_reset => "latest"
  }
}

filter {
  # parse the Kafka message payload as JSON
  json {
    source => "message"
  }
  # use the "time" field as the event timestamp, then drop it
  date {
    match => ["time", "yyyy-MM-dd HH:mm:ss.SSS"]
    remove_field => ["time"]
  }
}

output {
  stdout {}
}
```
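For reference, this pipeline assumes the application publishes JSON messages like the following to the logger-channel topic (an illustrative payload reconstructed from the console output shown below, not taken from the original):

```
{"logger": "com.paw.kafka.elk.controller.KafkaLogController", "thread": "http-nio-8080-exec-7", "time": "2021-08-01 07:10:27.273", "appName": "paw-kelk", "env": "dev", "level": "INFO", "message": "cost time: 23"}
```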
Start the service
```
./bin/logstash -f ./config/kafka-std.conf --config.reload.automatic
```
The console receives the log messages from Kafka:
{ "logger" => "com.paw.kafka.elk.controller.KafkaLogController", "@version" => 1, "thread" => "http-nio-8080-exec-7", "Timestamp" => 2021-08-01T07:10:27.273z, "appName" => "paw-kelk", "message" => "cost time: 23", "env" => "dev", "caller" => { "file" => "KafkaLogController.java", "method" => "kafka", "class" => "com.paw.kafka.elk.controller.KafkaLogController", "line" => 35 }, "level" => "INFO" } { "logger" => "com.paw.kafka.elk.controller.KafkaLogController", "@version" => 1, "Thread" => "http-niO-8080-exec-7 ", "levelVal" => 10000, "@timestamp" => 2021-08-01t07:10:27.273z, "appName" => "paw-kelk", "message" => "debug time: 23", "env" => "dev", "caller" => { "file" => "KafkaLogController.java", "method" => "kafka", "class" => "com.paw.kafka.elk.controller.KafkaLogController", "line" => 36 }, "level" => "DEBUG" }Copy the code
At this point, writing Kafka logs to Logstash is complete. Logstash acts as a consumer of the Kafka log topic: Kafka sends logs to Logstash, which takes them in as an input stream, processes them in the filter stage, and then writes them to an output stream such as Elasticsearch.
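To ship the events to Elasticsearch instead of stdout, the output section could be replaced with something like the following sketch (the host and index name are assumptions, not from the original):

```
output {
  elasticsearch {
    # assumed local Elasticsearch instance
    hosts => ["http://localhost:9200"]
    # hypothetical daily index name
    index => "paw-kelk-%{+YYYY.MM.dd}"
  }
}
```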