This is the fifth day of my participation in the August More Text Challenge. For details, see: August More Text Challenge

Logstash is a free and open server-side data processing pipeline that collects data from multiple sources, transforms it, and sends it to your favorite “repository.”

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash dynamically unites data from different sources and normalizes the data to a destination of your choice. Clean up and democratize all your data for a variety of high-level downstream analysis and visualization use cases.

Install Logstash

Install from a downloaded binary

Logstash binaries are available from www.elastic.co/cn/download.

Download the Logstash installation file (TAR.GZ, DEB, ZIP, or RPM) for your host environment.

Start Logstash

Test from the console with input stdin and output stdout

cd logstash-7.13.4
./bin/logstash -e 'input { stdin {} } output { stdout {} }'

At this point, anything typed into the console followed by Enter is read as the input stream, and the resulting event is printed to the console as the output stream.

Specifying a configuration file

-f specifies the configuration file; --config.reload.automatic automatically reloads the configuration file when it changes.

bin/logstash -f first-pipeline.conf --config.reload.automatic

The configuration file consists of three parts: input, filter, and output.

Codec plugins

Codec plugins process data in the input and output streams and change the format of the data.

Common codecs are:

  • json reads content in JSON format and creates an event for each element in a JSON array
  • json_lines reads JSON separated by newlines
  • plain reads plain text with no separation between events
  • multiline combines multi-line messages into a single event

Send Kafka log messages into Logstash

topics specifies the topics to listen on; the json filter converts the message to JSON format.

input {
  kafka {
    id => "my_plugin_id"
    bootstrap_servers => "localhost:9092"
    topics => ["logger-channel"]
    auto_offset_reset => "latest"
  }
}
filter {
  # Parse the JSON document carried in the "message" field
  json {
    source => "message"
  }
  # Use the "time" field as the event timestamp, then drop it
  date {
    match => ["time", "yyyy-MM-dd HH:mm:ss.SSS"]
    remove_field => ["time"]
  }
}
output {
  stdout {}
}

Start the service

./bin/logstash -f ./config/kafka-std.conf --config.reload.automatic

The console receives the log messages from Kafka:

{
  "logger" => "com.paw.kafka.elk.controller.KafkaLogController",
  "@version" => 1,
  "thread" => "http-nio-8080-exec-7",
  "@timestamp" => 2021-08-01T07:10:27.273Z,
  "appName" => "paw-kelk",
  "message" => "cost time: 23",
  "env" => "dev",
  "caller" => {
    "file" => "KafkaLogController.java",
    "method" => "kafka",
    "class" => "com.paw.kafka.elk.controller.KafkaLogController",
    "line" => 35
  },
  "level" => "INFO"
}
{
  "logger" => "com.paw.kafka.elk.controller.KafkaLogController",
  "@version" => 1,
  "thread" => "http-nio-8080-exec-7",
  "levelVal" => 10000,
  "@timestamp" => 2021-08-01T07:10:27.273Z,
  "appName" => "paw-kelk",
  "message" => "debug time: 23",
  "env" => "dev",
  "caller" => {
    "file" => "KafkaLogController.java",
    "method" => "kafka",
    "class" => "com.paw.kafka.elk.controller.KafkaLogController",
    "line" => 36
  },
  "level" => "DEBUG"
}

At this point, writing Kafka logs into Logstash is complete. Logstash acts as a consumer of the Kafka log topic: Kafka delivers the logs to Logstash, which takes the log data as its input stream and, after filter processing, writes it to an output such as Elasticsearch.
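An added example, not part of the original post: the same Kafka pipeline with its two failure paths handled, so that messages the json filter cannot parse (tagged _jsonparsefailure) or whose time field does not match the date pattern (tagged _dateparsefailure) are set aside instead of being mixed with clean events. The file path is an assumed placeholder; everything else reuses the settings shown above.

```conf
input {
  kafka {
    id => "my_plugin_id"
    bootstrap_servers => "localhost:9092"
    topics => ["logger-channel"]
    auto_offset_reset => "latest"
  }
}
filter {
  json {
    source => "message"
    # No target is set, so parsed keys land at the top level of the
    # event, as in the console output above.
    # This is the filter's default failure tag, written out explicitly.
    tag_on_failure => ["_jsonparsefailure"]
  }
  # Only attempt timestamp parsing on events whose JSON parsed cleanly.
  if "_jsonparsefailure" not in [tags] {
    date {
      match => ["time", "yyyy-MM-dd HH:mm:ss.SSS"]
      # remove_field is applied only when the date match succeeds,
      # so a rejected event keeps its original "time" value for review.
      remove_field => ["time"]
      tag_on_failure => ["_dateparsefailure"]
    }
  }
}
output {
  if "_jsonparsefailure" in [tags] or "_dateparsefailure" in [tags] {
    # Assumed path: keep rejected events apart from the main stream.
    file {
      path => "/tmp/logstash-rejected.log"
    }
  } else {
    stdout {}
  }
}
```

A non-empty rejected file means a producer on the topic is sending lines that are not valid JSON or that use a different time format, which is worth checking before the events are indexed downstream.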