When importing data into Elasticsearch, it is often beneficial to augment it with additional information that can later be used to search or display the data. Enrichment is the process of merging data from authoritative sources into documents as they are ingested into Elasticsearch.

For example, documents can be augmented with the GeoIP processor, which processes documents containing IP addresses and adds information about the geographic location associated with each IP address. Documents enriched with geolocation data are useful because they enable quick query-time operations, such as searching by location or efficiently displaying information on a map.
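
To make this concrete, here is a minimal sketch of such a pipeline using the built-in GeoIP processor; the pipeline name geoip_lookup and the field names ip and geo are ours for illustration:

PUT /_ingest/pipeline/geoip_lookup
{
  "description": "Add geolocation details for an IP address",
  "processors": [
    {
      "geoip": {
        "field": "ip",
        "target_field": "geo"
      }
    }
  ]
}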

While the GeoIP processor is a good example for understanding enrich functionality, there are many other situations where custom data may be required to enrich documents. It is not hard to imagine a scenario where devices write records to Elasticsearch, and the data sent from these devices needs to be fleshed out with master data. This master data can include information such as the location of each device, the team that owns it, its type, and so on.

Historically, this kind of enrichment was only available in Logstash, but since the enrich processor was introduced in Elasticsearch 7.5.0, it has been possible to enrich documents directly in Elasticsearch without having to configure a separate service/system. If you want to know how this works in Logstash, see my previous article “Logstash: Enriching Our Data with jdbc_streaming”.

Since the master data used for enrichment is often maintained in CSV files, in this blog we walk through how data in a CSV file can be used to enrich documents using the enrich processor running on an ingest node.

Sample CSV data

You can use Kibana to import the following sample master data in CSV format and then use it to enrich documents as they are ingested into Elasticsearch. For the example in this blog, we store the master data in a file called test.csv. This data represents a list of devices in an organization.

test.csv

"Device ID","Device Location","Device Owner","Device Type"
"device1","London","Engineering","Computer"
"device2","Toronto","Consulting","Mouse"
"device3","Winnipeg","Sales","Computer"
"device4","Barcelona","Engineering","Phone"
"device5","Toronto","Consulting","Computer"
"device6","London","Consulting","Computer"

Note that the CSV data should not contain any additional spaces, as the current version of the Data Visualizer requires the data to be precisely formatted. This is documented in this GitHub issue.

Import the CSV data into Elasticsearch

We can import the data directly using Kibana. Open Kibana:

Click the Import a CSV, NDJSON, or log file link:

Click Select or drag and drop a file, then select the test.csv file we created earlier:

Click the Import button:

We’ll call the imported index master_data_from_csv. Click the Import button:

This completes the creation of our master_data_from_csv index. We can view the imported data by selecting any of the four options at the bottom of the screen.
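
As a quick sanity check, we can also query the new index directly from the Dev Tools console; this should return our six device documents:

GET master_data_from_csv/_search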

Leverage our master data to enrich documents

In this section, we demonstrate how to use the enrich processor to merge master data into documents in the input data stream. I wrote another article covering the enrich processor in detail: “Elasticsearch: Enrich Processor (new in release 7.5)”.

The first step is to create an enrich policy that defines which field we will use to match master data against documents in the input data stream. Here is a sample policy that works for our data:

PUT /_enrich/policy/enrich-devices-policy
{
  "match": {
    "indices": "master_data_from_csv",
    "match_field": "Device ID",
    "enrich_fields": [
      "Device Location",
      "Device Owner",
      "Device Type"
    ]
  }
}

Run the above command to create the policy. We then create the enrich index for the policy using the execute enrich policy API:

PUT /_enrich/policy/enrich-devices-policy/_execute
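
If we want to confirm the policy was stored as expected, we can retrieve it with the get enrich policy API:

GET /_enrich/policy/enrich-devices-policy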

Next, we create an ingest pipeline that uses our enrich policy:

PUT /_ingest/pipeline/device_lookup
{
  "description": "Enrich device information",
  "processors": [
    {
      "enrich": {
        "policy_name": "enrich-devices-policy",
        "field": "device_id",
        "target_field": "my_enriched_data",
        "max_matches": "1"
      }
    }
  ]
}
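
Before ingesting real documents, we can optionally dry-run the pipeline with the simulate pipeline API; the device_id value below is just a sample:

POST /_ingest/pipeline/device_lookup/_simulate
{
  "docs": [
    {
      "_source": {
        "device_id": "device3"
      }
    }
  ]
}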

We insert a document and let it use the ingest pipeline defined above, as follows:

PUT /device_index/_doc/1?pipeline=device_lookup
{
  "device_id": "device1",
  "other_field": "some value"
}

We can use the GET API to view the ingested document as follows:

GET device_index/_doc/1
The response is as follows:
{
  "_index" : "device_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_enriched_data" : {
      "Device Location" : "London",
      "Device Owner" : "Engineering",
      "Device ID" : "device1",
      "Device Type" : "Computer"
    },
    "device_id" : "device1",
    "other_field" : "some value"
  }
}

In the returned document we can see an additional field called my_enriched_data. It contains the Device Location, Device Owner, Device ID, and Device Type. This information comes from the test.csv file we imported earlier: the enrich processor retrieved it from the master_data_from_csv index by matching our document’s device_id value, device1, against the Device ID match field. In other words, our document has been enriched exactly as intended.
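
The enriched fields are also immediately searchable. For example, assuming the default dynamic mappings that Elasticsearch creates for these fields, the following query finds all documents for devices located in London:

GET device_index/_search
{
  "query": {
    "match": {
      "my_enriched_data.Device Location": "London"
    }
  }
}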

 

Specify the pipeline in the index settings

Above, we invoked the enrich processor by specifying the pipeline at ingest time, but in real applications we usually prefer to associate the pipeline with the index rather than specify it in each request URL. We can do this by adding index.default_pipeline to the index settings:

PUT device_index/_settings
{
  "index.default_pipeline": "device_lookup"
}

All documents sent to device_index will now pass through the device_lookup pipeline without needing ?pipeline=device_lookup in the URL. We can verify that it works with the following PUT command:

PUT /device_index/_doc/2
{
  "device_id": "device2",
  "other_field": "some value"
}

Execute the following command to view the document we just ingested:

GET device_index/_doc/2

Then you can see the following document:

{
  "_index" : "device_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_enriched_data" : {
      "Device Location" : "Toronto",
      "Device Owner" : "Consulting",
      "Device ID" : "device2",
      "Device Type" : "Mouse"
    },
    "device_id" : "device2",
    "other_field" : "some value"
  }
}
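
One thing to keep in mind: the enrich index is not updated automatically when the master data changes. If we re-import or modify master_data_from_csv, we need to execute the policy again so that the enrich index reflects the new data:

PUT /_enrich/policy/enrich-devices-policy/_execute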

Conclusion

You usually want to enrich documents at ingest time to ensure that the documents in Elasticsearch contain the information needed to search or display them. In this blog, we demonstrated how the enrich processor running on an ingest node can use CSV data, which is useful for merging master data into documents as they are ingested into Elasticsearch.