When kubelet brings up a container in Kubernetes, the user can specify how the container's health should be checked. Three probe handlers are currently supported: TCP, HTTP, and exec (command). This article walks through the implementation of the probe module, looking at how its periodic probing, counters, delays, and other design details are implemented.

1. Overall design of probing

1.1 Thread Model

The probing thread model is fairly simple: each low-level probe task is executed by a worker, the workers are managed by the Manager, and the probe results are cached.

1.2 Periodic probing

A timer is created from each probe task's period, so the worker only needs to listen for timer events and run the probe on every tick.
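To make the thread model and the periodic probing concrete, here is a simplified, self-contained sketch of a manager that owns a shared result cache and one worker per probe task, each driven by its own ticker. The type and field names here are illustrative assumptions, not the real kubelet definitions:

package main

import (
    "fmt"
    "sync"
    "time"
)

// resultCache caches the latest probe result per container ID.
type resultCache struct {
    mu      sync.RWMutex
    results map[string]bool
}

func (c *resultCache) set(id string, healthy bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.results[id] = healthy
}

func (c *resultCache) get(id string) bool {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.results[id]
}

// probeWorker runs one low-level probe task periodically and writes into the shared cache.
type probeWorker struct {
    containerID string
    period      time.Duration
    probe       func() bool // stands in for the low-level TCP/HTTP/exec probe
    cache       *resultCache
    stopCh      chan struct{}
}

func (w *probeWorker) run() {
    ticker := time.NewTicker(w.period)
    defer ticker.Stop()
    for {
        select {
        case <-w.stopCh:
            return
        case <-ticker.C:
            w.cache.set(w.containerID, w.probe())
        }
    }
}

// probeManager owns the shared cache and one worker per probe task
// (the real kubelet keys its workers by pod UID, container name and probe type).
type probeManager struct {
    cache   *resultCache
    workers map[string]*probeWorker
}

func (m *probeManager) add(w *probeWorker) {
    w.cache = m.cache
    m.workers[w.containerID] = w
    go w.run()
}

func main() {
    m := &probeManager{cache: &resultCache{results: map[string]bool{}}, workers: map[string]*probeWorker{}}
    m.add(&probeWorker{
        containerID: "c1",
        period:      200 * time.Millisecond,
        probe:       func() bool { return true }, // pretend the container is healthy
        stopCh:      make(chan struct{}),
    })
    time.Sleep(time.Second)
    fmt.Println("c1 healthy:", m.cache.get("c1"))
}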

1.3 Implementation of probing mechanism

Apart from exec, the probe mechanisms for HTTP and TCP are straightforward: TCP simply dials the target directly with net.DialTimeout, while HTTP builds an HTTP client and issues a GET request against the configured endpoint.

The exec probe expands the command using the current container's environment variables, builds an exec request from the container, command, and timeout, and finally calls the runtimeService, which goes through the CRI to execute the command inside the container.
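As a concrete illustration of these three mechanisms, here is a minimal, self-contained sketch written in the same spirit as the kubelet probers. It is not the actual kubelet code: the function names are made up, error handling is simplified, and the exec variant runs the command locally rather than through the CRI, purely to show the success criterion (exit code 0):

package main

import (
    "context"
    "fmt"
    "net"
    "net/http"
    "os/exec"
    "time"
)

// TCP: healthy if a connection can be established within the timeout.
func probeTCP(host string, port int, timeout time.Duration) bool {
    conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, fmt.Sprint(port)), timeout)
    if err != nil {
        return false
    }
    conn.Close()
    return true
}

// HTTP: healthy if the endpoint answers with a 2xx/3xx status within the timeout.
func probeHTTP(url string, timeout time.Duration) bool {
    client := &http.Client{Timeout: timeout}
    resp, err := client.Get(url)
    if err != nil {
        return false
    }
    defer resp.Body.Close()
    return resp.StatusCode >= 200 && resp.StatusCode < 400
}

// Exec: healthy if the command exits with code 0. The real kubelet runs the
// command inside the container via the CRI; running it locally here is only
// an analogy for the success criterion.
func probeExec(command []string, timeout time.Duration) bool {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    return exec.CommandContext(ctx, command[0], command[1:]...).Run() == nil
}

func main() {
    fmt.Println("tcp :", probeTCP("127.0.0.1", 8080, time.Second))
    fmt.Println("http:", probeHTTP("http://127.0.0.1:8080/healthz", time.Second))
    fmt.Println("exec:", probeExec([]string{"cat", "/tmp/healthy"}, time.Second))
}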

2. Implementation of the prober interface

2.1 Core member structure

type prober struct {
    exec execprobe.Prober
    // A separate HTTP prober (with its own Transport) is used for each of
    // readiness/liveness/startup probing
    readinessHTTP httpprobe.Prober
    livenessHTTP  httpprobe.Prober
    startupHTTP   httpprobe.Prober
    tcp           tcpprobe.Prober
    runner        kubecontainer.ContainerCommandRunner

    // refManager is mainly used to obtain the reference object of a container
    refManager *kubecontainer.RefManager
    // recorder is responsible for building probe-result events, which are ultimately
    // reported back to the apiserver
    recorder record.EventRecorder
}

2.2 Main probe flow

The main probing flow lives in the prober's probe method, and its core logic breaks down into the following stages.

2.2.1 Obtaining the probe configuration

func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
    var probeSpec *v1.Probe
    // Obtain the probe configuration for the corresponding location based on the probe type
    switch probeType {
    case readiness:
        probeSpec = container.ReadinessProbe
    case liveness:
        probeSpec = container.LivenessProbe
    case startup:
        probeSpec = container.StartupProbe
    default:
        return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
    }

2.2.2 Recording error information during probe execution

If an error is returned, or the result is neither success nor warning, the corresponding reference object is obtained and the recorder builds an event whose result is sent back to the apiserver.

    // ctrName identifies the container in logs and events
    ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)

    result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
    if err != nil || (result != probe.Success && result != probe.Warning) {
        // If an error is returned, or the result is neither success nor warning,
        // obtain the corresponding reference object
        ref, hasRef := pb.refManager.GetRef(containerID)
        if !hasRef {
            klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
        }
        if err != nil {
            klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
            // The recorder builds the event and sends the result back to the apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
            }
        } else { // result != probe.Success
            klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
            // The recorder builds the event and sends the result back to the apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
            }
        }
        return results.Failure, err
    }

2.2.3 Implementation of probe retry

func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
    var err error
    var result probe.Result
    var output string
    for i := 0; i < retries; i++ {
        result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
        if err == nil {
            return result, output, nil
        }
    }
    return result, output, err
}

2.2.4 Executing the probe according to the probe type

func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    timeout := time.Duration(p.TimeoutSeconds) * time.Second
    if p.Exec != nil {
        klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
        command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
    }
    if p.HTTPGet != nil {
        // Get the scheme, host, port, path and header information for the HTTP probe
        scheme := strings.ToLower(string(p.HTTPGet.Scheme))
        host := p.HTTPGet.Host
        if host == "" {
            host = status.PodIP
        }
        port, err := extractPort(p.HTTPGet.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        path := p.HTTPGet.Path
        klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
        url := formatURL(scheme, host, port, path)
        headers := buildHeader(p.HTTPGet.HTTPHeaders)
        klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
        switch probeType {
        case liveness:
            return pb.livenessHTTP.Probe(url, headers, timeout)
        case startup:
            return pb.startupHTTP.Probe(url, headers, timeout)
        default:
            return pb.readinessHTTP.Probe(url, headers, timeout)
        }
    }
    if p.TCPSocket != nil {
        port, err := extractPort(p.TCPSocket.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        host := p.TCPSocket.Host
        if host == "" {
            host = status.PodIP
        }
        klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
        return pb.tcp.Probe(host, port, timeout)
    }
    klog.Warningf("Failed to find probe builder for container: %v", container)
    return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

3. Worker thread

The worker thread has to take several considerations into account when probing (a small example of the corresponding probe spec fields follows this list):
1. The container may need some time when it is first started; for example, the application may have to do some initialization work before it is ready.
2. If a container has failed its probe and is being restarted, it is pointless to keep probing it before it has started again.
3. Whether probes succeed or fail, thresholds may be needed so that a single low-probability failure does not immediately cause the container to be restarted.
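These considerations correspond directly to fields of the v1.Probe spec. Below is a small sketch (assuming the k8s.io/api and k8s.io/apimachinery packages; the values are arbitrary) showing which knob addresses which consideration:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    // The probe handler is embedded in v1.Probe, so HTTPGet can be assigned directly.
    p := &corev1.Probe{}
    p.HTTPGet = &corev1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)}

    p.InitialDelaySeconds = 10 // consideration 1: give the application time to initialize before probing
    p.PeriodSeconds = 5        // how often the worker's ticker fires
    p.TimeoutSeconds = 1       // per-attempt timeout
    p.FailureThreshold = 3     // consideration 3: tolerate occasional low-probability failures
    p.SuccessThreshold = 1     // consecutive successes required before reporting success

    fmt.Printf("probe spec: %+v\n", p)
}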

3.1 Core Members

Apart from the key probe-configuration fields, the members worth noting are onHold and resultRun. onHold decides whether probing of the container should be put on hold, i.e. when the container is being restarted, probing must be delayed. resultRun is a counter: consecutive successes and consecutive failures are both accumulated through it, and it is later checked against the configured thresholds.

type worker struct {
    // Channel for stopping the probe
    stopCh chan struct{}

    // The pod containing the probed container
    pod *v1.Pod

    // The container to probe
    container v1.Container

    // Probe configuration
    spec *v1.Probe

    // Probe type
    probeType probeType

    // The probe value during the initial delay.
    initialValue results.Result

    // Where to store the probe results
    resultsManager results.Manager
    probeManager   *manager

    // The last known container ID for this worker.
    containerID kubecontainer.ContainerID

    // The last probe result for this worker
    lastResult results.Result
    // How many times in a row the probe has returned the same result
    resultRun int

    // Set to true after a probe failure; while true, no probing is performed
    onHold bool

    // proberResultsMetricLabels holds the labels attached to this worker
    // for the ProberResults metric by result.
    proberResultsSuccessfulMetricLabels metrics.Labels
    proberResultsFailedMetricLabels     metrics.Labels
    proberResultsUnknownMetricLabels    metrics.Labels
}

3.2 Core flow of the probe worker

3.2.1 Stopping probes for terminated pods

If the pod has already terminated (failed or succeeded), there is no need to probe it, and the worker simply exits.

    // Get the current status of the pod this worker is responsible for
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        // Either the pod has not been created yet, or it was already deleted.
        klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }
    // The worker should terminate if the pod has finished
    if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
        klog.V(3).Infof("Pod %v %v, exiting probe worker", format.Pod(w.pod), status.Phase)
        return false
    }

3.2.2 Resuming probes after a delay

Delayed probe recovery means that after a probe failure the container is restarted, and no probing happens while the restart is in progress. Recovery works by checking whether the corresponding container ID has changed and, if so, resetting onHold.

    // Get the latest container status by container name
    c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        // Either the container has not been created yet, or it was deleted.
        klog.V(3).Infof("Probe target container not found: %v - %v", format.Pod(w.pod), w.container.Name)
        return true // Wait for more information.
    }

    if w.containerID.String() != c.ContainerID {
        // The container ID has changed, so the container has been restarted
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // We have a new container; resume probing
        w.onHold = false
    }

    if w.onHold {
        // If probing is on hold, do not probe
        return true
    }

3.2.3 Initial probe delay

The initial delay means that if the container has been running for less than the configured InitialDelaySeconds, the worker returns immediately without probing.

    if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }

3.2.4 Executing probe logic

    result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        // Prober error, throw away the result.
        return true
    }

    switch result {
    case results.Success:
        ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
    case results.Failure:
        ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
    default:
        ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
    }

3.2.5 Cumulative probe count

After the counter is incremented, it is checked against the configured threshold. If the threshold has not yet been reached, no state change is made.

    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }

    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        // Success or failure is below threshold - leave the probe state unchanged.
        return true
    }

3.2.6 Updating the probe state

When the probe state changes, it is saved first. If a liveness or startup probe fails, onHold is set to true to put probing on hold, and the counter is reset to 0.

    w.resultsManager.Set(w.containerID, result, w.pod)

    if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
        // The container failed a liveness/startup check; it will be restarted and will
        // keep failing until then, so stop probing until there is a new containerID.
        // This reduces the chance of hitting #21751, where running docker exec while
        // the container is being stopped may lead to corrupted container state.
        w.onHold = true
        w.resultRun = 0
    }

3.3 Main probe loop

The main loop simply runs the probe flow described above on every tick.

func (w *worker) run() {
    // Build the ticker based on the configured probe period
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

    // If kubelet restarted the probes could be started in rapid succession.
    // Let the worker wait for a random portion of tickerPeriod before probing.
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

    probeTicker := time.NewTicker(probeTickerPeriod)

    defer func() {
        // Clean up.
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }

        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
        ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
        ProberResults.Delete(w.proberResultsFailedMetricLabels)
        ProberResults.Delete(w.proberResultsUnknownMetricLabels)
    }()

probeLoop:
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
            // continue
        }
    }
}

That's all for today; tomorrow I will cover the implementation of proberManager. Sharing and forwarding this article is the best way to show your support.

Wechat id: Baxiaoshi2020

Follow the official account to read more source code analysis articles

More articles can be found at www.sreguide.com
