The Beats framework guarantees at-least-once delivery, so no data is lost when events are sent to outputs that support acknowledgement, such as Elasticsearch, Logstash, Kafka, and Redis. That is fine when everything goes according to plan, but if Filebeat shuts down while it is processing events, or the connection drops before an event is acknowledged, you can end up with duplicate data. So how do we avoid indexing the same data twice?
What causes duplicate items in Elasticsearch?
When the output is blocked, Filebeat's retry mechanism keeps resending events until the output acknowledges them. If the output receives the events but cannot acknowledge them, the same data may be sent to the output more than once. Because the document ID is normally assigned by Elasticsearch after it receives the data from Beats, each duplicate event is indexed as a new document.
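To make this concrete: roughly speaking, Filebeat writes events with the bulk API, and the index action it sends carries no _id, so Elasticsearch generates one for each request. You can reproduce the effect by hand in Kibana Dev Tools with a minimal sketch (the demo_dups index name and the document are purely illustrative):

POST _bulk
{ "index" : { "_index" : "demo_dups" } }
{ "user_name" : "arthur", "event" : "logged_in" }
{ "index" : { "_index" : "demo_dups" } }
{ "user_name" : "arthur", "event" : "logged_in" }

Running this indexes two documents with identical content but different auto-generated _id values, which is exactly what happens when Filebeat resends a batch whose acknowledgement was lost.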
The sample
Let's import the following sample JSON file, as in the previous article "Beats: Using Filebeat to import JSON log files":
sample.json
{ "id": "1", "user_name": "arthur","verified": false, "event": "logged_in"}
{ "id": "2", "user_name": "arthur", "verified": true, "event": "changed_state"}
In the document above, we assume that the "id" field is unique, meaning that no two documents share the same "id" value. There are only two documents here, but that is enough to illustrate the point. Import them into Elasticsearch as shown in the article "Beats: Using Filebeat to import JSON log files":
./filebeat -e -c filebeat.yml
After the documents are imported, you can run the following command to view them:
GET logs_json/_search
Or check how many documents were indexed with the following command:
GET logs_json/_count
The command above shows:
{
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
There are two documents in the logs_json index.
Under normal circumstances, Filebeat guarantees at-least-once delivery. However, if Filebeat stops before it receives the acknowledgement from Elasticsearch, it will resend the already delivered documents the next time it starts; likewise, if an acknowledgement is lost in transit for some reason, Filebeat resends the documents after the configured timeout. Elasticsearch assigns a new, unique _id to each resent document, so duplicates appear. We can verify this with the following experiment. First, let's look at Filebeat's registry directory.
The registry stores the state and location information that Filebeat uses to track the last position read in each file. Its location depends on how Filebeat was installed:
data/registry (for .tar.gz and .tgz archive installations)
/var/lib/filebeat/registry (for DEB and RPM installation packages)
C:\ProgramData\filebeat\registry (for Windows ZIP installations)
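If you are curious what the registry actually records before we delete it, you can look inside it. A minimal sketch for a 7.x .tar.gz installation; the exact file name and layout vary between Filebeat versions, so treat the path as an assumption:

# Peek at what Filebeat has recorded; each entry holds a harvested
# file's path and the last read offset.
cat data/registry/filebeat/log.json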
For my local installation:
$ pwd
/Users/liuxg/elastic1/filebeat-7.11.0-darwin-x86_64
$ ls
LICENSE.txt             filebeat                module
NOTICE.txt              filebeat.reference.yml  modules.d
README.md               filebeat.yml            sample.json
data                    filebeat1.yml           fields.yml
kibana
$ rm -rf data/registry/
The registry holds the read state of every file that has ever been harvested. By deleting the folder above we wipe that state, so files that were already imported will be imported again; this simulates the loss of the acknowledgement. Restart Filebeat to re-import the file we just imported:
./filebeat -e -c filebeat.yml
Then we get the number of imported documents again in Kibana:
GET logs_json/_count
{
  "count" : 4,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
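Before moving on, you can confirm that these really are duplicates of the same source records rather than new data. The following query (filter_path simply trims the response) returns only the Elasticsearch _id and the original id field; each source id should now appear under two different auto-generated _id values:

GET logs_json/_search?filter_path=hits.hits._id,hits.hits._source.id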
From the count above, we can see that the number of documents has changed from 2 to 4, which means that the previously imported documents have been indexed again. So how do we solve this problem? What we want is that even if the acknowledgement is lost and a document is re-sent, no duplicate document shows up in the index.
In the next experiment, we delete the imported index logs_json in Kibana:
DELETE logs_json
How to avoid duplication?
Instead of letting Elasticsearch assign the document ID, set the ID in Beats. The ID is stored in the Beats @metadata._id field and is used to set the document ID during indexing. That way, if Beats sends the same event to Elasticsearch more than once, Elasticsearch overwrites the existing document instead of creating a new one. There are several ways to set the document ID in Beats, and we walk through each of them below. Because the @metadata._id field travels with the event, it can also be used to set the document ID after Filebeat publishes the event but before it reaches Elasticsearch, for example in a Logstash pipeline.
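A minimal sketch of such a pipeline, along the lines of the example in the Beats documentation (the port, hosts, and index pattern are illustrative and may need adjusting):

input {
  beats {
    port => 5044
  }
}

output {
  if [@metadata][_id] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      # Use the ID that Beats stored in @metadata._id as the document ID
      document_id => "%{[@metadata][_id]}"
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
    }
  } else {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
    }
  }
}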
add_id processor
If your data has no natural key field and you cannot derive a unique key from the existing fields, use the add_id processor. This example generates a unique ID for each event and adds it to the @metadata._id field:
filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /Users/liuxg/data/processors/sample.json

processors:
  - decode_json_fields:
      fields: ['message']
      target: ''
      overwrite_keys: true
  - drop_fields:
      fields: ["message", "ecs", "agent", "log"]
  - add_id: ~

setup.template.enabled: false
setup.ilm.enabled: false

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs_json"
  bulk_max_size: 1000
We delete the Registry directory and then execute the following command:
./filebeat -e -c filebeat.yml
Above, we used the add_id processor to set a unique ID for each document, so the final _id in Elasticsearch is generated by Filebeat rather than by Elasticsearch. Note, however, that add_id generates a new random ID every time an event is read, so if we delete the registry and re-import the file we will still end up with duplicate documents; this processor only changes who assigns the _id, it does not make the _id stable across re-imports.
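If you are curious what those IDs look like, you can again trim the search response down to just the _id values (they will differ on every run, since add_id generates a new ID per event):

GET logs_json/_search?filter_path=hits.hits._id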
For the following exercise, we remove the logs_json index in Kibana, along with the Registry directory.
Fingerprint processor
This is the same method that I introduced earlier for Logstash; read more in the article "Logstash: Using the fingerprint filter to handle duplicate documents". Use the fingerprint processor to derive a unique key from one or more existing fields. This example uses the values of id and user_name to derive the key and adds it to the @metadata._id field:
filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /Users/liuxg/data/processors/sample.json

processors:
  - decode_json_fields:
      fields: ['message']
      target: ''
      overwrite_keys: true
  - drop_fields:
      fields: ["message", "ecs", "agent", "log"]
  - fingerprint:
      fields: ["id", "user_name"]
      target_field: "@metadata._id"

setup.template.enabled: false
setup.ilm.enabled: false

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs_json"
  bulk_max_size: 1000
Rerun Filebeat to import the data into Elasticsearch:
./filebeat -e -c filebeat.yml
We query the number of documents with the following command:
GET logs_json/_count
{
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
Next, we delete the Registry directory again, but leave the logs_json index in Elasticsearch. Let's run Filebeat again to import the sample.json file.
./filebeat -e -c filebeat.yml
After running, we re-check the number of documents in Kibana:
GET logs_json/_count
{
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
We can see that the document count is still 2 this time. Even though the documents were re-imported, the fingerprint processor derives the same @metadata._id from the same field values, so when an identical document reaches Elasticsearch again it updates the existing document instead of creating a new one; no new documents are generated.
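If you want to convince yourself that the re-import really performed an update rather than an insert, you can ask Elasticsearch to return the document version in the search results; after the second import the _id values are unchanged, but the _version of each document should have increased (a quick check, assuming the index has not been touched otherwise):

GET logs_json/_search?version=true&filter_path=hits.hits._id,hits.hits._version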
decode_json_fields processor
When decoding JSON strings that contain a natural key field, use the document_id setting of the decode_json_fields processor.
For our sample.json file, we assume that the "id" field is unique. This example takes the value of id from the JSON string and stores it in the @metadata._id field:
filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - sample.json

processors:
  - decode_json_fields:
      fields: ['message']
      target: ''
      overwrite_keys: true
  - drop_fields:
      fields: ["message", "ecs", "agent", "log"]
  - decode_json_fields:
      document_id: "id"
      fields: ["message"]
      max_depth: 1
      target: ""

setup.template.enabled: false
setup.ilm.enabled: false

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs_json"
  bulk_max_size: 1000
Again, before we experiment, delete the Registry directory, delete the logs_json index, and start Filebeat again to import the data. We get the number of indexed documents with the following command:
GET logs_json/_count
{
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
We delete the Registry directory again and re-import the data, this time without dropping the logs_json index. Then we check whether the number of documents has increased:
{
  "count" : 4,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
For some reason, this method did not work in my test: the document _id was not set and the count doubled. One possible explanation is that the message field has already been removed by the drop_fields processor before the second decode_json_fields processor runs, so there is nothing left from which to extract the id.
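If you want to experiment further, one variation to try (untested here, offered only as a sketch) is to let the decode_json_fields processor that sets document_id run first, before drop_fields removes the message field:

processors:
  # Decode the JSON and extract the id before message is dropped (untested sketch)
  - decode_json_fields:
      document_id: "id"
      fields: ["message"]
      max_depth: 1
      target: ""
      overwrite_keys: true
  - drop_fields:
      fields: ["message", "ecs", "agent", "log"]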
JSON input settings
If you are ingesting JSON-formatted data that has a natural key field, use the json.document_id input setting.
This example takes the value of id from a JSON document and stores it in the @metadata._id field:
sample.json
{"id": "1", "user_name": "arthur", "verified": false, "evt": "logged_in"}
{"id": "2", "user_name": "arthur", "verified": true, "evt": "changed_state"}
filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  tags: ["i", "love", "json"]
  json.message_key: evt
  json.keys_under_root: true
  json.add_error_key: true
  json.document_id: "id"
  fields:
    planet: liuxg
  paths:
    - sample.json

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "json_logs1"

setup.ilm.enabled: false
setup.template.name: json_logs1
setup.template.pattern: json_logs1
Run Filebeat to import the data, and then check the document count of the json_logs1 index with GET json_logs1/_count:
{
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
Now delete the Registry directory and restart Filebeat to import the data again. The document count is unchanged:
{
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
We can also look at the documents themselves:
GET json_logs1/_search
{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : {" total ": {" value" : 2, the "base" : "eq"}, "max_score" : 1.0, "hits" : [{" _index ":" json_logs1 ", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : {" @ timestamp ":" the 2021-02-24 T02:24:56. 084 z ", "evt" : "logged_in", "input" : { "type" : "log" }, "fields" : { "planet" : "liuxg" }, "agent" : { "id" : "E2b7365d-8953-453c-87b5-7e8a65a5bc07 ", "name" : "liuxg", "type" :" fileBeat ", "version" : "7.11.0", "hostname" : Liuxg ", "ephemeral_id" : "7b309AC8-48D1-46D4-839F-70948Fddd428"}, "ECS" : {"version" : "1.6.0"}, "host" : { "name" : "liuxg" }, "log" : { "offset" : 0, "file" : { "path" : "/ Users/liuxg/elastic1 / filebeat - 7.11.0 - Darwin - x86_64 / sample. The json"}}, "user_name" : "Arthur," "verified" : false, "tags" : [ "i", "love", "json" ] } }, { "_index" : "json_logs1", "_type" : "_doc", "_id" : "2", "_score" : 1.0, the "_source" : {" @ timestamp ":" the 2021-02-24 T02:24:56. 084 z ", "fields" : {" planet ":" liuxg} ", "input" : {" type ": "log" }, "host" : { "name" : "liuxg" }, "agent" : { "ephemeral_id" : "7b309ac8-48d1-46d4-839f-70948fddd428", "id" : "E2b7365d-8953-453c-87b5-7e8a65a5bc07 ", "name" : "liuxg", "type" :" fileBeat ", "version" : "7.11.0", "hostname" : "liuxg" }, "evt" : "changed_state", "user_name" : "arthur", "verified" : true, "log" : { "file" : { "path" : "/ Users/liuxg/elastic1 / filebeat - 7.11.0 - Darwin - x86_64 / sample. The json"}, "offset" : 74}, "tags" : [" I ", "love", "json"], "ecs" : {" version ":" 1.6.0 "}}}}}]Copy the code
The above shows that this method works: there are still only two documents, and their _id values ("1" and "2") are taken from the id field in the data. You can use this method to set the Elasticsearch document ID.
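Since the document _id is now the natural key from the data itself, you can also fetch a single document directly by that key; a quick check against the index name used in the configuration above:

GET json_logs1/_doc/1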