In my previous article, “How to handle events using the Pipeline API in Elasticsearch”, I described in detail how to create and use an ingest pipeline. Simply put, a pipeline is a definition of a series of processors that execute in the order they are declared. A pipeline contains two main fields: a description and a list of processors.
It is important to note that pipelines run on ingest nodes, and all ingest pipelines are stored in the cluster state.
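For example, we can check which nodes hold the ingest role and retrieve a stored pipeline definition with a couple of console calls (shown here against the apache-log pipeline defined below):

# List nodes and their roles; ingest nodes show an "i" in node.role
GET _cat/nodes?v&h=name,node.role

# Retrieve a pipeline definition stored in the cluster state
GET _ingest/pipeline/apache-log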
How a pipeline works
Here is an example of defining a pipeline:
PUT _ingest/pipeline/apache-log
{
"description": "This is an example for apache logs",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{COMMONAPACHELOG}"]
}
},
{
"date": {
"field": "timestamp",
"formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
}
},
{
"remove": {
"field": "message"
}
}
]
}
The processors above are executed in turn. We can invoke the pipeline with the following call:
PUT logs/_doc/1?pipeline=apache-log
{
  "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
}
The document is indexed through the pipeline. Searching the index (GET logs/_search) shows the processed document:
{ "took" : 20, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : , "hits" : {0} "total" : {" value ": 1, the" base ":" eq "}, "max_score" : 1.0, "hits" : [{" _index ": "Logs", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : {" request ":"/", "auth" : "-", "the ident" : "-", "verb" : "GET", "@ timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : "24", "clientip" : "" httpversion 83.149.9.216", ":" 1.1 ", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}}}}]Copy the code
To retrieve the document that was processed by the apache-log pipeline, run the following command:
GET logs/_doc/1
The command above will return:
{ "_index" : "logs", "_type" : "_doc", "_id" : "1", "_version" : 2, "_seq_no" : 1, "_primary_term" : 1, "found" : true, "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "@timestamp" : "2015-05-17T10:05:03.000Z", "Response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1" and "timestamp", "17 / May / 2015:10:05:03 + 0000"}}Copy the code
As we can see from the above, we successfully structured and enriched our data with the set of processors in the apache-log pipeline: the grok processor structured the data, the date processor set @timestamp from the timestamp field, and the remove processor dropped the now-redundant message field.
When designing a pipeline, we rarely let it work directly on our real documents right away. More often than not, we want to verify the pipeline's correctness with some test documents; otherwise an incorrect pipeline could corrupt our data. We can use the _simulate API to perform such tests. In our case:
POST _ingest/pipeline/apache-log/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
Here, docs is an array in which we can define a variety of document shapes, so multiple documents can be tested at the same time (a multi-document sketch appears at the end of this section). The result of the above command is:
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "the ident" : "-", "verb" : "GET", "@ timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}, "_ingest" : {"timestamp" : "2020-11-17t11:09:35.351117z"}}}]}Copy the code
We can see the results of the simulation.
The document above was processed by the whole chain of pipeline processors, but we cannot see the output of each individual processor. To inspect each step, we can add the verbose parameter:
POST _ingest/pipeline/apache-log/_simulate?verbose
{
  "docs": [
    {
      "_source": {
        "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
The result of the above is:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 0000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +0000" }, "_ingest" : { "pipeline" : "apache-log", "timestamp" : ": the 2020-11-17 T11". 039149 z "}}}, {" processor_type ":" date ", "status" : "success", "doc" : {" _index ": "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 0000] "GET/HTTP / 1.1" 24 "200", "" @ timestamp" : "2015-05-17T10:05:03.000Z", "Response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +0000"}, "_ingest" : {"pipeline" : "apache ", "timestamp" : "2020-11-17T11:11:43.039149z"}}}, {"processor_type" : "remove", "status" : "success", "doc" : {"_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "@ the timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : "24", "clientip" : "" httpversion 83.149.9.216", ":" 1.1 ", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}, "_ingest" : {" pipeline ": "Apache-log ", "timestamp" :" 2020-11-17t11:11:42.039149z "}}}]}]}Copy the code
The result of each processor is recorded in detail above. This makes it very easy to break down what each processor did and to troubleshoot errors.
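As noted earlier, docs is an array, so several documents can be tested in a single call; the results come back in the same order. A quick sketch (the second log line is an invented example for illustration):

POST _ingest/pipeline/apache-log/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
      }
    },
    {
      "_source": {
        "message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326"
      }
    }
  ]
}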
How to handle pipeline errors
When a pipeline processes documents, not every document is well formed, and some will fail to be parsed or processed correctly.
By default, when parsing fails, an error is returned to the client indicating that the document was not processed. Alternatively, we can handle such errors ourselves with on_failure.
When an error occurs, on_failure lets us run another set of processors to handle it. Typically, we use the set processor to do some related processing on the failed document, for example recording the error information and redirecting the document to another index. We can then examine that index and resolve the problem based on the error message.
When on_failure is triggered, the remaining processors in the pipeline are not executed, and the client no longer receives a failure response. Inside on_failure we can define a group of processors, such as remove and set, to process the failed document; we can even attach an additional on_failure to this error-handling group itself. With the set processor we could, for instance, substitute a default date when the original date is invalid, or use the current date and re-queue the document for processing.
How exactly to handle failures depends entirely on your business requirements.
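As a minimal sketch of the idea described above (the pipeline name my-pipeline and the failed index name are placeholders), a pipeline-level on_failure could look like this:

PUT _ingest/pipeline/my-pipeline
{
  "description": "sketch: redirect documents whose date cannot be parsed",
  "processors": [
    {
      "date": {
        "field": "timestamp",
        "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed"
      }
    },
    {
      "set": {
        "field": "error_message",
        "value": "{{_ingest.on_failure_message}}"
      }
    }
  ]
}

Documents that fail the date processor then land in the failed index with the error message attached, while well-formed documents are indexed normally.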
Below, I will use an example to demonstrate:
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Above, we direct the document to my_index, although we are only simulating. The result is:
{ "docs" : [ { "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "the ident" : "-", "verb" : "GET", "@ timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}, "_ingest" : {" timestamp ":" the 2020-11-17 T11:55:43. 679709 z "}}}}]Copy the code
We can see above that bytes is still a string after grok processing. We can convert this field to an integer using the convert processor:
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
So here’s the result:
{ "docs" : [ { "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "the ident" : "-", "verb" : "GET", "@ timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : 24, "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}, "_ingest" : {" timestamp ":" the 2020-11-17 T12:01:38. 662559 z "}}}}]Copy the code
From the above, we can see that bytes is now an integer. We could also add the verbose parameter to the call to see how each processor behaves. For debugging purposes, we can even add a tag to each processor so that in the verbose output we can easily tell which processor is which:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Above, we added a tag called convert_response to the convert processor that handles the response field. This makes it easy to find in the output; otherwise, with two convert processors, we could not easily tell them apart, even though they execute in order. The verbose output is:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 0000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +0000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : "The 2020-11-17 T12:07:36. 432606 z"}}}, {" processor_type ":" date ", "status" : "success", "doc" : {" _index ": "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 0000] "GET/HTTP / 1.1" 24 "200", "" @ timestamp" : "2015-05-17T10:05:03.000Z", "Response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +0000"}, "_ingest" : {"pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:07:36.432606z"}}}, {"processor_type" : "remove", "status" : "success", "doc" : {"_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "@ the timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : "24", "clientip" : "" httpversion 83.149.9.216", ":" 1.1 ", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}, "_ingest" : {" pipeline ": "_simulate_pipeline", "TIMESTAMP" : "2020-11-17T12:07:36.432606z"}}}, {"processor_type" : "convert", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "the ident" : "-", "verb" : "GET", "@ timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : 24, "clientip" : "83.149.9.216", "httpversion" : "1.1", "TIMESTAMP" : "17/May/2015:10:05:03 +0000"}, "_ingest" : {"pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:07:36.432606z"}}}, {"processor_type" : "convert", "status" : "success", "tag" : "convert_reponse", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "@timestamp" : "2015-05-17T10:05:03.000z "," Response ": 200, "bytes" : 24, "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +0000"}, "_ingest" : {"pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:07:36.432606z"}}}]}Copy the code
We can see the tag for convert_response in the output above.
Let’s now simulate an incorrect document so that a processor fails to parse it. We remove the “5” from 2015 in the document:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
This obviously results in a document that will not be properly parsed. The error returned is as follows:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "error", "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - [17 / May / 201:10:05:03 + 0000] "GET/HTTP / 1.1 200 24]" "" "}]," type ": "illegal_argument_exception", "reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - [17 / May / 201:10:05:03 + 0000] "GET/HTTP / 1.1 200 24]" "" "}}}}]]Copy the code
With an error like this, the problem is easy to spot: the grok pattern does not match. Next, let's modify the document as follows:
"Message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 200] "GET/HTTP / 1.1 200 24" "" "Copy the code
Above, we changed the +0000 in the timestamp to +000, that is, one 0 is missing. Let's run:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
The command above returns:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:20:43.098763z"}}}, {"processor_type" : "date", "status" : "error", [ { "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]" } ], "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]", "caused_by" : { "type" : "illegal_argument_exception", "reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]", "caused_by" : { "type" : "date_time_parse_exception", "reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21" } } } } ] } ] }Copy the code
This is clearly different from before: the grok pattern parsed our document correctly, but the date processor failed to parse the time.
There are two levels at which we can deal with this problem:
- At the pipeline level
- At the processor level
Handling at the pipeline level
We add an on_failure to the end of the pipeline:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "_index",
          "value": "failed"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Above, I added the following code:
"on_failure": [
{
"set": {
"field": "_index",
"value": "failed"
}
}
]
Here, we redirect failed documents to another index called failed. Executing the pipeline above gives:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : {"processor_type" : "date", "status" : "error", "error" : {"root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]" } ], "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]", "caused_by" : { "type" : "illegal_argument_exception", "reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]", "caused_by" : { "type" : "date_time_parse_exception", "reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21" } } } }, { "processor_type" : "set", "status" : "success", "doc" : { "_index" : "failed", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 000"}, "_ingest" : { "pipeline" : "_simulate_pipeline", "on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21", "on_failure_processor_tag" : null, "timestamp" : "2020-11-17T12:25:50.517958z ", "on_failure_processor_type" : "date"}}}]}Copy the code
Clearly the first step succeeded and the second step failed, after which the set processor inside on_failure was executed, changing the index to failed. You can later view the failed documents directly in that index. Note the _ingest metadata in the result:
"_ingest" : {
"pipeline" : "_simulate_pipeline",
"on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
"on_failure_processor_tag" : null,
"timestamp" : "2020-11-17T12:25:50.517958Z",
"on_failure_processor_type" : "date"
}
The _ingest metadata above carries the error message from the failed processor. We can record this error message in the document itself:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "_index",
          "value": "failed"
        }
      },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Above, we set a failure field whose value is an object. Running the pipeline:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : {"processor_type" : "date", "status" : "error", "error" : {"root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]" } ], "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]", "caused_by" : { "type" : "illegal_argument_exception", "reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]", "caused_by" : { "type" : "date_time_parse_exception", "reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21" } } } }, { "processor_type" : "set", "status" : "success", "doc" : { "_index" : "failed", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 000"}, "_ingest" : { "pipeline" : "_simulate_pipeline", "on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21", "on_failure_processor_tag" : null, "timestamp" : "2020-11-17T12:39:09.206999z "," on_failure_PROCESSor_type ": "date"}}}, {"processor_type" : "set", "status" : "success", "tag" : "mark_failure", "doc" : { "_index" : "failed", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "failure" : { "message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21" }, "clientip" : "" httpversion 83.149.9.216", ":" 1.1 ", "timestamp" : "17 / May / 2015:10:05:03 + 000"}, "_ingest" : {" pipeline ": "_simulate_pipeline", "on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21", "on_failure_processor_tag" : null, "timestamp" : "On_failure_processor_type" : "date"}}}]}"Copy the code
There is now a new field called failure in _source containing the corresponding error message. Since failure is an object, we can add multiple fields to it, for example:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "_index",
          "value": "failed"
        }
      },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}",
            "processor": "{{_ingest.on_failure_processor_type}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
We added a processor field so that it is easier to know which processor failed:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : {"processor_type" : "date", "status" : "error", "error" : {"root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]" } ], "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]", "caused_by" : { "type" : "illegal_argument_exception", "reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]", "caused_by" : { "type" : "date_time_parse_exception", "reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21" } } } }, { "processor_type" : "set", "status" : "success", "doc" : { "_index" : "failed", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 000"}, "_ingest" : { "pipeline" : "_simulate_pipeline", "on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21", "on_failure_processor_tag" : null, "timestamp" : "2020-11-17T12:42:27.811805Z", "ON_failure_PROCESSor_type" : "date"}}}, {"processor_type" : "set", "status" : "success", "tag" : "mark_failure", "doc" : { "_index" : "failed", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "failure" : { "message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21", "processor" : "Date"}, "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 000"}, "_ingest" : { "pipeline" : "_simulate_pipeline", "on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21", "on_failure_processor_tag" : null, "timestamp" : "2020-11-17T12:42:27.811805z ", "on_failure_processor_type" : "date"}}}]}Copy the code
The above processing is at the pipeline level.
Handling at the processor level
We can also catch and handle errors directly on each processor. For example, for the date processor:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ],
          "on_failure": [
            {
              "set": {
                "tag": "set_default_date",
                "field": "@timestamp",
                "value": "{{_ingest.timestamp}}"
              }
            }
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "_index",
          "value": "failed"
        }
      },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}",
            "processor": "{{_ingest.on_failure_processor_type}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Above, we added the following on_failure code to the date processor:
{
"date": {
"field": "timestamp",
"formats": [
"dd/MMM/yyyy:HH:mm:ss Z"
],
"on_failure": [
{
"set": {
"tag": "set_default_date",
"field": "@timestamp",
"value": "{{_ingest.timestamp}}"
}
}
]
}
}
When an error occurs, we simply use _ingest.timestamp as the value of @timestamp. Run the pipeline above:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : {"processor_type" : "date", "status" : "error", "error" : {"root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]" } ], "type" : "illegal_argument_exception", "reason" : "unable to parse date [17/May/2015:10:05:03 +000]", "caused_by" : { "type" : "illegal_argument_exception", "reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]", "caused_by" : { "type" : "date_time_parse_exception", "reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21" } } } }, { "processor_type" : "set", "status" : "success", "tag" : "set_default_date", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 000] "GET/HTTP / 1.1" 24 "200", "" @ timestamp" : "2020-11-17T12:49:49.720153Z", "Response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1" and "timestamp", "17 / May / 2015:10:05:03 + 000"}, "_ingest" : {" pipeline ": "_simulate_pipeline", "on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21", "on_failure_processor_tag" : null, "timestamp" : "2020-11-17T12:49:49.720153z "," on_failure_PROCESSor_type ": "date"}}}, {"processor_type" : "remove", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "the ident" : "-", "verb" : "GET", "@ timestamp" : "the 2020-11-17 T12:49:49. 720153 z" and "response" : "200", "bytes" : "24", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 000"}, "_ingest" : {"pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:49:49.720153z"}}}, {"processor_type" : "convert", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "@timestamp" : "2020-11-17T12:49:49.720153Z", "Response" : "200", "bytes" : 24, "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +000"}, "_ingest" : {" "2020-11-17T12:49:49.720153z"}}}, {"processor_type" : "convert", "status" : "success", "tag" : "convert_reponse", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "the ident" : "-", "verb" : "GET", "@ timestamp" : "the 2020-11-17 T12:49:49. 720153 z" and "response" : 200, "bytes" : 24, "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:49:49.720153z"}}}]}]}Copy the code
In this run, set_default_date was invoked when the error occurred, and @timestamp was set to "2020-11-17T12:49:49.720153Z", which is the time at which the ingest pipeline executed. That is a far cry from the time in the original document; which value to use depends entirely on your business design.
Next, suppose we have corrected the time back to +0000, but we change the bytes value to a character that cannot be converted to a number, such as “-”:
"Message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 0000] "GET/HTTP / 1.1 200 -" "" "Copy the code
Re-running the pipeline, we find the following error message:
"error" : {
"root_cause" : [
{
"type" : "illegal_argument_exception",
"reason" : "field [bytes] not present as part of path [bytes]"
}
],
"type" : "illegal_argument_exception",
"reason" : "field [bytes] not present as part of path [bytes]"
}
The error occurs because when bytes is “-”, grok does not produce a bytes field at all, so the convert processor finds nothing to convert. Similarly, we can define a custom on_failure for this processor:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ],
          "on_failure": [
            {
              "set": {
                "tag": "set_default_date",
                "field": "@timestamp",
                "value": "{{_ingest.timestamp}}"
              }
            }
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer",
          "on_failure": [
            {
              "set": {
                "field": "bytes",
                "value": -1
              }
            }
          ]
        }
      },
      {
        "convert": {
          "tag": "convert_response",
          "field": "response",
          "type": "integer"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "_index",
          "value": "failed"
        }
      },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}",
            "processor": "{{_ingest.on_failure_processor_type}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -"""
      },
      "_index": "my_index"
    }
  ]
}
We added the desired code:
{
"convert": {
"field": "bytes",
"type": "integer",
"on_failure":[
{
"set": {
"field": "bytes",
"value": -1
}
}
]
}
}
That is, when an error occurs, we simply set bytes to -1:
{ "docs" : [ { "processor_results" : [ { "processor_type" : "grok", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "response" : "200", "clientip" : "83.149.9.216", "verb" : "GET", "httpversion" : "1.1", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 0000] "GET/HTTP / 1.1" 200 - "" "," timestamp ": "17/May/2015:10:05:03 +0000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : "The 2020-11-17 T12: they. 385189 z"}}}, {" processor_type ":" date ", "status" : "success", "doc" : {" _index ": "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : "" "83.149.9.216 - [17 / May / 2015:10:05:03 + 0000] "GET/HTTP / 1.1" 200 - "" "," @ timestamp ": "2015-05-17T10:05:03.000z ", "response" : "200", "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +0000" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:59:19.385189z"}}}, {"processor_type" : "remove", "status" : "success", "doc" : {"_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "@timestamp" : "The 2015-05-17 T10:05:03. 000 z", "auth" : "-", "the ident" : "-", "response" : "200", "clientip" : "83.149.9.216", "verb" : "GET", "httpversion" : "1.1", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}, "_ingest" : {" pipeline ": "_simulate_pipeline", "TIMESTAMP" : "2020-11-17T12:59:19.385189z"}}}, {"processor_type" : "convert", "status" : "error", "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "field [bytes] not present as part of path [bytes]" } ], "type" : "illegal_argument_exception", "reason" : "field [bytes] not present as part of path [bytes]" } }, { "processor_type" : "set", "status" : "success", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "@ timestamp" : "the 2015-05-17 T10:05:03. 000 z" and "response" : "200", "bytes" : 1, "clientip" : "" httpversion 83.149.9.216", ":" 1.1 ", "timestamp" : "17 / May / 2015:10:05:03 + 0000"}, "_ingest" : {" pipeline ": "_simulate_pipeline", "on_failure_message" : "field [bytes] not present as part of path [bytes]", "on_failure_processor_tag" : null, "timestamp" : "2020-11-17T12:59:19.385189z ", "on_failure_processor_type" : "convert"}}}, {"processor_type" : "convert", "status" : "success", "tag" : "convert_reponse", "doc" : { "_index" : "my_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/", "auth" : "-", "ident" : "-", "verb" : "GET", "@timestamp" : "2015-05-17T10:05:03.000z "," Response ": 200, "bytes" : -1, "clientip" : "83.149.9.216", "httpversion" : "1.1", "timestamp" : "17/May/2015:10:05:03 +0000"}, "_ingest" : {"pipeline" : "_simulate_pipeline", "timestamp" : "2020-11-17T12:59:19.385189z"}}}]}Copy the code
We can see from the output above that bytes has been set to -1.
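Once the pipeline behaves as expected under _simulate, it can be stored and attached to an index so that every document goes through it automatically. A sketch, assuming the names apache-log-robust and logs-robust are free to use (index.default_pipeline applies the pipeline whenever no explicit pipeline parameter is given):

PUT _ingest/pipeline/apache-log-robust
{
  "description": "apache logs with per-processor and pipeline-level error handling",
  "processors": [
    { "grok": { "field": "message", "patterns": [ "%{COMMONAPACHELOG}" ] } },
    { "date": { "field": "timestamp", "formats": [ "dd/MMM/yyyy:HH:mm:ss Z" ],
        "on_failure": [ { "set": { "field": "@timestamp", "value": "{{_ingest.timestamp}}" } } ] } },
    { "remove": { "field": "message" } },
    { "convert": { "field": "bytes", "type": "integer",
        "on_failure": [ { "set": { "field": "bytes", "value": -1 } } ] } },
    { "convert": { "field": "response", "type": "integer" } }
  ],
  "on_failure": [
    { "set": { "field": "_index", "value": "failed" } }
  ]
}

# Route all documents indexed into logs-robust through the pipeline by default
PUT logs-robust
{
  "settings": {
    "index.default_pipeline": "apache-log-robust"
  }
}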
Well, that's all for today. I hope you now know how to handle pipeline errors and can apply the appropriate processing in your own pipelines.