Prometheus startup process
Overview
Prometheus was originally designed as an open-source monitoring and alerting tool for cloud-native applications. Since its adoption by many companies and organizations, it has grown into a standalone open-source project with a very active community of users and developers. Kubernetes, the most common container management system today, is usually monitored with Prometheus.
Below is the architecture diagram for Prometheus from the official website:
Code entry
Prometheus is written in Go; its source code lives at github.com/prometheus/…
CLI Parameter Parsing
1. Initialize the flagConfig structure
cfg := flagConfig{
    notifier: notifier.Options{
        // The default registry registers the process and Go metric collectors
        Registerer: prometheus.DefaultRegisterer,
    },
    web: web.Options{
        Registerer: prometheus.DefaultRegisterer,
        Gatherer:   prometheus.DefaultGatherer,
    },
    promlogConfig: promlog.Config{},
}
The flagConfig structure stores the parsed CLI configuration and is defined as follows:
type flagConfig struct {
    configFile          string
    localStoragePath    string
    notifier            notifier.Options
    forGracePeriod      model.Duration
    outageTolerance     model.Duration
    resendDelay         model.Duration
    web                 web.Options
    tsdb                tsdbOptions
    lookbackDelta       model.Duration
    webTimeout          model.Duration
    queryTimeout        model.Duration
    queryConcurrency    int
    queryMaxSamples     int
    RemoteFlushDeadline model.Duration

    featureList []string
    // These options are extracted from featureList
    // for ease of use.
    enablePromQLAtModifier     bool
    enablePromQLNegativeOffset bool
    enableExpandExternalLabels bool

    prometheusURL   string
    corsRegexString string

    promlogConfig promlog.Config
}
2. Bind CLI parameters to variables in the flagConfig structure:
a.Flag("config.file", "Prometheus configuration file path.").
    Default("prometheus.yml").StringVar(&cfg.configFile)

a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
    Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress)

a.Flag("web.read-timeout", "Maximum duration before timing out read of the request, and closing idle connections.").
    Default("5m").SetValue(&cfg.webTimeout)
...
3. View the CLI parameters supported by Prometheus with prometheus -h.
Here are some of the most common parameters:
--config.file: path to the main Prometheus configuration file
--web.listen-address: address Prometheus listens on
--storage.tsdb.path: directory where local-storage mode keeps its data
--storage.tsdb.retention.time: how long to retain data in local-storage mode; 15 days by default
--storage.tsdb.retention.size: maximum size of retained data; supported units: B, KB, MB, GB, TB, PB, EB
4. Parse CLI parameters
_, err := a.Parse(os.Args[1:])
os.Args[1:] holds all the arguments passed on the Prometheus startup command line, and a.Parse() parses them into the fields of the flagConfig structure initialized above, which were bound to the command-line flags by the a.Flag() calls shown earlier.
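The a here is a kingpin CLI application. To see the whole bind-then-parse flow in isolation, here is a minimal, self-contained sketch in the same style (a toy demo program, not the Prometheus source):

package main

import (
    "fmt"
    "os"

    "gopkg.in/alecthomas/kingpin.v2"
)

func main() {
    var configFile string

    // Create the application and bind a flag to a variable, as Prometheus does.
    a := kingpin.New("demo", "Minimal flag-binding demo")
    a.Flag("config.file", "Configuration file path.").
        Default("prometheus.yml").StringVar(&configFile)

    // Parse everything after the program name into the bound variables.
    if _, err := a.Parse(os.Args[1:]); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(2)
    }
    fmt.Println("config.file =", configFile)
}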
Configuration initialization
1. Verify the validity of prometheus.yml: the config.LoadFile() method parses and validates the prometheus.yml configuration file.
if _, err := config.LoadFile(cfg.configFile, false, log.NewNopLogger()); err != nil {
    level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "err", err)
    os.Exit(2)
}

// Now that the validity of the config is established, set the config
// success metrics accordingly, although the config isn't really loaded
// yet. This will happen later (including setting these metrics again),
// but if we don't do it now, the metrics will stay at zero until the
// startup procedure is complete, which might take long enough to
// trigger alerts about an invalid config.
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()
Note that at this point prometheus.yml is parsed only to verify that the configuration file is valid. After verification succeeds, Prometheus sets two monitoring metrics: prometheus_config_last_reload_successful and prometheus_config_last_reload_success_timestamp_seconds. If prometheus_config_last_reload_successful were left at 0 here and startup took long enough, alerts about an invalid configuration could be triggered.
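These two metrics are plain client_golang gauges. A minimal sketch of how such gauges can be defined, registered and set (the metric names match those mentioned above; the Help strings and registration details here are illustrative assumptions):

package main

import "github.com/prometheus/client_golang/prometheus"

var (
    configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "prometheus_config_last_reload_successful",
        Help: "Whether the last configuration reload attempt was successful.",
    })
    configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "prometheus_config_last_reload_success_timestamp_seconds",
        Help: "Timestamp of the last successful configuration reload.",
    })
)

func main() {
    prometheus.MustRegister(configSuccess, configSuccessTime)
    configSuccess.Set(1)
    configSuccessTime.SetToCurrentTime()
}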
2. Initialize the retention duration and maximum storage size:
{ // Time retention settings.
    // The --storage.tsdb.retention flag is deprecated and not recommended; if it is used, a warning is logged.
    if oldFlagRetentionDuration != 0 {
        level.Warn(logger).Log("deprecation_notice", "'storage.tsdb.retention' flag is deprecated use 'storage.tsdb.retention.time' instead.")
        cfg.tsdb.RetentionDuration = oldFlagRetentionDuration
    }

    // Assign --storage.tsdb.retention.time to cfg.tsdb.RetentionDuration.
    if newFlagRetentionDuration != 0 {
        cfg.tsdb.RetentionDuration = newFlagRetentionDuration
    }

    // If both --storage.tsdb.retention.time and --storage.tsdb.retention.size are zero,
    // i.e. neither a maximum retention time nor a maximum storage size is set,
    // default to 15d (days) of retention.
    if cfg.tsdb.RetentionDuration == 0 && cfg.tsdb.MaxBytes == 0 {
        cfg.tsdb.RetentionDuration = defaultRetentionDuration
        level.Info(logger).Log("msg", "No time or size retention was set so using the default time retention", "duration", defaultRetentionDuration)
    }

    // Check for overflows. This limits our max retention to 100y.
    // If the retention time is large enough to overflow the integer range, cap it at 100 years.
    if cfg.tsdb.RetentionDuration < 0 {
        y, err := model.ParseDuration("100y")
        if err != nil {
            panic(err)
        }
        cfg.tsdb.RetentionDuration = y
        level.Warn(logger).Log("msg", "Time retention value is too high. Limiting to: "+y.String())
    }
}
3. Initialize the storage.tsdb.max-block-duration parameter:
Prometheus TSDB data is ultimately persisted in local block (chunks) files, and storage.tsdb.max-block-duration limits the time span covered by a single block. By default it is 10% of the retention time, capped at 31 days.
{
    if cfg.tsdb.MaxBlockDuration == 0 {
        maxBlockDuration, err := model.ParseDuration("31d")
        if err != nil {
            panic(err)
        }
        // When the time retention is set and not too big, use it to define the max block duration.
        if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration {
            maxBlockDuration = cfg.tsdb.RetentionDuration / 10
        }
        cfg.tsdb.MaxBlockDuration = maxBlockDuration
    }
}
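To make the capping rule concrete, a small runnable sketch of the same arithmetic using model.ParseDuration (the 90d retention value is just an example):

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
)

func main() {
    // Max block duration defaults to one tenth of the retention time,
    // but never more than 31 days.
    retention, _ := model.ParseDuration("90d")
    maxBlock, _ := model.ParseDuration("31d")
    if retention != 0 && retention/10 < maxBlock {
        maxBlock = retention / 10
    }
    fmt.Println(maxBlock) // 9d for a 90d retention
}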
Component initialization
1. Storage component initialization:
/*
    The Storage component of Prometheus is a time series database consisting of two parts:
    localStorage, which in the current version is TSDB and stores metrics locally, and
    remoteStorage, which stores metrics remotely. fanoutStorage serves as a read/write
    proxy in front of localStorage and remoteStorage.
*/
var (
    // Local storage
    localStorage = &readyStorage{}
    scraper      = &readyScrapeManager{}
    // Remote storage
    remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
    // Read/write proxy over local and remote storage
    fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)
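The fanout idea itself is simple: each write goes to the primary storage first, then to every secondary. A minimal, self-contained sketch of the pattern with stand-in types (the concept only, not Prometheus's actual storage.Fanout implementation):

package main

import "fmt"

// Appender is a minimal stand-in for a storage write interface.
type Appender interface {
    Append(sample float64) error
}

type local struct{}

func (local) Append(s float64) error { fmt.Println("local write:", s); return nil }

type remote struct{}

func (remote) Append(s float64) error { fmt.Println("remote write:", s); return nil }

// fanout writes every sample to the primary first, then to each secondary.
type fanout struct {
    primary     Appender
    secondaries []Appender
}

func (f fanout) Append(s float64) error {
    if err := f.primary.Append(s); err != nil {
        return err
    }
    for _, sec := range f.secondaries {
        if err := sec.Append(s); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    f := fanout{primary: local{}, secondaries: []Appender{remote{}}}
    _ = f.Append(42)
}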
2. notifierManager component: sends alert notifications to the AlertManager.
notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
3. discoveryManagerScrape component: performs service discovery. The current version supports multiple service discovery systems, such as static files, Eureka, Kubernetes, etc.
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))
4. discoveryManagerNotify component: discovers alert notification services, such as the AlertManager service.
discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
Both discoveryManagerScrape and discoveryManagerNotify are discovery.Manager instances created with discovery.NewManager(), so they work the same way: both perform service discovery.
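Both managers deliver the current target set to their consumer over a channel (the SyncCh() calls that appear in the startup section below). A minimal sketch of that handoff, with stand-in types:

package main

import "fmt"

// TargetGroup is a stand-in for the discovery target-group type.
type TargetGroup struct {
    Job     string
    Targets []string
}

func main() {
    syncCh := make(chan map[string][]*TargetGroup)

    // Producer side: the discovery manager pushes the full current target set.
    go func() {
        syncCh <- map[string][]*TargetGroup{
            "node": {{Job: "node", Targets: []string{"10.0.0.1:9100"}}},
        }
        close(syncCh)
    }()

    // Consumer side: scrapeManager.Run / notifierManager.Run range over the channel.
    for targetSet := range syncCh {
        for job, groups := range targetSet {
            fmt.Println("job", job, "->", groups[0].Targets)
        }
    }
}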
5. scrapeManager component: scrapes metrics from all the targets found by discoveryManagerScrape, and stores the scraped metrics through fanoutStorage.
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
6. queryEngine component: the PromQL query engine.
// Declare the PromQL engine configuration
opts = promql.EngineOpts{
    Logger:                   log.With(logger, "component", "query engine"),
    Reg:                      prometheus.DefaultRegisterer,
    MaxSamples:               cfg.queryMaxSamples,
    Timeout:                  time.Duration(cfg.queryTimeout), // Query timeout
    ActiveQueryTracker:       promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
    LookbackDelta:            time.Duration(cfg.lookbackDelta),
    NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
    EnableAtModifier:         cfg.enablePromQLAtModifier,
    EnableNegativeOffset:     cfg.enablePromQLNegativeOffset,
}
// Initialize queryEngine
queryEngine = promql.NewEngine(opts)
7. ruleManager component:
// The ruleManager component is initialized with rules.NewManager. Its options involve
// several other components: storage, queryEngine and notifier. The overall flow covers
// rule evaluation and alert sending.
ruleManager = rules.NewManager(&rules.ManagerOptions{
    Appendable:      fanoutStorage,
    Queryable:       localStorage,
    QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
    NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
    Context:         ctxRule,
    ExternalURL:     cfg.web.ExternalURL,
    Registerer:      prometheus.DefaultRegisterer,
    Logger:          log.With(logger, "component", "rule manager"),
    OutageTolerance: time.Duration(cfg.outageTolerance),
    ForGracePeriod:  time.Duration(cfg.forGracePeriod),
    ResendDelay:     time.Duration(cfg.resendDelay),
})

// Register scrapeManager with the readyScrapeManager wrapper.
scraper.Set(scrapeManager)
8. webHandler component: starts the web service that provides HTTP access to Prometheus, for example for executing PromQL queries.
// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
ApplyConfig
prometheus.yml is the core configuration file of Prometheus. After it is parsed, each component's ApplyConfig(conf *config.Config) method is called so the component can extract and process its own piece of the configuration:
reloaders := []reloader{
    {
        name:     "remote_storage", // Remote storage configuration
        reloader: remoteStorage.ApplyConfig,
    }, {
        name:     "web_handler", // Web configuration
        reloader: webHandler.ApplyConfig,
    }, {
        name: "query_engine",
        reloader: func(cfg *config.Config) error {
            if cfg.GlobalConfig.QueryLogFile == "" {
                queryEngine.SetQueryLogger(nil)
                return nil
            }
            l, err := logging.NewJSONFileLogger(cfg.GlobalConfig.QueryLogFile)
            if err != nil {
                return err
            }
            queryEngine.SetQueryLogger(l)
            return nil
        },
    }, {
        // The Scrape and notifier managers need to reload before the Discovery manager as
        // they need to read the most updated config when receiving the new targets list.
        name:     "scrape", // scrapeManager configuration
        reloader: scrapeManager.ApplyConfig,
    }, {
        name: "scrape_sd", // Extracts the scrape_configs section from the configuration file
        reloader: func(cfg *config.Config) error {
            c := make(map[string]discovery.Configs)
            for _, v := range cfg.ScrapeConfigs {
                c[v.JobName] = v.ServiceDiscoveryConfigs
            }
            return discoveryManagerScrape.ApplyConfig(c)
        },
    }, {
        name:     "notify", // notifier configuration
        reloader: notifierManager.ApplyConfig,
    }, {
        name: "notify_sd", // Extracts the alerting section from the configuration file
        reloader: func(cfg *config.Config) error {
            c := make(map[string]discovery.Configs)
            for k, v := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
                c[k] = v.ServiceDiscoveryConfigs
            }
            return discoveryManagerNotify.ApplyConfig(c)
        },
    }, {
        name: "rules", // Extracts the rule_files section from the configuration file
        reloader: func(cfg *config.Config) error {
            // Get all rule files matching the configuration paths.
            var files []string
            for _, pat := range cfg.RuleFiles {
                fs, err := filepath.Glob(pat)
                if err != nil {
                    // The only error can be a bad pattern.
                    return errors.Wrapf(err, "error retrieving rule files for %s", pat)
                }
                files = append(files, fs...)
            }
            return ruleManager.Update(
                time.Duration(cfg.GlobalConfig.EvaluationInterval),
                files,
                cfg.GlobalConfig.ExternalLabels,
            )
        },
    },
}
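Conceptually, reloadConfig parses prometheus.yml once and then walks this slice, applying each reloader in turn and reporting whether any failed. A simplified, self-contained sketch of that loop with stand-in types (the real reloadConfig also updates the config-success metrics and logs per-reloader timing):

package main

import "fmt"

// Stand-ins for config.Config and the reloader struct used above.
type Config struct{ QueryLogFile string }

type reloader struct {
    name     string
    reloader func(*Config) error
}

// applyAll mirrors the idea behind reloadConfig: run every reloader against
// the freshly parsed configuration, and report whether any of them failed.
func applyAll(conf *Config, reloaders []reloader) error {
    var failed bool
    for _, rl := range reloaders {
        if err := rl.reloader(conf); err != nil {
            fmt.Printf("reloader %q failed: %v\n", rl.name, err)
            failed = true
        }
    }
    if failed {
        return fmt.Errorf("one or more reloaders failed")
    }
    return nil
}

func main() {
    rs := []reloader{
        {name: "query_engine", reloader: func(c *Config) error {
            fmt.Println("query log file:", c.QueryLogFile)
            return nil
        }},
    }
    fmt.Println(applyAll(&Config{QueryLogFile: "query.log"}, rs))
}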
Component startup
After the Prometheus components are initialized and the prometheus.yml configuration has been processed, the components are started.
Prometheus uses the oklog/run goroutine orchestration package to coordinate its components.
var g run.Group
// Add goroutines (also called actors)
g.Add(...)
g.Add(...)
// All goroutines (actors) are launched when the Run method is called
if err := g.Run(); err != nil {
    level.Error(logger).Log("err", err)
    os.Exit(1)
}
Prometheus startup launches a total of 10 goroutines, as follows:
1. Graceful exit: listens for program termination signals so Prometheus can shut down cleanly.
// Create a channel listening for exit signals
term := make(chan os.Signal, 1)
// pkill sends syscall.SIGTERM
// Ctrl+C sends os.Interrupt
// First we create an os.Signal channel, and use signal.Notify to register the signals to receive.
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
cancel := make(chan struct{})
g.Add(
    func() error {
        // Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
        select {
        case <-term: // Program exit signals such as Ctrl+C or kill
            level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
            reloadReady.Close()
        case <-webHandler.Quit(): // Termination requested via the web service
            level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
        case <-cancel: // Other exit paths
            reloadReady.Close()
        }
        return nil
    },
    func(err error) {
        close(cancel)
    },
)
2. discoveryManagerScrape component startup: automatically discovers scrape targets.
g.Add(
    func() error {
        err := discoveryManagerScrape.Run()
        level.Info(logger).Log("msg", "Scrape discovery manager stopped")
        return err
    },
    func(err error) {
        level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
        cancelScrape()
    },
)
3. discoveryManagerNotify component startup: automatically discovers AlertManager services for alert notification.
g.Add(
    func() error {
        err := discoveryManagerNotify.Run()
        level.Info(logger).Log("msg", "Notify discovery manager stopped")
        return err
    },
    func(err error) {
        level.Info(logger).Log("msg", "Stopping notify discovery manager...")
        cancelNotify()
    },
)
4. scrapeManager component startup: scrapes monitoring metrics.
g.Add(
    func() error {
        // When the scrape manager receives a new targets list
        // it needs to read a valid config for each job.
        // It depends on the config being in sync with the discovery manager so
        // we wait until the config is fully loaded.
        <-reloadReady.C
        level.Info(logger).Log("cus_msg", "--->ScrapeManager reloadReady")
        // Start scrapeManager
        err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
        level.Info(logger).Log("msg", "Scrape manager stopped")
        return err
    },
    func(err error) {
        // Shutdown handling:
        // Scrape manager needs to be stopped before closing the local TSDB
        // so that it doesn't try to write samples to a closed storage.
        level.Info(logger).Log("msg", "Stopping scrape manager...")
        scrapeManager.Stop()
    },
)
5. Dynamic configuration reloading: implemented by listening for the SIGHUP signal (kill -HUP pid) and for HTTP reload requests (curl -X POST http://ip:9090/-/reload).
// Make sure that sighup handler is registered with a redirect to the channel before the potentially
// long and synchronous tsdb init.
// (TSDB initialization can take a long time; the SIGHUP handler must be registered before it.)
hup := make(chan os.Signal, 1)
signal.Notify(hup, syscall.SIGHUP)
cancel := make(chan struct{})
g.Add(
    func() error {
        <-reloadReady.C
        for {
            select {
            case <-hup:
                if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil {
                    level.Error(logger).Log("msg", "Error reloading config", "err", err)
                }
            case rc := <-webHandler.Reload():
                if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil {
                    level.Error(logger).Log("msg", "Error reloading config", "err", err)
                    rc <- err
                } else {
                    rc <- nil
                }
            case <-cancel:
                return nil
            }
        }
    },
    func(err error) {
        // Wait for any in-progress reloads to complete to avoid
        // reloading things after they have been shutdown.
        cancel <- struct{}{}
    },
)
6. Initial configuration load: loads the prometheus.yml configuration file via the reloadConfig() method.
cancel := make(chan struct{})
g.Add(
    func() error {
        select {
        case <-dbOpen:
        // In case a shutdown is initiated before the dbOpen is released
        case <-cancel:
            reloadReady.Close()
            return nil
        }

        // reloadConfig parses the prometheus.yml configuration file and passes the
        // configuration to each component by calling its ApplyConfig() method.
        if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil {
            return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
        }

        // Once the configuration is loaded, reloadReady.Close() closes the
        // reloadReady.C channel so that goroutines blocked on <-reloadReady.C can continue.
        reloadReady.Close()

        webHandler.Ready()
        level.Info(logger).Log("msg", "Server is ready to receive web requests.")
        <-cancel
        return nil
    },
    func(err error) {
        close(cancel)
    },
)
7. ruleManager component startup: evaluates rules.
g.Add(
    func() error {
        <-reloadReady.C
        ruleManager.Run()
        return nil
    },
    func(err error) {
        ruleManager.Stop()
    },
)
8. TSDB component startup: stores the monitoring data.
opts := cfg.tsdb.ToTSDBOptions()
cancel := make(chan struct{})
g.Add(
    func() error {
        level.Info(logger).Log("msg", "Starting TSDB ...")
        if cfg.tsdb.WALSegmentSize != 0 {
            if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
                return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
            }
        }
        if cfg.tsdb.MaxBlockChunkSegmentSize != 0 {
            if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 {
                return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB")
            }
        }
        db, err := openDBWithMetrics(
            cfg.localStoragePath,
            logger,
            prometheus.DefaultRegisterer,
            &opts,
        )
        if err != nil {
            return errors.Wrapf(err, "opening storage failed")
        }

        switch fsType := prom_runtime.Statfs(cfg.localStoragePath); fsType {
        case "NFS_SUPER_MAGIC":
            level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
        default:
            level.Info(logger).Log("fs_type", fsType)
        }

        level.Info(logger).Log("msg", "TSDB started")
        level.Debug(logger).Log("msg", "TSDB options",
            "MinBlockDuration", cfg.tsdb.MinBlockDuration,
            "MaxBlockDuration", cfg.tsdb.MaxBlockDuration,
            "MaxBytes", cfg.tsdb.MaxBytes,
            "NoLockfile", cfg.tsdb.NoLockfile,
            "RetentionDuration", cfg.tsdb.RetentionDuration,
            "WALSegmentSize", cfg.tsdb.WALSegmentSize,
            "AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks,
            "WALCompression", cfg.tsdb.WALCompression,
        )

        startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
        localStorage.Set(db, startTimeMargin)
        time.Sleep(time.Duration(10) * time.Second)
        close(dbOpen)
        <-cancel
        return nil
    },
    func(err error) {
        if err := fanoutStorage.Close(); err != nil {
            level.Error(logger).Log("msg", "Error stopping storage", "err", err)
        }
        close(cancel)
    },
)
9. webHandler component startup: enables Prometheus to receive HTTP requests.
g.Add(
    func() error {
        if err := webHandler.Run(ctxWeb, listener, *webConfig); err != nil {
            return errors.Wrapf(err, "error starting web server")
        }
        return nil
    },
    func(err error) {
        cancelWeb()
    },
)
10. notifierManager component startup: sends alert data to the AlertManager service.
g.Add(
    func() error {
        // When the notifier manager receives a new targets list
        // it needs to read a valid config for each job.
        // It depends on the config being in sync with the discovery manager
        // so we wait until the config is fully loaded.
        <-reloadReady.C
        notifierManager.Run(discoveryManagerNotify.SyncCh())
        level.Info(logger).Log("msg", "Notifier manager stopped")
        return nil
    },
    func(err error) {
        notifierManager.Stop()
    },
)
Conclusion
The key step in the Prometheus startup process analyzed above is launching 10 goroutine-based components with the oklog/run orchestration package, each with its own role (see the figure below):
General description:
1. The green boxes represent the 10 startup components managed by oklog/run.
2. Graceful exit component: listens for kill and Ctrl+C signals from the system so that Prometheus can exit gracefully;
3. discoveryManagerScrape and discoveryManagerNotify are service discovery components that discover scrape targets and AlertManager services respectively. They deliver what they find over channels to the scrapeManager and notifierManager components: scrapeManager scrapes monitoring metrics from the targets, and notifierManager sends alert data to the discovered AlertManager services;
4. Configuration loading component: loads the prometheus.yml configuration and initializes it into a Config structure, then executes each reloader to hand the parsed Config to the relevant components for processing; see the ApplyConfig section for the reloaders;
5. Once the configuration is loaded, a signal is broadcast via the reloadReady.C channel. The scrapeManager, dynamic configuration loading, ruleManager and notifierManager components all wait for this signal before running; in other words, these four components depend on configuration loading, including all reloaders, having completed (see the sketch after this list);
6. TSDB component: the time series database that stores the monitoring metrics scraped by scrapeManager;
7. webHandler component: starts the HTTP server so that Prometheus data can be queried, for example by executing PromQL statements;
8. Dynamic configuration loading component: after the initial configuration load completes, it listens for kill -HUP signals or curl -X POST http://ip:port/-/reload requests and dynamically reloads the prometheus.yml configuration file;
9. ruleManager component: evaluates rule files, including recording rule and alerting rule files.
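The reloadReady mechanism in point 5 relies on a standard Go idiom: receiving from a closed channel never blocks, so closing a single channel releases every waiting goroutine at once. A minimal sketch of the pattern:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    ready := make(chan struct{})
    var wg sync.WaitGroup

    // Several components block until configuration loading completes.
    for _, name := range []string{"scrapeManager", "ruleManager", "notifierManager"} {
        wg.Add(1)
        go func(name string) {
            defer wg.Done()
            <-ready // blocks until the channel is closed
            fmt.Println(name, "started")
        }(name)
    }

    time.Sleep(100 * time.Millisecond) // pretend to load the configuration
    close(ready)                       // closing unblocks every waiter at once
    wg.Wait()
}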