WeChat public account: Operations and Maintenance Development Story. Author: Teacher Xia

What is a Rule

Prometheus supports user-defined rules, which fall into two categories: recording rules and alerting rules. The main purpose of a recording rule is to precompute PromQL queries and aggregations over Prometheus samples. When a PromQL expression is complex and computationally heavy, running it directly may cause Prometheus to time out. In such cases a background, batch-like mechanism is needed to perform the heavy computation ahead of time, so that users only need to query the precomputed result. Recording rules are that background computation mechanism: they optimize the performance of complex queries and improve query efficiency. This article focuses on alerting. Alerting rules let you define alert trigger conditions as PromQL expressions; the Prometheus backend evaluates these rules periodically and sends alert notifications when the trigger conditions are met.
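For illustration, here is a recording rule that precomputes the series queried by the alerting rule shown later in this article. It is a minimal sketch: the underlying metric names and the exact expression are assumptions made for the example.

groups:
- name: example-recording
  rules:
  # Precompute the 5-minute mean request latency per job under a new series name.
  - record: job:request_latency_seconds:mean5m
    expr: sum by (job) (rate(request_latency_seconds_sum[5m])) / sum by (job) (rate(request_latency_seconds_count[5m]))

Dashboards and alerting rules can then query job:request_latency_seconds:mean5m directly instead of re-evaluating the expensive expression on every request.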

What is an alerting rule

Alerting is an important function of Prometheus. The following is an analysis of the alert execution process from the source code perspective.

How to define an alerting rule

A typical alerting rule is as follows:

groups:
- name: example
  rules:
  - alert: HighErrorRate
    # The metric must stay above 0.5 for 10 minutes before the alert fires.
    expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
    for: 10m
    labels:
      severity: page
    annotations:
      summary: High request latency
      description: description info

In the alerting rules file, related rules are defined together under a group, and each group can contain multiple alerting rules. An alerting rule consists of the following parts:

  • alert: the name of the alerting rule.
  • expr: a PromQL expression used to determine whether the time series meets the alert trigger condition.
  • for: the evaluation wait time. Optional. The alert fires only after the trigger condition has held for this duration; alerts generated during the waiting period are in the Pending state.
  • labels: user-defined labels, allowing the user to attach an additional set of labels to the alert.
  • annotations: an additional set of information, such as text describing the alert in detail, that is sent to the Alertmanager as part of the alert when it fires.

Rule manager

Based on the configured rules, the rule manager periodically evaluates the PromQL expressions of alerting rules to determine whether any time series meets the trigger condition. When the condition is met, the alert is sent to the notification service.

type Manager struct {
	opts     *ManagerOptions   // External dependencies (storage, notifier, query engine, ...)
	groups   map[string]*Group // The currently loaded rule groups
	mtx      sync.RWMutex      // Protects reads and writes of the groups field
	block    chan struct{}     // Closed by Run() to let the groups start evaluating
	done     chan struct{}     // Closed when the manager is stopped
	restored bool              // Set after the first successful rule load (used for alert state restoration)

	logger log.Logger
}

  • opts (*ManagerOptions): the other modules used by the Manager instance, such as the storage module and the notify module.
  • groups (map[string]*Group): all currently loaded rules.Group instances; the key is built from the rules.Group name and the file it was loaded from.
  • mtx (sync.RWMutex): this lock must be held when reading or writing the groups field.
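For context, the sketch below shows roughly how a rule manager is wired up and started. It is a minimal sketch assuming the rules package API of the Prometheus version quoted in this article; import paths, option fields, and the Update signature differ between versions, and db, ruleFiles, and externalURL are assumed to be provided by the caller.

package rulesetup

import (
	"context"
	"net/url"
	"os"
	"time"

	"github.com/go-kit/log" // older Prometheus versions use github.com/go-kit/kit/log
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// newRuleManager wires the rule manager to a query engine, storage, and a
// notification callback, loads the rule files, and starts evaluation.
func newRuleManager(db storage.Storage, ruleFiles []string, externalURL *url.URL) (*rules.Manager, error) {
	logger := log.NewLogfmtLogger(os.Stderr)
	engine := promql.NewEngine(promql.EngineOpts{
		Logger:     logger,
		MaxSamples: 50000000,
		Timeout:    2 * time.Minute,
	})

	m := rules.NewManager(&rules.ManagerOptions{
		Appendable: db,                                // recording rule samples are written here
		Queryable:  db,                                // used to restore alert "for" state
		QueryFunc:  rules.EngineQueryFunc(engine, db), // executes the rules' PromQL expressions
		NotifyFunc: func(ctx context.Context, expr string, alerts ...*rules.Alert) {
			// Forward firing/resolved alerts to the notifier / Alertmanager here.
		},
		Context:     context.Background(),
		ExternalURL: externalURL,
		Logger:      logger,
	})

	// Load (or reload) the rule files, then unblock periodic evaluation.
	if err := m.Update(time.Minute, ruleFiles, nil, externalURL.String()); err != nil {
		return nil, err
	}
	m.Run()
	return m, nil
}

Prometheus does similar wiring in its own main package; Update is also the entry point for configuration reloads, which is why the source walked through below takes care to copy state from old groups into new ones.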

Read the Rule group configuration

During Prometheus server startup, the rule configuration files are loaded and parsed by calling the Manager.Update() method, which works as follows:

  • Call the Manager.LoadGroups() method to load and parse the rule configuration files, producing a set of rules.Group instances.
  • Stop the old rules.Group instances and start the new ones. A goroutine is launched for each rules.Group instance, and it drives all PromQL evaluations for that group.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()
    // Load rule groups from the configured files
	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
	if errs != nil {
		for _, e := range errs {
			level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
		}
		return errors.New("error loading rules, previous rule set restored")
	}
	m.restored = true

	var wg sync.WaitGroup
    // Loop through the rule group
	for _, newg := range groups {
		// If there is an old group with the same identifier,
		// check if new group equals with the old group, if yes then skip it.
		// If not equals, stop it and wait for it to finish the current iteration.
		// Then copy it into the new group.
        // Compute the rule group key from the new rules.Group's file and name
		gn := GroupKey(newg.file, newg.name)
        // Look up the old rules.Group by key and remove it from the manager's map
		oldg, ok := m.groups[gn]
		delete(m.groups, gn)

		if ok && oldg.Equals(newg) {
			groups[gn] = oldg
			continue
		}

		wg.Add(1)
        // Start a Goroutine for each rules.group instance
		go func(newg *Group) {
			if ok {
				oldg.stop()
                // Copy the state information from the old rule group to the new rule group
				newg.CopyState(oldg)
			}
			wg.Done()
			// Wait with starting evaluation until the rule manager
			// is told to run. This is necessary to avoid running
			// queries against a bootstrapping storage.
			<-m.block
            // Call Group.run() to evaluate the group's rules periodically
			newg.run(m.opts.Context)
		}(newg)
	}

	// Stop remaining old groups.
    // Stop all old rule group services
	wg.Add(len(m.groups))
	for n, oldg := range m.groups {
		go func(n string, g *Group) {
			g.markStale = true
			g.stop()
			if m := g.metrics; m != nil {
				m.IterationsMissed.DeleteLabelValues(n)
				m.IterationsScheduled.DeleteLabelValues(n)
				m.EvalTotal.DeleteLabelValues(n)
				m.EvalFailures.DeleteLabelValues(n)
				m.GroupInterval.DeleteLabelValues(n)
				m.GroupLastEvalTime.DeleteLabelValues(n)
				m.GroupLastDuration.DeleteLabelValues(n)
				m.GroupRules.DeleteLabelValues(n)
				m.GroupSamples.DeleteLabelValues(n)
			}
			}
			wg.Done()
		}(n, oldg)
	}

	wg.Wait()
    // Update the rule group in the rule Manager
	m.groups = groups 

	return nil
}

Run the Rule group scheduling method

Rule group startup (Group.run): after entering Group.run, an initial wait is performed so that rule evaluations are aligned to consistent time slots with period g.interval. The method then defines iter, the scheduling function for one evaluation round, with scheduling period g.interval; inside iter, the next level of scheduling is performed by calling g.Eval. The evaluation period g.interval is taken from the global evaluation_interval setting in prometheus.yml (evaluation_interval: <duration> | default = 1m).
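A minimal prometheus.yml fragment showing this setting is below (the values are illustrative; a per-group interval field in the rule file overrides the global default):

global:
  scrape_interval: 15s
  evaluation_interval: 1m   # how often rule groups are evaluated by default

The implementation of Group.run is as follows: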

func (g *Group) run(ctx context.Context) {
	defer close(g.terminated)

	// Wait an initial amount to have consistently slotted intervals.
	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
	select {
	case <-time.After(time.Until(evalTimestamp)): // Initial wait to align with the evaluation slot
	case <-g.done:
		return
	}

	ctx = promql.NewOriginContext(ctx, map[string]interface{}{
		"ruleGroup": map[string]string{
			"file": g.File(),
			"name": g.Name(),
		},
	})
    // Define the rule group's evaluation scheduling function
	iter := func() {
		g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()

		start := time.Now()
        // Entry point for one rule evaluation round
		g.Eval(ctx, evalTimestamp)
		timeSinceStart := time.Since(start)

		g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
		g.setEvaluationTime(timeSinceStart)
		g.setLastEvaluation(start)
	}

	// The assumption here is that since the ticker was started after having
	// waited for `evalTimestamp` to pass, the ticks will trigger soon
	// after each `evalTimestamp + N * g.interval` occurrence.
	tick := time.NewTicker(g.interval) // Set the rule operation timer
	defer tick.Stop()

	defer func() {
		if !g.markStale {
			return
		}
		go func(now time.Time) {
			for _, rule := range g.seriesInPreviousEval {
				for _, r := range rule {
					g.staleSeries = append(g.staleSeries, r)
				}
			}
			// That can be garbage collected at this point.
			g.seriesInPreviousEval = nil
			// Wait for 2 intervals to give the opportunity to renamed rules
			// to insert new series in the tsdb. At this point if there is a
			// renamed rule, it should already be started.
			select {
			case <-g.managerDone:
			case <-time.After(2 * g.interval):
				g.cleanupStaleSeries(ctx, now)
			}
		}(time.Now())
	}()
    // Run the first evaluation round
	iter()
	if g.shouldRestore {
		// If we have to restore, we wait for another Eval to finish.
		// The reason behind this is, during first eval (or before it)
		// we might not have enough data scraped, and recording rules would not
		// have updated the latest values, on which some alerts might depend.
		select {
		case <-g.done:
			return
		case <-tick.C:
			missed := (time.Since(evalTimestamp) / g.interval) - 1
			if missed > 0 {
				g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
				g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
			}
			evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
			iter()
		}

		g.RestoreForState(time.Now())
		g.shouldRestore = false
	}

	for {
		select {
		case <-g.done:
			return
		default:
			select {
			case <-g.done:
				return
			case <-tick.C:
				missed := (time.Since(evalTimestamp) / g.interval) - 1
				if missed > 0 {
					g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
					g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
				}
				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
                // Run the next evaluation round
				iter()
			}
		}
	}
}
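One detail worth a worked example is the missed-iteration calculation missed := (time.Since(evalTimestamp) / g.interval) - 1. Assume g.interval is 1m and, because one evaluation ran long, 3m30s have elapsed since evalTimestamp when the next tick arrives: the division is integer division on time.Duration and yields 3, so missed = 3 - 1 = 2 evaluation rounds were skipped. IterationsMissed and IterationsScheduled are both increased by 2, and evalTimestamp is advanced by (missed + 1) * g.interval = 3m, which keeps the schedule aligned to the original time slots.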

Run the Rule scheduling method

In Group.Eval, each rule in the rule group is evaluated by executing its PromQL expression through the QueryFunc (the query engine). If the rule is an AlertingRule, the resulting samples are sent to the alerting service via the NotifyFunc component. If it is a RecordingRule, the resulting series are written to Prometheus's storage, and series that are no longer produced are marked as stale in storage.

// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	var samplesTotal float64
	// Traverse all rules in the current rule group
	for i, rule := range g.rules {
		select {
		case <-g.done:
			return
		default:
		}

		func(i int, rule Rule) {
			sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
			sp.SetTag("name", rule.Name())
			defer func(t time.Time) {
				sp.Finish()
                // Update service metrics - rule execution time
				since := time.Since(t)
				g.metrics.EvalDuration.Observe(since.Seconds())
				rule.SetEvaluationDuration(since)
                // Record when this rule was last evaluated
				rule.SetEvaluationTimestamp(t)
			}(time.Now())
            // Count the number of rule operations
			g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
            // Evaluate the rule (execute its PromQL expression)
			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
			if err != nil {
                // Mark the rule as unhealthy when evaluation fails
				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
                // Record the number of query failures
				g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

				// Canceled queries are intentional termination of queries. This normally
				// happens on shutdown and thus we skip logging of any errors here.
				if _, ok := err.(promql.ErrQueryCanceled); !ok {
					level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
				}
				return
			}
			samplesTotal += float64(len(vector))
            // Check whether this is an alerting-type rule
			if ar, ok := rule.(*AlertingRule); ok {
				// Send the alerts
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}

			var (
				numOutOfOrder = 0
				numDuplicates = 0
			)
            // Obtain a storage appender so the resulting samples (e.g. recording rule output) can be persisted
			app := g.opts.Appendable.Appender(ctx)
			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
			defer func() {
				if err := app.Commit(); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)
					g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

					level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
					return
				}
				g.seriesInPreviousEval[i] = seriesReturned
			}()

			for _, s := range vector {
				if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)

					// Handle the various errors returned when storing the samples
					switch errors.Cause(err) {
					case storage.ErrOutOfOrderSample:
						numOutOfOrder++
						level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					case storage.ErrDuplicateSampleForTimestamp:
						numDuplicates++
						level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					default:
						level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					}
				} else {
					seriesReturned[s.Metric.String()] = s.Metric
				}
			}

			if numOutOfOrder > 0 {
				level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
			}
			if numDuplicates > 0 {
				level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
			}

			for metric, lset := range g.seriesInPreviousEval[i] {
				// Append a stale marker for series no longer produced by this rule
				if _, ok := seriesReturned[metric]; !ok {
					// Series no longer exposed, mark it stale.
					_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
					switch errors.Cause(err) {
					case nil:
					case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
						// Do not count these in logging, as this is expected if series
						// is exposed from a different rule.
					default:
						level.Warn(g.logger).Log("msg"."Adding stale sample failed"."sample", metric, "err", err)
					}
				}
			}
		}(i, rule)
	}
	if g.metrics != nil {
		g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
	}
	g.cleanupStaleSeries(ctx, ts)
}

Next comes the execution of an individual rule; here we only follow the flow of an AlertingRule. First, its structure:

// An AlertingRule generates alerts from its vector expression.
type AlertingRule struct {
    // The name of the alert.
    name string
    // The vector expression from which to generate alerts.
    vector parser.Expr
    // The duration for which a labelset needs to persist in the expression
    // output vector before an alert transitions from Pending to Firing state.
    holdDuration time.Duration
    // Extra labels to attach to the resulting alert sample vectors.
    labels labels.Labels
    // Non-identifying key/value pairs.
    annotations labels.Labels
    // External labels from the global config.
    externalLabels map[string]string
    // true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
    // only after the restoration.
    restored bool
    // Protects the below.
    mtx sync.Mutex
    // Time in seconds taken to evaluate rule.
    evaluationDuration time.Duration
    // Timestamp of last evaluation of rule.
    evaluationTimestamp time.Time
    // The health of the alerting rule.
    health RuleHealth
    // The last error seen by the alerting rule.
    lastError error
    // A map of alerts which are currently active (Pending or Firing), keyed by
    // the fingerprint of the labelset they correspond to.
    active map[uint64]*Alert
    logger log.Logger
}

The most important field is active, which stores the alerts that are currently Pending or Firing for this rule. AlertingRule.Eval runs the logic that decides whether the alert condition is met. The logic is as follows:

func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
    res, err := query(ctx, r.vector.String(), ts)
    if err != nil {
        r.SetHealth(HealthBad)
        r.SetLastError(err)
        return nil, err
    }
    // ...
}

This step executes the expr expression from the rule configuration through the QueryFunc passed in when the Manager was created, and returns the result: the set of series that currently satisfy the expression. For example, given the rule:

cpu_usage > 90

The query result might then be:

cpu_usage{instance="192.168.0.11"} 91
cpu_usage{instance="192.168.0.12"} 92

Eval then iterates over the query result. For each returned sample it computes a hash from the sample's labels and checks whether that hash already exists in active (i.e. whether the same series was returned before). If it exists, the stored alert's value and annotations are updated; if not, a new alert is created and saved into the rule's active alert list. Eval then walks the rule's active alert list and updates each alert's state based on the rule's for duration, when the alert became active, its current state, and whether the series is still returned by the query. The specific rules are as follows (a simplified sketch of this state machine follows the list):

  1. If the alert existed before but is not returned by this evaluation
    1. If its state is StatePending, or more than 15 minutes have passed since it was resolved (15 minutes is a hard-coded constant), the alert is removed from the active list
    2. If its state is not StateInactive, it is changed to StateInactive (resolved)
  2. If the alert existed before and is still returned by this evaluation
    1. If its state is StatePending and the time since the alert became active exceeds the configured for duration, the state is changed to StateFiring
  3. Otherwise (a newly returned series), a new alert is created in the StatePending state
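The following is a simplified sketch of that state machine, not the actual Prometheus source. The names mirror those used in AlertingRule.Eval: r is the rule, ts the evaluation timestamp, resultFPs the set of label-hash fingerprints returned by this evaluation, holdDuration the configured for duration, and resolvedRetention the hard-coded 15 minutes.

// Simplified sketch of the alert state transitions described above.
const resolvedRetention = 15 * time.Minute

for fp, a := range r.active {
	if _, ok := resultFPs[fp]; !ok {
		// The series is no longer returned by the query.
		if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
			delete(r.active, fp)
		}
		if a.State != StateInactive {
			a.State = StateInactive
			a.ResolvedAt = ts
		}
		continue
	}
	// The series is still returned: promote Pending to Firing once the "for"
	// duration has elapsed since the alert became active.
	if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
		a.State = StateFiring
		a.FiredAt = ts
	}
}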

The previous step only updates alert state; it does not actually send anything. The actual alert sending is done as follows:

// Check whether the rule is an alerting rule; if so, send alerts (the logic in ar.sendAlerts decides whether each alert is really sent)
if ar, ok := rule.(*AlertingRule); ok {
    ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
// ...
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
    alerts := []*Alert{}
    r.ForEachActiveAlert(func(alert *Alert) {
        if alert.needsSending(ts, resendDelay) {
            alert.LastSentAt = ts
            // Allow for two Eval or Alertmanager send failures.
            delta := resendDelay
            if interval > resendDelay {
                delta = interval
            }
            alert.ValidUntil = ts.Add(4 * delta)
            anew := *alert
            alerts = append(alerts, &anew)
        }
    })
    notifyFunc(ctx, r.vector.String(), alerts...)
}
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
    if a.State == StatePending {
        return false
    }
    // if an alert has been resolved since the last send, resend it
    if a.ResolvedAt.After(a.LastSentAt) {
        return true
    }
    return a.LastSentAt.Add(resendDelay).Before(ts)
}

To summarize the above logic:

  1. If the alert state is StatePending, no notification is sent
  2. If the alert has been resolved since it was last sent, one more notification is sent to indicate that it has been resolved
  3. Otherwise, the alert is resent only if more than ResendDelay has passed since it was last sent
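As a worked example (assuming the default --rules.alert.resend-delay of 1m and an evaluation interval of 1m): delta = max(resendDelay, interval) = 1m, so ValidUntil is set to ts + 4m. This gives the Alertmanager a validity window wide enough to tolerate roughly two missed evaluations or failed sends, which is what the "Allow for two Eval or Alertmanager send failures" comment in sendAlerts refers to.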

This is the alerting process of Prometheus. The point of learning this flow is to be able to do secondary development on Prometheus rules: for example, you could change the LoadGroups() method to load rules defined in MySQL and update alerting rules dynamically.
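For instance, one possible (hypothetical) approach avoids touching LoadGroups entirely: store the rule-file YAML in a MySQL table, periodically sync it to a local file, and call Manager.Update to reload it. The table name, column, and sync interval below are assumptions made for the sketch.

package dynrules

import (
	"context"
	"database/sql"
	"os"
	"time"

	"github.com/prometheus/prometheus/rules"
)

// syncRulesFromMySQL periodically reads the full rule-file YAML from a
// hypothetical rule_files table, writes it to rulePath, and asks the rule
// manager to reload it.
func syncRulesFromMySQL(ctx context.Context, db *sql.DB, m *rules.Manager, rulePath, externalURL string) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		var content string
		if err := db.QueryRowContext(ctx,
			"SELECT content FROM rule_files WHERE name = ?", "dynamic").Scan(&content); err != nil {
			continue // on error, keep the previously loaded rule set
		}
		if err := os.WriteFile(rulePath, []byte(content), 0o644); err != nil {
			continue
		}
		// Reload the rule file. As shown in the Update source above, groups that
		// did not change keep running, and changed groups inherit state via CopyState.
		_ = m.Update(time.Minute, []string{rulePath}, nil, externalURL)
	}
}

Because Update only stops groups whose definition changed and copies alert state into their replacements, this kind of dynamic reload does not lose the Pending/Firing state of unchanged alerts.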