For Logstash data enrichment, in addition to the GeoIP filter introduced earlier, I have also covered "using jdbc_streaming to enrich our data". In today's article, we show how to use the Elasticsearch filter to enrich our data.

The Elasticsearch filter searches Elasticsearch for a previous log event and copies some of its fields into the current event. The filter's documentation gives two complete examples of how to use it; we will work through the first one here.

The first example uses traditional query parameters, where the user is limited to an Elasticsearch query_string. Whenever Logstash receives an "end" event, it uses this Elasticsearch filter to find the matching "start" event based on some operation identifier. It then copies the @timestamp field from the "start" event into a new field on the "end" event. Finally, using a combination of the date filter and the ruby filter, we calculate the duration (in hours) between the two events.
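A minimal sketch of such a filter section, adapted from the filter's documentation (the host name "es-server" is a placeholder for your own Elasticsearch address):

if [type] == "end" {
  # Find the matching "start" event and copy its @timestamp into "started"
  elasticsearch {
    hosts => ["es-server"]
    query => "type:start AND operation:%{[opid]}"
    fields => { "@timestamp" => "started" }
  }
  # Parse the copied string into a real timestamp
  date {
    match => ["[started]", "ISO8601"]
    target => "[started]"
  }
  # Compute the elapsed time in hours
  ruby {
    code => "event.set('duration_hrs', (event.get('@timestamp') - event.get('started')) / 3600)"
  }
}

In the hands-on practice below, we will use a query template file instead of the inline query option.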

 

Hands-on practice

Open Kibana, and use the following command to create an index called enrich_index:

PUT enrich_index
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "operation": {
        "type": "keyword"
      },
      "type": {
        "type": "keyword"
      },
      "status": {
        "type": "text"
      }
    }
  }
}

Above, we create the mapping for enrich_index. Use the following commands to create two documents:

PUT enrich_index/_doc/1
{
  "@timestamp": "2020-06-16T09:23:40.423707Z",
  "type": "start",
  "operation": "1",
  "status": "OK"
}

PUT enrich_index/_doc/2
{
  "@timestamp": "2020-06-16T10:23:40.423707Z",
  "type": "start",
  "operation": "1",
  "status": "Bad"
}

In these documents, type is "start" and operation is "1". The @timestamp and status values are the data we want to use to enrich events in Logstash. Note that I have deliberately indexed two documents whose type and operation fields are the same.
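You can quickly verify that both documents were indexed:

GET enrich_index/_search

This should return both documents in the hits.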

We create the following Logstash configuration file:

logstash_enrich.conf

input {
  # Generate a single test event that plays the role of an "end" event
  generator {
    message => "type=end&opid=1"
    count => 1
  }
}

filter {
  # Extract the key/value pairs (type and opid) from the message
  kv {
    source => "message"
    field_split => "&?"
  }

  # The raw message field is no longer needed after parsing
  mutate {
    remove_field => ["message"]
  }

  # For "end" events, look up the matching "start" event in enrich_index
  # and copy its @timestamp (into "started") and status fields
  if [type] == "end" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "enrich_index"
      query_template => "template.json"
      fields => {
        "@timestamp" => "started"
        "status" => "status"
      }
    }
  }

  # Parse the copied "started" string into a real timestamp
  date {
    match => ["[started]", "ISO8601"]
    target => "[started]"
  }

  # Compute the duration in hours between the "start" and "end" events
  ruby {
    code => "event.set('duration_hrs', (event.get('@timestamp') - event.get('started')) / 3600)"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

In the input section above, we use a generator to produce a single message. In the filter section, the kv filter extracts the key and value pairs from that message, and the mutate filter removes the now-redundant message field.
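For the message "type=end&opid=1", the kv filter should leave the event with two new fields, roughly:

"type" => "end"
"opid" => "1"

Then, if type is "end", the Elasticsearch filter is used to enrich the data. For it to work properly, we must define the following template file: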

template.json

{
  "size": 1,
  "sort" : [ { "@timestamp" : "desc" } ],
  "query": {
    "query_string": {
      "query": "type:start AND operation:%{[opid]}"
    }
  },
  "_source": ["@timestamp", "status"]
}

Above, we perform the query with query_string. We ask for only one hit (size: 1), sorted by @timestamp in descending order. That is, if multiple documents match, only the one with the most recent @timestamp is returned.
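Since the kv filter sets opid to "1", the template expands at runtime to a query equivalent to the following, which you can try directly in Kibana:

GET enrich_index/_search
{
  "size": 1,
  "sort" : [ { "@timestamp" : "desc" } ],
  "query": {
    "query_string": {
      "query": "type:start AND operation:1"
    }
  },
  "_source": ["@timestamp", "status"]
}

It returns the document with status "Bad", since that document has the later @timestamp.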

We start Logstash with the following command:

sudo ./bin/logstash -f logstash_enrich.conf 

The results are as follows:
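The rubydebug output should look roughly like the following sketch (the exact @timestamp, host, and duration_hrs values depend on when and where you run the pipeline):

{
      "@timestamp" => 2020-06-16T12:10:00.000Z,
        "@version" => "1",
        "sequence" => 0,
            "host" => "localhost",
            "type" => "end",
            "opid" => "1",
         "started" => 2020-06-16T10:23:40.423Z,  # copied from the "start" event
          "status" => "Bad",                     # copied from the "start" event
    "duration_hrs" => 1.772                      # (@timestamp - started) / 3600
}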

From the above result, we can see that the started and status values are copied from the document whose status is "Bad". The reason is that this document has the more recent @timestamp, and template.json only asks for one document (size: 1).

 

Conclusion

In Logstash, if we want to use a datastore for data enrichment, besides using the jdbc_streaming filter, we can also use the Elasticsearch filter.

 
