This article focuses on data conversion, analysis, extraction, and the core operations of Logstash. I assume you have already set up Logstash as described in my previous article, "How to install Logstash in the Elastic Stack."
Logstash data sources
We know that Logstash can be used in many applications. It accepts data from a wide variety of sources, such as log files, databases (via JDBC), message queues like Kafka and Redis, Beats, HTTP endpoints, and more.
The raw data comes in many shapes. For it to end up in Elasticsearch in a form that can be analyzed, we usually have to do a fair amount of processing on the source data to turn it into useful information.
Logstash plugins
When you run a Logstash instance, in addition to starting the configured pipelines, it also starts a Logstash monitoring API endpoint on port 9600. Note that the monitoring API is only available in Logstash 5.0 and later. We can view all of our installed plugins in a browser at the following address:
http://localhost:9600/_node/plugins?pretty
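If you prefer the terminal, the same endpoint can be queried with curl, assuming Logstash is running locally on its default port:

curl -XGET 'http://localhost:9600/_node/plugins?pretty'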
Logstash is a framework that is very easy to extend. It can analyze and process all kinds of data, thanks to the more than 200 plugins currently available. First, let's take a look at which plugins are available:
Input plugins:
We first go to the bin subdirectory under the Logstash installation directory and type the following command on the command line:
$ ./logstash-plugin list --group input
This displays:
logstash-input-azure_event_hubs
logstash-input-beats
logstash-input-couchdb_changes
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jdbc
logstash-input-jms
logstash-input-kafka
logstash-input-pipe
logstash-input-rabbitmq
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
Filter plugins:
Type the following command on the command line:
$ ./logstash-plugin list --group filter
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-jdbc_static
logstash-filter-jdbc_streaming
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
Output plugins:
Type the following command on the command line:
$ ./logstash-plugin list --group output
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elastic_app_search
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-rabbitmq
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs
Codec plugins:
Type the following command on the command line:
$ ./logstash-plugin list --group codec
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug
This shows that all of these plugins were installed for us along with Logstash. We can also develop our own plugins and install them ourselves, or install plugins that others have already published.
As can be seen from the lists above, since file appears both as an input and as an output, we can even use a configuration like the following:
input {
  file {
    path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/*access*"
    type => "apache"
  }
}
output {
  file {
    path => "C:/tpwork/logstash/bin/log/output.log"
  }
}
In this way Logstash reads the Apache access log. For an input line like this:
0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] "GET / HTTP/1.1" 200 11418
the file output writes an event like this after processing:
{"path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:00.363Z","@version":"1","host":"Dell-PC","message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] \"GET / HTTP/1.1\" 200 11418\r","type":"apache","tags":[]}
Install the plugins
The standard Logstash distribution already ships with many plugins, but in some cases we need to install a required plugin manually, for example the Exec output plugin. We can do that with the following command, run from the Logstash installation directory:
./bin/logstash-plugin install logstash-output-exec
To check if the plugin has been successfully installed, use the following command:
./bin/logstash-plugin list --group output | grep exec
$ ./bin/logstash-plugin list --group output | grep exec
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.bouncycastle.jcajce.provider.drbg.DRBG (file:/Users/liuxg/elastic/logstash-7.4.0/vendor/app/lib/ruby/stdlib/org/bouncycastle/bcprov-jdk15on/1.61/bcprov-jdk15on-1.61.jar) to constructor sun.security.provider.Sun()
WARNING: Please consider reporting this to the maintainers of org.bouncycastle.jcajce.provider.drbg.DRBG
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
logstash-output-exec
Reading log files
Logstash is easy to set up to read a log file. For example, we can read an Apache log file as follows:
input {
file {
type => "apache"
path => "/Users/liuxg/data/apache_logs"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
output {
stdout {
codec => rubydebug
}
}
We can even read multiple files:
# Pull in application-log data. They emit data in JSON form.
input {
file {
path => [
"/var/log/app/worker_info.log",
"/var/log/app/broker_info.log",
"/var/log/app/supervisor.log"
]
exclude => "*.gz"
type => "applog"
codec => "json"
}
}
Serialization of data
We can serialize or deserialize our data using the provided codecs, for example:
input {
  # Deserialize newline-separated JSON
  file {
    path  => "/some/sample.log"
    codec => json
  }
}
output {
  # Serialize to the msgpack format
  redis { codec => msgpack }
  stdout { codec => rubydebug }
}
With Logstash up and running, we can append content to the sample.log file in a terminal with the following command:
$ echo '{"name2", "liuxg2"}' >> ./sample.log
We can see the following output:
{" @ version "= >" 1 ", "message" = > "{\" name2 \ ", \ "liuxg2 \"} ", "@ timestamp" = > 2019-09-12 T07:37:56. 639 z, "host" => "localhost", "tags" => [ [0] "_jsonparsefailure" ], "path" => "/Users/liuxg/data/sample.log" }Copy the code
The most commonly used codecs
1) line: converts each line of input into a Logstash event using the "message" field; on output it can also format events as custom lines.
2) multiline: lets you merge multiple lines into one "message" with arbitrary boundaries. Often used for stack traces; the same thing can also be done in Filebeat (see the sketch after this list).
3) json_lines: parses JSON data separated by newlines.
4) json: parses the entire content as JSON. Only suitable for message-oriented inputs/outputs such as Redis, Kafka, HTTP, etc.
There are many other codecs.
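As an illustration, here is a minimal sketch of the multiline codec that folds every line not starting with a timestamp into the previous event, which is the usual way to capture Java stack traces. The log path and timestamp pattern are assumptions for the example:

input {
  file {
    path => "/var/log/app/java_app.log"   # hypothetical path
    codec => multiline {
      # lines that do not start with an ISO8601 timestamp belong to the previous event
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate  => true
      what    => "previous"
    }
  }
}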
Analysis and extraction
Grok Filter
filter {
grok {
match => [
"message", "%{TIMESTAMP_ISO8601:timestamp_string}%{SPACE}%{GREEDYDATA:line}"
]
}
}
The above filter easily turns the following log line into structured data:
2019-09-09T13:00:00Z Whose woods these are I think I know.
More grok patterns can be found in the logstash-patterns-core repository on GitHub.
Date filter
filter {
date {
match => ["timestamp_string", "ISO8601"]
}
}
The date filter helps us parse a date string in the specified format and assign the resulting value to the @timestamp field.
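Putting the two filters together, a minimal sketch (reusing the generator input and the log line from the Grok example above) might look like this; grok extracts timestamp_string, and the date filter then overwrites @timestamp with its value:

input {
  generator {
    message => "2019-09-09T13:00:00Z Whose woods these are I think I know."
    count   => 1
  }
}
filter {
  grok {
    match => [
      "message", "%{TIMESTAMP_ISO8601:timestamp_string}%{SPACE}%{GREEDYDATA:line}"
    ]
  }
  date {
    match => ["timestamp_string", "ISO8601"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}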
Dissect filter
Dissect is a faster, lighter-weight and smaller alternative to Grok:
filter {
  dissect {
    mapping => {
      "message" => "%{id} %{server} %{function->}"
    }
  }
}
The format of the field and delimiter mapping is similar to Grok patterns.
Example:
input {
  generator {
    message => "<1>Oct 16 20:21:22 www1 1,2016/10/16 20:21:20,3,THREAT,SCAN,6,2016/10/16 20:21:20,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54"
    count   => 1
  }
}
filter {
  if [message] =~ "THREAT," {
    dissect {
      mapping => {
        message => "<%{priority}>%{syslog_timestamp} %{+syslog_timestamp} %{+syslog_timestamp} %{logsource} %{pan_fut_use_01},%{pan_rec_time},%{pan_serial_number},%{pan_type},%{pan_subtype},%{pan_fut_use_02},%{pan_gen_time},%{pan_src_ip},%{pan_dst_ip},%{pan_nat_src_ip},%{pan_nat_dst_ip},%{pan_rule_name},%{pan_src_user},%{pan_dst_user},%{pan_app},%{pan_vsys},%{pan_src_zone},%{pan_dst_zone},%{pan_ingress_intf},%{pan_egress_intf},%{pan_log_fwd_profile},%{pan_fut_use_03},%{pan_session_id},%{pan_repeat_cnt},%{pan_src_port},%{pan_dst_port},%{pan_nat_src_port},%{pan_nat_dst_port},%{pan_flags},%{pan_prot},%{pan_action},%{pan_misc},%{pan_threat_id},%{pan_cat},%{pan_severity},%{pan_direction},%{pan_seq_number},%{pan_action_flags},%{pan_src_location},%{pan_dst_location},%{pan_content_type},%{pan_pcap_id},%{pan_filedigest},%{pan_cloud},%{pan_user_agent},%{pan_file_type},%{pan_xff},%{pan_referer},%{pan_sender},%{pan_subject},%{pan_recipient},%{pan_report_id},%{pan_anymore}"
      }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
After running it, we get:
{" @ timestamp "= > 2019-09-12 T09: who fell. The 514 z," pan_dst_ip "= >" 9 ", "pan_nat_src_ip" = > "10", "the sequence" = > 0, "logsource" => "www1", "pan_session_id" => "23", "pan_vsys" => "16", "pan_cat" => "34", "pan_rule_name" => "12", "pan_gen_time" => "2016/10/16 20:21:20", "pan_seq_number" => "37", "pan_subject" => "50", .... "The message" = > "< 1 > Oct 16 20:21:22 www1 1201 6/10/16 20:21:20, 3, -- kyoui, SCAN, 6201 6/10/16 20:21:20, 8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45 , 46,47,48,49,50,51,52,53,54 pan_fut_use_02 ", "" = >" 6 ", "pan_flags" = > ", "" 29 syslog_timestamp" = > "Oct 16 20:21:22", 53 "pan_anymore" = > ", "}Copy the code
For more information, see the Logstash dissect filter documentation.
KV filter
The kv filter is an easy way to parse data consisting of key/value pairs:
filter {
  kv {
    source      => "message"
    target      => "parsed"
    value_split => ":"
  }
}
Let’s run a conf file like this:
input {
  generator {
    message => "pin=12345~0&d=123&[email protected]&oq=bobo&ss=12345"
    count   => 1
  }
}
filter {
  kv {
    source      => "message"
    target      => "parsed"
    field_split => "&?"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
The results are as follows:
{" @ timestamp "= > 2019-09-12 T09:46:04. 944 z," the host "= >" localhost ", "parsed" = > {" ss "= >" 12345 ", "e" = > "[email protected]." "pin" => "12345~0", "oq" => "bobo", "d" => "123" }, "message" => "pin=12345~0&d=123&[email protected]&oq=bobo&ss=12345", "sequence" => 0, "@version" => "1" }Copy the code
For the kv filter, we can also use target to group the parsed information into an object, for example:
filter {
  kv {
    source      => "message"
    target      => "parsed"
    value_split => ":"
  }
}
Core operations
Mutate filter
The filter provides a number of features:
- Converting field types (from string to integer, etc.)
- Add/rename/replace/copy fields
- Uppercase/lowercase conversion
- Concatenate arrays together (useful for Array => String operations)
- Merge hash
- Split fields into arrays
- Strip whitespace
input {
  generator {
    message => "pin=12345~0&d=123&[email protected]&oq=bobo&ss=12345"
    count   => 1
  }
}
filter {
  kv {
    source      => "message"
    field_split => "&?"
  }
  if [pin] == "12345~0" {
    mutate {
      add_tag => [ 'metrics' ]
    }
    mutate {
      split     => ["message", "&"]
      add_field => {"foo" => "bar-%{pin}"}
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  if "metrics" in [tags] {
    stdout {
      codec => line { format => "custom format: %{message}" }
    }
  }
}
The results are as follows:
{ "foo" => "bar-12345~0", "e" => "[email protected]", "sequence" => 0, "message" => [ [0] "pin=12345~0", [1] "d=123", [2] "[email protected]", [3] "oq=bobo", [4] "ss=12345" ], "pin" => "12345~0", "d" => "123", "host" => "localhost", "Ss" = > "12345", "@ timestamp" = > 2019-09-14 T15:03:15. 141 z, "oq" = > "bobo", "@ version" = > "1", "tags" => [ [0] "metrics" ] } custom format: pin=12345~0,d=123,[email protected],oq=bobo,ss=12345Copy the code
Core transformation filters
- mutate – modify or add fields on each event
- split – convert one event into multiple events
- drop – drop an event
Conditional logic
- if/else
- You can use regexps with =~
- You can check for membership in an array
filter {
  mutate { lowercase => "account" }
  if [type] == "batch" {
    split { field => "actions" target => "action" }
  }
  if [action] =~ /special/ {
    drop {}
  }
}
GeoIP
The geoip filter enriches events with information about an IP address:
filter {
  geoip {
    source => "my_geoip_field"
  }
}
Run the following configuration:
input {
  generator {
    message => "83.149.9.216"
    count   => 1
  }
}
filter {
  grok {
    match => { "message" => '%{IPORHOST:clientip}' }
  }
  geoip {
    source => "clientip"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
The following information is displayed:
{" host "= >" localhost ", "@ version" = > "1", "clientip" = > "83.149.9.216", "message" = > "83.149.9.216." "@timestamp" => 2019-09-15T06:54:46.695z, "sequence" => 0, "geoip" => {"timezone" => "Europe/Moscow", Region_code = > "MOW," "latitude" = > 55.7527, "country_code3" = > "RU", "continent_code" = > "EU", "Longitude" = > 37.6172, "country_name" = > "Russia", "location" = > {" lat "= > 55.7527," says lon "= > 37.6172}, "IP" = > "83.149.9.216", "postal_code" = > "102325", "country_code2" = > "RU", "region_name" = > "Moscow", "city_name" => "Moscow" } }Copy the code
We can see that the geoip field contains a lot of concrete location information.
DNS filter
The dns filter enriches host names with DNS lookup information:
filter {
  dns {
    resolve => ["my_dns_field"]
  }
}
We define a Logstash configuration file as follows:
input {
  generator {
    message => "www.google.com"
    count   => 1
  }
}
filter {
  mutate {
    add_field => { "hostname" => "172.217.160.110" }
  }
  dns {
    reverse => ["hostname"]
    action  => "replace"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
The IP address 172.217.160.110 belongs to Google, so the output is:
{"host" => "localhost", "sequence" => 0, "message" => "www.google.com", "@timestamp" => 2019-09-15t11:35:43.791z, "Hostname" = > "tsa03s06-in-f14.1e100.net", "@ version" = > "1"}Copy the code
Here we can see the hostname value.
Useragent filter
The useragent filter enriches events with parsed browser user-agent information. We use the following Logstash configuration:
input {
  generator {
    message => '83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
    count   => 1
  }
}
filter {
  grok {
    match => { "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}' }
  }
  useragent {
    source => "agent"
    target => "useragent"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
The result is:
{ "request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard.png", "useragent" => { "name" => "Chrome", "build" => "", "device" => "Other", "os_major" => "10", "os" => "Mac OS X", "minor" => "0", "major" => "32", "os_name" => "Mac OS X", "patch" => "1700", "os_minor" => "9" }, "sequence" => 0, "The message" = > "83.149.9.216 - [17 / May / 2015:10:05:50 + 0000] \" GET / presentations/logstash - monitorama - 2013 / images/kibana - dashboard. HTTP / 1.1 PNG \ ", 200, 321631 \ \ "http://semicomplete.com/presentations/logstash-monitorama-2013/\" "Mozilla / 5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\", "timestamp" => "17/May/2015:10:05:50 +0000", "Referrer" = > "\" http://semicomplete.com/presentations/logstash-monitorama-2013/\ ""," clientip "= >" 83.149.9.216." "ident" => "-", "auth" => "-", "response" => 200, "@version" => "1", "verb" => "GET", "host" => "localhost", "@timestamp" => 2019-09-15t12:03:34.650z, "httpversion" => "1.1", "bytes" => 321631, "Agent" = > "\" Mozilla / 5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"}Copy the code
We can see the detailed browser information under the useragent field.
Translate Filter
The translate filter uses a local lookup dictionary to enrich the data. We use the following Logstash configuration file:
input {
  generator {
    message => '83.149.9.216 - - [17/May/2015:10:05:50 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png HTTP/1.1" 200 321631 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"'
    count   => 1
  }
}
filter {
  grok {
    match => { "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}' }
  }
  translate {
    field       => "[response]"
    destination => "[http_status_description]"
    dictionary  => {
      "100" => "Continue"
      "101" => "Switching Protocols"
      "200" => "OK"
      "500" => "Server Error"
    }
    fallback => "I'm a teapot"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
The result of the run is:
{ "auth" => "-", "host" => "localhost", "timestamp" => "17/May/2015:10:05:50 +0000", "The message" = > "83.149.9.216 - [17 / May / 2015:10:05:50 + 0000] \" GET / presentations/logstash - monitorama - 2013 / images/kibana - dashboard. HTTP / 1.1 PNG \ ", 200, 321631 \ \ "http://semicomplete.com/presentations/logstash-monitorama-2013/\" "Mozilla / 5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\", "Httpversion" = > "1.1", "@ version" = > "1" and "response" = > 200, "clientip" = > "83.149.9.216", "verb" = > "GET", "sequence" => 0, "referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"", "Agent" = > "\" Mozilla / 5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\", "Http_status_description" => "OK", "ident" => "-", "@timestamp" => 2019-09-15t12:30:09.575z, "bytes" => 321631, "request" => "/presentations/logstash-monitorama-2013/images/kibana-dashboard.png" }Copy the code
We can see a new http_status_description field whose value is "OK", translated from the response code 200.
Elasticsearch Filter
This filter gets data from an index in Elasticsearch and uses it to enrich events. For this test, we first create an index called elasticsearch_filter:
PUT elasticsearch_filter/_doc/1
{
  "name": "liuxg",
  "age": 20,
  "@timestamp": "2019-09-15"
}
One thing to point out: the document must contain a field called @timestamp, otherwise we will get an error, because this field is used for sorting the query results.
We use the following Logstash configuration:
input {
generator {
message => "liuxg"
count => 1
}
}
filter {
elasticsearch {
hosts => ["http://localhost:9200"]
index => ["elasticsearch_filter"]
query => "name.keyword:%{[message]}"
result_size => 1
fields => {"age" => "user_age"}
}
}
output {
stdout {
codec => rubydebug
}
}
Running the above example shows the result:
{ "user_age" => 20, "host" => "localhost", "message" => "liuxg", "@version" => "1", "@timestamp" => 2019-09-15t13:21:29.742z, "sequence" => 0}Copy the code
We can see that user_age is 20. This is obtained by searching for name:liuxg.