WeChat public account: Operation and Maintenance Development Story. Author: Teacher Xia
What is a Rule
Prometheus supports user-defined rules, which fall into two categories: recording rules and alerting rules. The main purpose of a recording rule is to precompute queries: Prometheus samples are queried, aggregated, and otherwise transformed with PromQL in the background. When a PromQL expression is complex and computationally heavy, running it directly may cause Prometheus to time out, so a batch-like mechanism is needed to perform the heavy calculation in the background and let users simply query the precomputed result. Prometheus provides this background computation through recording rules, which optimizes complex queries and improves query efficiency. This article focuses on alerting. Alerting rules allow you to define alert trigger conditions as PromQL expressions; the Prometheus server evaluates these rules periodically and fires alert notifications when the conditions are met.
What is an alerting rule
Alerting is an important Prometheus feature. The following analyzes the alert execution process from the source code perspective.
How to define an alerting rule
A typical alerting rule looks like this:
groups:
- name: example
  rules:
  - alert: HighErrorRate
    # The alert fires only if the metric stays above 0.5 for 10 minutes.
    expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
    for: 10m
    labels:
      severity: page
    annotations:
      summary: High request latency
      description: description info
In the alerting rules file, related rules are grouped together under a group, and each group can contain multiple alerting rules. An alerting rule consists of the following parts:
- alert: the name of the alerting rule.
- expr: a PromQL expression used to determine whether a time series meets the alert trigger condition.
- for: the evaluation wait time (optional). An alert is fired only after the trigger condition has held for this duration; alerts generated during the wait period are in the Pending state.
- labels: user-defined labels, allowing a set of additional labels to be attached to the alert.
- annotations: a set of additional information, such as text describing the alert details, which is sent to the Alertmanager as a parameter when the alert fires.
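For readers who want to check such rule files programmatically, below is a minimal sketch of how a rule file like the one above can be parsed and validated with Prometheus's rulefmt package; this is roughly what promtool check rules does. The import path and the exact ParseFile signature vary between Prometheus versions, so treat this as an illustration rather than a fixed API:
package main

import (
	"fmt"
	"os"

	"github.com/prometheus/prometheus/model/rulefmt" // "pkg/rulefmt" in older versions
)

func main() {
	// Parse and validate a rule file, similar to `promtool check rules`.
	groups, errs := rulefmt.ParseFile("example.rules.yml")
	if len(errs) > 0 {
		for _, err := range errs {
			fmt.Fprintln(os.Stderr, "invalid rule file:", err)
		}
		os.Exit(1)
	}
	for _, g := range groups.Groups {
		fmt.Printf("group %q contains %d rules\n", g.Name, len(g.Rules))
	}
}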
Rule manager
Based on the configured rules, the rule manager evaluates each rule's PromQL expression to determine whether any time series meets the trigger condition. When the condition is met, the alert information is sent to the notification service.
type Manager struct {
	opts     *ManagerOptions   // External dependencies
	groups   map[string]*Group // The currently loaded rule groups
	mtx      sync.RWMutex      // Read-write lock protecting the manager's state
	block    chan struct{}
	done     chan struct{}
	restored bool

	logger log.Logger
}
- opts (*ManagerOptions): records the other modules used by the Manager instance, such as the storage module and the notify module.
- groups (map[string]*Group): records all rules.Group instances; the key is composed of the rules.Group name and its configuration file.
- mtx (sync.RWMutex): this lock must be held when reading or writing the groups field.
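To make the relationship between the Manager, its ManagerOptions and the rule groups concrete, here is a simplified sketch of how the rule manager is typically created and driven, loosely following cmd/prometheus/main.go. The import paths and some option fields differ between Prometheus versions, and the function below together with its parameters is illustrative glue code, not a Prometheus API:
import (
	"context"
	"net/url"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// startRuleManager is illustrative glue code: it wires a storage, a PromQL engine
// and a notify hook into a rules.Manager, starts it, and loads the rule files.
func startRuleManager(
	st storage.Storage, // satisfies both Appendable and Queryable
	engine *promql.Engine,
	notify rules.NotifyFunc, // e.g. a function that forwards alerts to the notifier
	externalURL *url.URL,
	logger log.Logger,
	evalInterval time.Duration,
	ruleFiles []string,
	externalLabels labels.Labels,
) (*rules.Manager, error) {
	m := rules.NewManager(&rules.ManagerOptions{
		Appendable:  st,                                // recording-rule samples are written here
		Queryable:   st,                                // used to restore alert "for" state
		QueryFunc:   rules.EngineQueryFunc(engine, st), // executes the PromQL of each rule
		NotifyFunc:  notify,                            // receives firing/resolved alerts
		Context:     context.Background(),
		ExternalURL: externalURL,
		Logger:      logger,
	})

	go m.Run() // unblocks m.block so the started groups may begin evaluating

	// Load (or reload) the rule files; a SIGHUP-triggered config reload ends up here too.
	if err := m.Update(evalInterval, ruleFiles, externalLabels, externalURL.String()); err != nil {
		return nil, err
	}
	return m, nil
}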
Reading the rule group configuration
During Prometheus server startup, the rule configuration files are first loaded and parsed by calling the Manager.Update() method, which does the following:
- Call the Manager.LoadGroups() method to load and parse the rule configuration files, producing a collection of rules.Group instances.
- Stop the old rules.Group instances and start the new ones. A goroutine is launched for each rules.Group instance, which periodically runs all the PromQL queries under that group.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
// Loads rules from the current file
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
if errs != nil {
for _, e := range errs {
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true
var wg sync.WaitGroup
// Loop through the rule group
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
// Get the rule Group name based on the new rules.group information
gn := GroupKey(newg.file, newg.name)
// Get the old rule Group by rule Group name and delete the old rules.group instance
oldg, ok := m.groups[gn]
delete(m.groups, gn)
if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}
wg.Add(1)
// Start a Goroutine for each rules.group instance
go func(newg *Group) {
if ok {
oldg.stop()
// Copy the state information from the old rule group to the new rule group
newg.CopyState(oldg)
}
wg.Done()
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
// Call the rules.group.run () method to periodically execute PromQl statements
newg.run(m.opts.Context)
}(newg)
}
// Stop remaining old groups.
// Stop all old rule group services
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues(n)
}
wg.Done()
}(n, oldg)
}
wg.Wait()
// Update the rule group in the rule Manager
m.groups = groups
return nil
}
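A note on the GroupKey helper used above: the key that identifies a group in m.groups is essentially just the rule file name and the group name joined together, roughly as sketched below. This is why the same group name can safely appear in different rule files:
// GroupKey builds the map key for a rule group; group names only need to be
// unique within a single file, so the file name is part of the key.
func GroupKey(file, name string) string {
	return file + ";" + name
}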
Running the rule group scheduling method
Rule group startup (Group.run): after entering the Group.run method, an initial wait is performed so that rule evaluations are aligned to consistent time slots with period g.interval. A scheduling function iter is then defined and invoked every g.interval; inside iter, the g.Eval method is called to perform the next level of rule evaluation. The evaluation period g.interval is specified by the global configuration option in prometheus.yml ([ evaluation_interval: <duration> | default = 1m ]). The implementation is as follows:
func (g *Group) run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp)): // Initial wait
case <-g.done:
return
}
ctx = promql.NewOriginContext(ctx, map[string]interface{}{"ruleGroup": map[string]string{
"file": g.File(),
"name": g.Name(),
},
})
// Define rule group rule operation scheduling algorithm
iter := func() {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
start := time.Now()
// Entry point for rule evaluation
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence.
tick := time.NewTicker(g.interval) // Set the rule operation timer
defer tick.Stop()
defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(ctx, now)
}
}(time.Now())
}()
// Call the scheduling method of rule group rule operations
iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}
g.RestoreForState(time.Now())
g.shouldRestore = false
}
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
// Call the scheduling method of rule group rule operations
iter()
}
}
}
}
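The initial wait above lines every group up on a consistent evaluation slot derived from the group's hash, so groups are spread across the interval but each group keeps a fixed schedule. Below is a simplified sketch of that slotting idea, not the exact Group.EvalTimestamp implementation; groupHash stands for the group's hash value:
// slottedEvalTime sketches the slotting idea: derive a stable per-group offset
// inside the interval from the group's hash, then return the most recent slot
// boundary plus that offset. run() then adds one interval to get the first
// future evaluation time.
func slottedEvalTime(now time.Time, interval time.Duration, groupHash uint64) time.Time {
	offset := time.Duration(groupHash % uint64(interval)) // stable offset in [0, interval)
	base := now.Add(-offset).Truncate(interval)           // last interval boundary before (now - offset)
	return base.Add(offset)
}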
Running the rule evaluation method
In Group.Eval, each rule in the rule group is executed through the QueryFunc query engine (PromQL). If the rule is an AlertingRule, the resulting metrics are sent to the alerting service via the NotifyFunc component. If it is a RecordingRule, the resulting metrics are written to Prometheus's storage manager and expired series are marked as stale in storage.
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
var samplesTotal float64
// Traverse all rules in the current rule group
for i, rule := range g.rules {
select {
case <-g.done:
return
default:
}

func(i int, rule Rule) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
sp.SetTag("name", rule.Name())
defer func(t time.Time) {
sp.Finish()
// Update service metrics - rule execution time
since := time.Since(t)
g.metrics.EvalDuration.Observe(since.Seconds())
rule.SetEvaluationDuration(since)
// Record the rule execution time
rule.SetEvaluationTimestamp(t)
}(time.Now())
// Count the number of rule operations
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
// Execute the rule
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil {
// The query is aborted when the rule fails
rule.SetHealth(HealthBad)
rule.SetLastError(err)
// Record the number of query failures
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
return
}
samplesTotal += float64(len(vector))
// Check whether this is an alerting rule
if ar, ok := rule.(*AlertingRule); ok {
// Send alerts
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
var (
numOutOfOrder = 0
numDuplicates = 0
)
// Obtain a storage appender so the resulting (recording rule) samples can be persisted
app := g.opts.Appendable.Appender(ctx)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
if err := app.Commit(); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
return
}
g.seriesInPreviousEval[i] = seriesReturned
}()
for _, s := range vector {
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
// Handle the various errors that can be returned when storing samples
switch errors.Cause(err) {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
default:
level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
}
} else {
seriesReturned[s.Metric.String()] = s.Metric
}
}
if numOutOfOrder > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
}
if numDuplicates > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
}
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
// Write a stale marker for the expired series
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch errors.Cause(err) {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
}
}
}
}(i, rule)
}
if g.metrics != nil {
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
}
g.cleanupStaleSeries(ctx, ts)
}
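Group.Eval hands each rule to rule.Eval together with g.opts.QueryFunc. QueryFunc and NotifyFunc are plain function types in the rules package (their shapes are shown in the comment below); in Prometheus itself the query function is built with rules.EngineQueryFunc from the PromQL engine and the local storage. The stub below is a sketch of a trivial QueryFunc, e.g. for experimenting with rule evaluation in tests:
// Shapes of the two hooks, as declared in the rules package:
//   type QueryFunc  func(ctx context.Context, q string, t time.Time) (promql.Vector, error)
//   type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)
//
// A trivial stub QueryFunc that logs the query and returns an empty result;
// real deployments use rules.EngineQueryFunc(engine, storage) instead.
var stubQueryFunc rules.QueryFunc = func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
	fmt.Printf("evaluating %q at %s\n", q, t)
	return promql.Vector{}, nil
}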
Next comes the execution of the individual rule; here we only look at the flow of an AlertingRule. First, the AlertingRule structure:
// An AlertingRule generates alerts from its vector expression.
type AlertingRule struct {
// The name of the alert.
name string
// The vector expression from which to generate alerts.
vector parser.Expr
// The duration for which a labelset needs to persist in the expression
// output vector before an alert transitions from Pending to Firing state.
holdDuration time.Duration
// Extra labels to attach to the resulting alert sample vectors.
labels labels.Labels
// Non-identifying key/value pairs.
annotations labels.Labels
// External labels from the global config.
externalLabels map[string]string
// true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
// only after the restoration.
restored bool
// Protects the below.
mtx sync.Mutex
// Time in seconds taken to evaluate rule.
evaluationDuration time.Duration
// Timestamp of last evaluation of rule.
evaluationTimestamp time.Time
// The health of the alerting rule.
health RuleHealth
// The last error seen by the alerting rule.
lastError error
// A map of alerts which are currently active (Pending or Firing), keyed by
// the fingerprint of the labelset they correspond to.
active map[uint64]*Alert
logger log.Logger
}
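For reference, the values stored in the active map are *Alert. The sketch below approximates the Alert struct and the alert states from the rules package (field names may differ slightly between versions); the sendAlerts and needsSending code shown later reads several of these fields (LastSentAt, ValidUntil, ResolvedAt, State):
// Approximate shape of the Alert kept in the active map (rules package).
type Alert struct {
	State       AlertState    // StateInactive, StatePending or StateFiring
	Labels      labels.Labels // labels identifying this alert instance
	Annotations labels.Labels // rendered annotation templates
	Value       float64       // sample value that triggered the alert
	ActiveAt    time.Time     // when the alert first became Pending
	FiredAt     time.Time     // when it transitioned to Firing
	ResolvedAt  time.Time     // when it stopped firing
	LastSentAt  time.Time     // last time it was passed to the NotifyFunc
	ValidUntil  time.Time     // hint to Alertmanager: a resend is expected before this time
}

// AlertState models the Inactive -> Pending -> Firing lifecycle.
type AlertState int

const (
	StateInactive AlertState = iota
	StatePending
	StateFiring
)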
The most important field is active, which stores the alerts that are currently active (Pending or Firing) after the rule has been evaluated. When the rule is evaluated, a series of checks determines whether the alert condition is met. The evaluation starts as follows:
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
res, err := query(ctx, r.vector.String(), ts)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
return nil, err
}
// ...
}
This step executes the expr expression from the rule configuration through the QueryFunc function passed in when the Manager was created, and returns the result: the set of series that currently satisfy the expression. For example, given the rule:
cpu_usage > 90
the query result might be:
cpu_usage{instance="192.168.0.11"} 91
cpu_usage{instance="192.168.0.12"} 92
The query result is then iterated over. For each returned series, a hash is computed from its labels and checked against the alerts seen before (i.e. whether the same series has already produced an alert). If it has, the existing alert's value and annotations are updated; if not, a new alert is created and stored in the rule's active alert list. The rule's active alert list is then traversed, and each alert's state is updated according to the rule's for duration, the alert's last trigger time, its current state, and whether the series still appears in the query result (a simplified sketch of this state machine follows the list below). The specific rules are:
- If the alert existed before but no longer appears in this evaluation:
  - If its state is StatePending, or more than 15 minutes have passed since it was resolved (the 15 minutes is a hard-coded constant), the alert is removed from the active list.
  - Otherwise, if its state is not StateInactive, it is changed to StateInactive (resolved).
- If the alert existed before and still appears in this evaluation:
  - If its state is StatePending and the time since it became active exceeds the configured for duration, the state is changed to StateFiring.
  - Otherwise the alert keeps its current state (newly created alerts start out as StatePending).
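The sketch below condenses these transitions into code, adapted from AlertingRule.Eval rather than copied verbatim; resultFPs stands for the set of label-set fingerprints returned by the current evaluation and resolvedRetention for the hard-coded 15 minutes:
// Simplified sketch of the per-alert state transitions (adapted, not verbatim).
for fp, a := range r.active {
	if _, stillFiring := resultFPs[fp]; !stillFiring {
		// The series no longer satisfies the expression.
		if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
			delete(r.active, fp) // forget it entirely
		}
		if a.State != StateInactive {
			a.State = StateInactive // mark as resolved
			a.ResolvedAt = ts
		}
		continue
	}
	// Still matching: promote Pending alerts whose `for` duration has elapsed.
	if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
		a.State = StateFiring
		a.FiredAt = ts
	}
}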
The previous step only updates alert states; it does not actually send anything. The actual alert sending happens here:
// Check whether the rule is an alerting rule; if so, send alerts (the logic inside ar.sendAlerts decides whether they are actually sent)
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
// ...
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, r.vector.String(), alerts...)
}
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
return false
}
// if an alert has been resolved since the last send, resend it
if a.ResolvedAt.After(a.LastSentAt) {
return true
}
return a.LastSentAt.Add(resendDelay).Before(ts)
}
To summarize the above logic:
- If the alert's state is StatePending, no notification is sent.
- If the alert has been resolved since the last send, one more notification is sent to indicate that it has been resolved.
- Otherwise, a notification is sent only if the time since the last send exceeds the ResendDelay; if not, nothing is sent.
This is the alerting process in Prometheus. The main point of learning this process is to be able to do secondary development on Prometheus rules: for example, you can modify the LoadGroups() method to load rules stored in MySQL and update alerting rules dynamically, as sketched below.
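As a closing illustration, here is a minimal sketch of that idea. Everything in it except Manager.Update is hypothetical glue code: fetchRuleYAMLFromDB is assumed to return the rule-group YAML stored in MySQL, and the rules are materialized as a temporary file so that the unmodified LoadGroups/Update path can consume them (the alternative is to patch LoadGroups itself):
// reloadRulesFromDB is a hypothetical helper: it pulls rule YAML out of a database,
// writes it to a temporary file and asks the rule manager to reload from that file.
func reloadRulesFromDB(m *rules.Manager, interval time.Duration,
	externalLabels labels.Labels, externalURL string) error {

	yamlContent, err := fetchRuleYAMLFromDB() // hypothetical: SELECT the rule YAML from MySQL
	if err != nil {
		return err
	}

	tmp, err := os.CreateTemp("", "db-rules-*.yml")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name())
	if _, err := tmp.Write(yamlContent); err != nil {
		tmp.Close()
		return err
	}
	tmp.Close()

	// Manager.Update stops removed groups, keeps unchanged ones and starts new ones,
	// so calling it periodically gives dynamic rule updates.
	return m.Update(interval, []string{tmp.Name()}, externalLabels, externalURL)
}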