Preface

I recently joined the K8s source code study group organized by the cloud native community, started reading the K8s source code, and organized my notes along the way. Interested readers are welcome to join us and make progress together. Plenty of experienced people in the group and the community are ready to answer your questions. Github.com/cloudnative…

The previous article covered the Informer mechanism of the client-go framework (Informer source analysis). kube-apiserver also uses the go-restful web framework; for its principles and source code, see the go-restful source code analysis.

Let's start with a kube-apiserver code call diagram.

The source analysis that follows is very long and it is easy to get lost in it, so it is recommended to use this diagram to keep track of the current step.


Overview

As the core component of K8s, kube-apiserver is the bridge for communication between components. Components do not talk to each other directly; all communication goes through the API server. See an earlier blog post, which looks at the API server from a source perspective.

Main responsibilities of kube-apiserver

  • Provides API management for the entire cluster, with API registration and API discovery built on the go-restful framework
  • Is the only entry point for resource operations, that is, operations on resources stored in etcd
  • Acts as the hub for the components in a cluster
  • Provides security controls such as request authentication, authorization, and admission control

The three services

kube-apiserver provides three types of HTTP server services, which decouple the functionality of the large kube-apiserver component:

| Service name | Overview | Managed object | Resource registry |
| --- | --- | --- | --- |
| KubeAPIServer | Core service. Provides the K8s built-in core resources, such as Pod and Service; developers are not allowed to modify them | Master | legacyscheme.Scheme |
| APIExtensionsServer | API extension service. Provides the CRD custom resource service | CustomResourceDefinitions | extensionsapiserver.Scheme |
| AggregatorServer | API aggregation service. Provides the aggregation service | APIAggregator | aggregatorscheme.Scheme |

All three services rely on GenericAPIServer, which maps K8s resources to REST APIs.

Overview of the kube-apiserver startup process

kube-apiserver is the entry point for all resource control, and its startup process is somewhat complicated. The startup logic can be divided into 9 steps:

  • Register resources
  • Parse command-line parameters with Cobra
  • Create the apiserver generic configuration
  • Create APIExtensionsServer
  • Create KubeAPIServer
  • Create AggregatorServer
  • Create GenericAPIServer
  • Start the HTTP service
  • Start the HTTPS service

The three servers are chained together in delegate mode, and their initialization processes are similar:

  • Create a config for each server
  • Initialize the HTTP server, including:
    • Initialize GoRestfulContainer
    • Install the APIs the server provides. In detail:
      • Create the back-end RESTStorage for each API resource
      • Add handlers for the verbs each API resource supports
      • Register the handlers with the router
      • Register the router with the WebService
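
The delegate mode mentioned above can be illustrated with plain net/http. This is only a sketch under assumptions: the `server` type, its `routes` map, and the example paths are hypothetical and are not taken from the Kubernetes code base. The chain order (aggregator, then kube-apiserver, then API extensions, then "not found") follows the order in which CreateServerChain wires the delegates later in this article.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// server is a hypothetical stand-in for one link of the chain: it serves the
// paths it knows about and hands everything else to its delegate.
type server struct {
	name     string
	routes   map[string]string // path -> response body
	delegate http.Handler      // next server in the chain
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if body, ok := s.routes[r.URL.Path]; ok {
		fmt.Fprintf(w, "%s: %s", s.name, body)
		return
	}
	// Not ours: pass the request down the chain.
	s.delegate.ServeHTTP(w, r)
}

// newChain wires three servers: aggregator -> kube-apiserver -> apiextensions -> 404.
func newChain() http.Handler {
	ext := &server{
		name:     "apiextensions",
		routes:   map[string]string{"/apis/example.com/v1/widgets": "custom resource"},
		delegate: http.NotFoundHandler(),
	}
	kube := &server{
		name:     "kube-apiserver",
		routes:   map[string]string{"/api/v1/pods": "pods"},
		delegate: ext,
	}
	return &server{
		name:     "aggregator",
		routes:   map[string]string{"/apis": "group discovery"},
		delegate: kube,
	}
}

// get sends a GET request through the chain and returns status code and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := newChain()
	for _, p := range []string{"/apis", "/api/v1/pods", "/apis/example.com/v1/widgets", "/nope"} {
		code, body := get(chain, p)
		fmt.Println(p, code, body)
	}
}
```

Each server only needs to know its own routes and its delegate, which is why the three servers can be initialized in nearly the same way.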

0. Entry function

The kube-apiserver component is a separate process that starts from the following entry function:

cmd/kube-apiserver/apiserver.go

import (
  ...
  // Import legacyscheme; initializing this package creates the resource registry
  "k8s.io/kubernetes/pkg/api/legacyscheme"
  // Import master; its init functions register all K8s resources
  "k8s.io/kubernetes/pkg/master"
  ...
)

func main() {
  rand.Seed(time.Now().UnixNano())

  // Create a Cobra Command object with default parameters
  command := app.NewAPIServerCommand()

  logs.InitLogs()
  defer logs.FlushLogs()

  if err := command.Execute(); err != nil {
    os.Exit(1)
  }
}

1. Register resources

The first thing the kube-apiserver component does after starting is register the resources supported by K8s in the Scheme resource registry, so that the later startup logic can look up resource information and start the three services described earlier.

Resource registration is triggered not by explicit function calls but by Go's import and init mechanism. Step 0 above showed the import of "k8s.io/kubernetes/pkg/api/legacyscheme".

Resource registration consists of two steps:

  • Initialize the Scheme resource registry
  • Register the resources supported by K8s

Initialize the Scheme resource registry

Code path: pkg/api/legacyscheme/scheme.go

Three global variables are defined to serve kube-apiserver; they can be used from anywhere in the component.

package legacyscheme

import (
  "k8s.io/apimachinery/pkg/runtime"
  "k8s.io/apimachinery/pkg/runtime/serializer"
)

// Scheme resource registry
var Scheme = runtime.NewScheme()
// Codec
var Codecs = serializer.NewCodecFactory(Scheme)
// Parameter codec
var ParameterCodec = runtime.NewParameterCodec(Scheme)
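
To make the idea of a resource registry more concrete, here is a minimal sketch of a registry that maps a group/version/kind key to a Go type and can create new objects from that key. It is an illustration only, not the apimachinery implementation: the `gvk` and `scheme` types, their methods, and the toy `Pod` struct are all hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"reflect"
)

// gvk is a simplified group/version/kind key (hypothetical; the real type
// lives in k8s.io/apimachinery).
type gvk struct{ Group, Version, Kind string }

// scheme maps a gvk to a Go type, which is the core idea of a resource registry.
type scheme struct {
	types map[gvk]reflect.Type
}

func newScheme() *scheme { return &scheme{types: map[gvk]reflect.Type{}} }

// addKnownType registers the Go type of obj under the given key.
// obj must be a pointer to a struct.
func (s *scheme) addKnownType(key gvk, obj interface{}) {
	s.types[key] = reflect.TypeOf(obj).Elem()
}

// newObject returns a pointer to a fresh zero value for a registered key.
func (s *scheme) newObject(key gvk) (interface{}, error) {
	t, ok := s.types[key]
	if !ok {
		return nil, fmt.Errorf("no kind %q registered for %q/%q", key.Kind, key.Group, key.Version)
	}
	return reflect.New(t).Interface(), nil
}

// Pod is a toy resource type used only for this illustration.
type Pod struct{ Name string }

func main() {
	s := newScheme()
	s.addKnownType(gvk{Version: "v1", Kind: "Pod"}, &Pod{})

	obj, err := s.newObject(gvk{Version: "v1", Kind: "Pod"})
	fmt.Printf("%T %v\n", obj, err)

	_, err = s.newObject(gvk{Group: "apps", Version: "v1", Kind: "Deployment"})
	fmt.Println(err)
}
```

Because the registry is a single shared value, anything that holds a reference to it can resolve a kind to a type, which is the role the global Scheme plays for kube-apiserver.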

Register the resources supported by K8s

When kube-apiserver starts, it imports the master package, as shown in step 0: import "k8s.io/kubernetes/pkg/master".

import_known_versions.go in the master package imports the install package of each K8s resource group, and importing a package triggers its initialization function. Every resource group defines an install package; when it is imported, its init function runs and completes the resource registration.

pkg/master/import_known_versions.go

import (
  // These imports are the API groups the API server will support.
  _ "k8s.io/kubernetes/pkg/apis/admission/install"
  _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
  _ "k8s.io/kubernetes/pkg/apis/apps/install"
  _ "k8s.io/kubernetes/pkg/apis/authentication/install"
  _ "k8s.io/kubernetes/pkg/apis/authorization/install"
  _ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
  _ "k8s.io/kubernetes/pkg/apis/batch/install"
  _ "k8s.io/kubernetes/pkg/apis/certificates/install"
  _ "k8s.io/kubernetes/pkg/apis/coordination/install"
  _ "k8s.io/kubernetes/pkg/apis/core/install"
  _ "k8s.io/kubernetes/pkg/apis/discovery/install"
  _ "k8s.io/kubernetes/pkg/apis/events/install"
  _ "k8s.io/kubernetes/pkg/apis/extensions/install"
  _ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
  _ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
  _ "k8s.io/kubernetes/pkg/apis/networking/install"
  _ "k8s.io/kubernetes/pkg/apis/node/install"
  _ "k8s.io/kubernetes/pkg/apis/policy/install"
  _ "k8s.io/kubernetes/pkg/apis/rbac/install"
  _ "k8s.io/kubernetes/pkg/apis/scheduling/install"
  _ "k8s.io/kubernetes/pkg/apis/settings/install"
  _ "k8s.io/kubernetes/pkg/apis/storage/install"
)

The following uses the core resource group as an example of an install package. Source location: pkg/apis/core/install/install.go

func init() {
  // legacyscheme.Scheme is the global resource registry introduced earlier
  Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
  // Register a resource group
  utilruntime.Must(core.AddToScheme(scheme))
  utilruntime.Must(v1.AddToScheme(scheme))
  // Register the version order
  utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion))
}
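
The import-and-init mechanism can be reproduced in a single file. The following is a sketch under assumptions: `registry` and `install` are hypothetical names standing in for the global Scheme and an install package's Install function, and the version lists are examples only. In the real code the init functions live in separate packages and are triggered by blank imports; here everything sits in package main so the example stays self-contained.

```go
package main

import "fmt"

// registry is a hypothetical global that plays the role of legacyscheme.Scheme.
// It maps a group name to its versions in priority order.
var registry = map[string][]string{}

// install mimics an install package's Install function: it registers a group
// and records the preferred order of its versions.
func install(group string, versions ...string) {
	registry[group] = append(registry[group], versions...)
}

// init runs automatically before main. This is why merely importing an
// install package, even with the blank identifier, is enough to register
// its resources.
func init() {
	install("core", "v1")
	install("apps", "v1", "v1beta1")
}

func main() {
	// No explicit registration call is needed here.
	fmt.Println(registry["core"])
	fmt.Println(registry["apps"])
}
```

Package-level variables are initialized before any init function runs, so `registry` is guaranteed to exist by the time `init` writes to it.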

2. Cobra command line parameter parsing

All K8s components use Cobra to parse command-line arguments in a uniform way. The kube-apiserver component fills in the defaults of its configuration parameters and validates them through Cobra, using the app.NewAPIServerCommand() call shown in step 0.

cmd/kube-apiserver/app/server.go

func NewAPIServerCommand() *cobra.Command {
  // Initialize the default configuration of each module
  s := options.NewServerRunOptions()
  cmd := &cobra.Command{
    ...
    RunE: func(cmd *cobra.Command, args []string) error {
      ...
      // Fill in the default parameter configuration
      completedOptions, err := Complete(s)
      // Verify that the parameters are valid
      if errs := completedOptions.Validate(); len(errs) != 0 {
        return utilerrors.NewAggregate(errs)
      }
      // Start running as a long-lived process
      // The Run function is covered later
      return Run(completedOptions, genericapiserver.SetupSignalHandler())
    },
    ...
  }
  ...
}

// genericapiserver.SetupSignalHandler()
func SetupSignalHandler() <-chan struct{} {
  return SetupSignalContext().Done()
}

func SetupSignalContext() context.Context {
  ...
  // Listen for the OS signals os.Interrupt and syscall.SIGTERM
  // Bind the signals to the stop channel so that goroutines exit gracefully when the process terminates
  signal.Notify(shutdownHandler, shutdownSignals...)
  ...
}

3. Create a service chain

The core of this step is configuring and creating the three services of the kube-apiserver component described earlier:

  • Create the KubeAPIServer generic configuration
  • Create the APIExtensionsServer configuration
  • Create the APIExtensionsServer service
  • Create the KubeAPIServer service
  • Create the AggregatorServer configuration
  • Create the AggregatorServer service
  • Start the service

The apiserver generic configuration is the configuration that the different kube-apiserver modules need when they are instantiated, including:

  • GenericConfig instantiation
  • OpenAPI and Swagger configuration
  • StorageFactory (etcd) configuration
  • Authentication configuration
  • Authorization configuration
  • Admission (admission controller) configuration

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
  // Create a service chain
  server, err := CreateServerChain(completeOptions, stopCh)
  if err != nil {
    return err
  }
  // Prepare to run
  prepared, err := server.PrepareRun()
  if err != nil {
    return err
  }
  // Start running
  return prepared.Run(stopCh)
}

// Create a service chain
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
  ...
  // Create the KubeAPIServer configuration
  kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
  if err != nil {
    return nil, err
  }

  // Create the APIExtensionsServer configuration
  apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
    serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))

  // Create the APIExtensionsServer service; it is the end of the chain, so its delegate is empty
  apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())

  // Create the KubeAPIServer service, delegating to APIExtensionsServer
  kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)

  // Create aggregator-server configuration
  aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)

  // Create the Aggregator-server service
  aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

  if insecureServingInfo != nil {
    insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
    // Start the service
    if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
      return nil, err
    }
  }
  return aggregatorServer, nil
}

3.1 Create the KubeAPIServer generic configuration

func CreateKubeAPIServerConfig(...) (...) {
  // Build a generic configuration
  genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
  ...
  // Compute the service IP range and the API server's service IP
  serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
  ...
  // Construct master.Config
  config := &master.Config{
    GenericConfig: genericConfig,
    ExtraConfig: master.ExtraConfig{
      APIResourceConfigSource: storageFactory.APIResourceConfigSource,
      ...
    },
  }
  ...
}

3.1.1 buildGenericConfig

func buildGenericConfig(...) (...) {
  genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
  // Group versions (GVs) that are enabled or disabled by default
  genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
  ...
  // OpenAPI/Swagger configuration
  // OpenAPIConfig is used to generate the OpenAPI specification
  genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
  genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
  genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
    sets.NewString("watch", "proxy"),
    sets.NewString("attach", "exec", "proxy", "log", "portforward"),
  )

  kubeVersion := version.Get()
  genericConfig.Version = &kubeVersion
  // etcd configuration
  // The storageFactoryConfig object defines how kube-apiserver interacts with etcd, such as etcd authentication, address, and storage prefix
  // It also defines how resources are stored, such as resource information, resource encoding information, and resource status
  storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
  storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
  completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
  if err != nil {
    lastErr = err
    return
  }
  storageFactory, lastErr = completedStorageFactoryConfig.New()
  if lastErr != nil {
    return
  }
  if genericConfig.EgressSelector != nil {
    storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
  }
  if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
    return
  }

  // NewSharedInformerFactory initialization
  versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

  // Authentication configuration
  // Internally calls authenticatorConfig.New()
  // K8s provides nine authentication mechanisms, each of which becomes an authenticator when instantiated
  if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
    return
  }

  // Authorization configuration
  // K8s provides six authorization mechanisms, each of which becomes an authorizer when instantiated
  genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)

  // Admission configuration
  // After authentication and authorization pass, and before the resource is persisted to etcd, a request goes through the admission control logic
  // Admission control performs custom operations (validation, modification, rejection) on the requested resource
  // K8s supports 31 admission controllers
  // Admission controllers are registered, stored, and managed uniformly through the Plugins data structure
  admissionConfig := &kubeapiserveradmission.Config{
    ExternalInformers:    versionedInformers,
    LoopbackClientConfig: genericConfig.LoopbackClientConfig,
    CloudConfigFile:      s.CloudProvider.CloudConfigFile,
  }
  serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
  pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
  if err != nil {
    lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
    return
  }

  err = s.Admission.ApplyTo(
    genericConfig,
    versionedInformers,
    kubeClientConfig,
    feature.DefaultFeatureGate,
    pluginInitializers...)
  if err != nil {
    lastErr = fmt.Errorf("failed to initialize admission: %v", err)
  }

  if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
    genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
  }

  return
}

// Authentication initialization
func (o *BuiltInAuthenticationOptions) ApplyTo(...) error {
  ...
  authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
  ...
}

// Initialize up to nine authenticators according to the configuration
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
  // Define a list of authenticators
  var authenticators []authenticator.Request

  // Determine whether to configure an authenticator based on different switches

  // RequestHeader authenticator
  if config.RequestHeaderConfig != nil {
    requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
      config.RequestHeaderConfig.CAContentProvider.VerifyOptions,
      config.RequestHeaderConfig.AllowedClientNames,
      config.RequestHeaderConfig.UsernameHeaders,
      config.RequestHeaderConfig.GroupHeaders,
      config.RequestHeaderConfig.ExtraHeaderPrefixes,
    )
    authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
  }
  // X509 client certificate authenticator
  if config.ClientCAContentProvider != nil {
    certAuth := x509.NewDynamic(config.ClientCAContentProvider.VerifyOptions, x509.CommonNameUserConversion)
    authenticators = append(authenticators, certAuth)
  }

  // TokenAuth authenticator
  if len(config.TokenAuthFile) > 0 {
    tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
  }
  // ServiceAccountAuth authenticator
  if len(config.ServiceAccountKeyFiles) > 0 {
    serviceAccountAuth, err := newLegacyServiceAccountAuthenticator(config.ServiceAccountKeyFiles, config.ServiceAccountLookup, config.APIAudiences, config.ServiceAccountTokenGetter)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
  }
  if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
    serviceAccountAuth, err := newServiceAccountAuthenticator(config.ServiceAccountIssuer, config.ServiceAccountKeyFiles, config.APIAudiences, config.ServiceAccountTokenGetter)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
  }
  // BootstrapToken authenticator
  if config.BootstrapToken {
    if config.BootstrapTokenAuthenticator != nil {
      // TODO: This can sometimes be nil because of
      tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
    }
  }
  // NOTE(ericchiang): Keep the OpenID Connect after Service Accounts.
  //
  // Because both plugins verify JWTs whichever comes first in the union experiences
  // cache misses for all requests using the other. While the service account plugin
  // simply returns an error, the OpenID Connect plugin may query the provider to
  // update the keys, causing performance hits.
  if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
    oidcAuth, err := newAuthenticatorFromOIDCIssuerURL(oidc.Options{
      IssuerURL:            config.OIDCIssuerURL,
      ClientID:             config.OIDCClientID,
      CAFile:               config.OIDCCAFile,
      UsernameClaim:        config.OIDCUsernameClaim,
      UsernamePrefix:       config.OIDCUsernamePrefix,
      GroupsClaim:          config.OIDCGroupsClaim,
      GroupsPrefix:         config.OIDCGroupsPrefix,
      SupportedSigningAlgs: config.OIDCSigningAlgs,
      RequiredClaims:       config.OIDCRequiredClaims,
    })
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
  }
  // WebhookTokenAuth authenticator
  if len(config.WebhookTokenAuthnConfigFile) > 0 {
    webhookTokenAuth, err := newWebhookTokenAuthenticator(config)
    if err != nil {
      return nil, nil, err
    }

    tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
  }

  if len(tokenAuthenticators) > 0 {
    // Union the token authenticators
    tokenAuth := tokenunion.New(tokenAuthenticators...)
    // Optionally cache authentication results
    if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
      tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
    }
    authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
    securityDefinitions["BearerToken"] = &spec.SecurityScheme{
      SecuritySchemeProps: spec.SecuritySchemeProps{
        Type:        "apiKey",
        Name:        "authorization",
        In:          "header",
        Description: "Bearer Token authentication",
      },
    }
  }

  // Anonymous authenticator
  if len(authenticators) == 0 {
    if config.Anonymous {
      return anonymous.NewAuthenticator(), &securityDefinitions, nil
    }
    return nil, &securityDefinitions, nil
  }
  ...
  // Merge multiple authenticators
  authenticator := union.New(authenticators...)
  ...
}

// Authorization initialization
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
  ...
  return authorizationConfig.New()
}

// Configure up to six authorizers
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
  ...
  // Declare the lists of authorizers and rule resolvers
  var (
    authorizers   []authorizer.Authorizer
    ruleResolvers []authorizer.RuleResolver
  )

  for _, authorizationMode := range config.AuthorizationModes {
    switch authorizationMode {
    // Node authorization
    case modes.ModeNode:
      graph := node.NewGraph()
      node.AddGraphEventHandlers(
        graph,
        config.VersionedInformerFactory.Core().V1().Nodes(),
        config.VersionedInformerFactory.Core().V1().Pods(),
        config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
        config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
      )
      nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
      authorizers = append(authorizers, nodeAuthorizer)
      ruleResolvers = append(ruleResolvers, nodeAuthorizer)
    // AlwaysAllow authorizer
    case modes.ModeAlwaysAllow:
      alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
      authorizers = append(authorizers, alwaysAllowAuthorizer)
      ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
    // AlwaysDeny authorizer
    case modes.ModeAlwaysDeny:
      alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
      authorizers = append(authorizers, alwaysDenyAuthorizer)
      ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
    // ABAC authorizer
    case modes.ModeABAC:
      abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
      if err != nil {
        return nil, nil, err
      }
      authorizers = append(authorizers, abacAuthorizer)
      ruleResolvers = append(ruleResolvers, abacAuthorizer)
    // Webhook authorizer
    case modes.ModeWebhook:
      webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
        config.WebhookVersion,
        config.WebhookCacheAuthorizedTTL,
        config.WebhookCacheUnauthorizedTTL,
        config.CustomDial)
      if err != nil {
        return nil, nil, err
      }
      authorizers = append(authorizers, webhookAuthorizer)
      ruleResolvers = append(ruleResolvers, webhookAuthorizer)
    // RBAC authorizer
    case modes.ModeRBAC:
      rbacAuthorizer := rbac.New(
        &rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
        &rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
        &rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
        &rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
      )
      authorizers = append(authorizers, rbacAuthorizer)
      ruleResolvers = append(ruleResolvers, rbacAuthorizer)
    default:
      return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
    }
  }
  // Merge the enabled authorizers into a single union authorizer
  // When a request arrives, kube-apiserver iterates through the list of authorizers; as soon as one returns an allow decision, authorization succeeds
  return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}
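
The way several authorizers are merged into one can be sketched as follows. This is a minimal illustration, not the k8s.io/apiserver union package: the `decision` type, the `authorizer` interface with its three string arguments, and the example users are all hypothetical simplifications. The sketch assumes the first authorizer that returns a definite allow or deny decides the request, and authorizers with no opinion are skipped.

```go
package main

import "fmt"

// decision models the three outcomes an authorizer can return.
type decision int

const (
	noOpinion decision = iota
	allow
	deny
)

// authorizer is a simplified, hypothetical interface; the real one takes a
// context and a richer attributes object.
type authorizer interface {
	authorize(user, verb, resource string) decision
}

// authorizerFunc adapts a plain function to the authorizer interface.
type authorizerFunc func(user, verb, resource string) decision

func (f authorizerFunc) authorize(user, verb, resource string) decision {
	return f(user, verb, resource)
}

// union tries each authorizer in order and returns the first definite
// decision. If none has an opinion, the result is noOpinion, which callers
// should treat as "not allowed".
type union []authorizer

func (u union) authorize(user, verb, resource string) decision {
	for _, a := range u {
		if d := a.authorize(user, verb, resource); d != noOpinion {
			return d
		}
	}
	return noOpinion
}

// exampleChain builds a two-element union used by main.
func exampleChain() union {
	// nodeOnly lets one hypothetical node user do anything to pods.
	nodeOnly := authorizerFunc(func(user, verb, resource string) decision {
		if user == "system:node:n1" && resource == "pods" {
			return allow
		}
		return noOpinion
	})
	// readOnly lets any user read and has no opinion on writes.
	readOnly := authorizerFunc(func(user, verb, resource string) decision {
		if verb == "get" || verb == "list" {
			return allow
		}
		return noOpinion
	})
	return union{nodeOnly, readOnly}
}

func main() {
	chain := exampleChain()
	fmt.Println(chain.authorize("alice", "get", "pods") == allow)
	fmt.Println(chain.authorize("alice", "delete", "pods") == allow)
	fmt.Println(chain.authorize("system:node:n1", "delete", "pods") == allow)
}
```

Because `union` itself satisfies the `authorizer` interface, a merged authorizer can be passed anywhere a single one is expected, which is what allows the configuration code to return one value for any combination of modes.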

3.2 Create the APIExtensionsServer configuration

func createAPIExtensionsConfig(...) (...) {
  ...
  apiextensionsConfig := &apiextensionsapiserver.Config{
    GenericConfig: &genericapiserver.RecommendedConfig{
      Config:                genericConfig,
      SharedInformerFactory: externalInformers,
    },
    ExtraConfig: apiextensionsapiserver.ExtraConfig{
      CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions),
      MasterCount:          masterCount,
      AuthResolverWrapper:  authResolverWrapper,
      ServiceResolver:      serviceResolver,
    },
  }
  ...
}

3.3 Create the APIExtensionsServer service

func createAPIExtensionsServer(...) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
  return apiextensionsConfig.Complete().New(delegateAPIServer)
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
  // APIExtensionsServer relies on GenericAPIServer
  // Create a service named apiextensions-apiserver with GenericConfig
  genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)

  // APIExtensionsServer is managed through the CustomResourceDefinitions object
  // Resources under the APIExtensionsServer can be registered only after this object is instantiated
  s := &CustomResourceDefinitions{
    GenericAPIServer: genericServer,
  }

  apiResourceConfig := c.GenericConfig.MergedResourceConfig

  // Instantiate APIGroupInfo, which describes resource group information
  apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)

  // Complete the mapping between resources and resource storage objects
  // If the v1beta1 resource version is enabled, save the resource version, resource, and resource storage to the APIGroupInfo map
  if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
    storage := map[string]rest.Storage{}
    // Create a resource store object with NewRest
    customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    if err != nil {
      return nil, err
    }
    storage["customresourcedefinitions"] = customResourceDefinitionStorage
    storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

    apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
  }
  // If v1 is enabled, save the resource version, resource, and resource storage to the APIGroupInfo map
  if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
    storage := map[string]rest.Storage{}
    // customresourcedefinitions
    customResourceDefintionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    if err != nil {
      return nil, err
    }
    storage["customresourcedefinitions"] = customResourceDefintionStorage
    storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

    apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
  }
  // Register API, which is described separately later
  if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    return nil, err
  }

  crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)

  s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
  // Initialize establishingController
  establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
  // Create the CRD handler
  crdHandler, err := NewCustomResourceDefinitionHandler(
    versionDiscoveryHandler,
    groupDiscoveryHandler,
    s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
    delegateHandler,
    c.ExtraConfig.CRDRESTOptionsGetter,
    c.GenericConfig.AdmissionControl,
    establishingController,
    c.ExtraConfig.ServiceResolver,
    c.ExtraConfig.AuthResolverWrapper,
    c.ExtraConfig.MasterCount,
    s.GenericAPIServer.Authorizer,
    c.GenericConfig.MaxRequestBodyBytes,
  )
  if err != nil {
    return nil, err
  }
  // Add the handler function
  s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
  s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

  // Initialize crdController
  crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
  // Initialize namingController
  namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
  // Initialize finalizingController
  finalizingController := finalizer.NewCRDFinalizer(
    s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
    crdClient.Apiextensions(),
    crdHandler,
  )
  // Initialize openapiController
  var openapiController *openapicontroller.Controller
  if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
    openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
  }

  // Register hook function: start the informers
  s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
    s.Informers.Start(context.StopCh)
    return nil
  })
  // Register hook function: start the controllers
  s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
    if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
      go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
    }
    // Start the various controllers initialized earlier
    go crdController.Run(context.StopCh)
    go namingController.Run(context.StopCh)
    go establishingController.Run(context.StopCh)
    go finalizingController.Run(5, context.StopCh)
    return nil
  })
  // Register a post-start hook: wait until the CRD informer has synced
  s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
    return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
      return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
    }, context.StopCh)
  })

  return s, nil
}
3.3.1 Instantiating APIGroupInfo

APIGroupInfo describes a resource group. Each resource group corresponds to one APIGroupInfo object, and each resource in the group corresponds to one resource storage object.

func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
  return APIGroupInfo{
    PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),
    // This map associates each resource with its resource storage object
    // Layout: resource version -> resource name -> resource storage object
    // A RESTStorage object handles create, delete, update, and query operations for a resource
    // Each RESTStorage is later converted into HTTP handler functions
    VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
    // TODO unhardcode this. It was hardcoded before, but we need to re-evaluate
    OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
    Scheme:                 scheme,
    ParameterCodec:         parameterCodec,
    NegotiatedSerializer:   codecs,
  }
}
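
To make the shape of VersionedResourcesStorageMap concrete, here is a small sketch of the same nested layout (version, then resource name, then storage object). The `storage` interface, `kindStorage`, and `groupInfo` are hypothetical, heavily reduced stand-ins and not the real rest.Storage or APIGroupInfo types.

```go
package main

import (
	"fmt"
	"sort"
)

// storage is a hypothetical, much smaller stand-in for rest.Storage.
type storage interface {
	Kind() string
}

type kindStorage struct{ kind string }

func (s kindStorage) Kind() string { return s.kind }

// groupInfo mimics only the nested map of APIGroupInfo:
// version -> resource name -> storage object.
type groupInfo struct {
	group    string
	storages map[string]map[string]storage
}

func newGroupInfo(group string) *groupInfo {
	return &groupInfo{group: group, storages: map[string]map[string]storage{}}
}

// add registers a storage object for one resource in one version.
func (g *groupInfo) add(version, resource string, s storage) {
	if g.storages[version] == nil {
		g.storages[version] = map[string]storage{}
	}
	g.storages[version][resource] = s
}

// resources lists "version/resource" keys in sorted order.
func (g *groupInfo) resources() []string {
	var out []string
	for version, byResource := range g.storages {
		for resource := range byResource {
			out = append(out, version+"/"+resource)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	info := newGroupInfo("example.io")
	info.add("v1", "widgets", kindStorage{"Widget"})
	info.add("v1beta1", "widgets", kindStorage{"Widget"})
	info.add("v1", "gadgets", kindStorage{"Gadget"})
	for _, r := range info.resources() {
		fmt.Println(info.group + "/" + r)
	}
}
```
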
3.3.2 Registering API functions: InstallAPIGroup

Registering the APIGroupInfo is an important step: it registers the resources held in the APIGroupInfo object with the APIExtensionsServer handler. The process is:

  • Iterate over the APIGroupInfo objects
  • Map the resource group, resource version, and resource name to an HTTP request path
  • Through the InstallREST function, use each resource storage object as the handler for its resource
  • Finally, register the request paths and handlers with go-restful via ws.Route
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
  return s.InstallAPIGroups(apiGroupInfo)
}

// InstallAPIGroups
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
  ...
  // Iterate over all resource groups and install the handlers for every resource version
  for _, apiGroupInfo := range apiGroupInfos {
    if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
      return fmt.Errorf("unable to install api resources: %v", err)
    }
    ...
    apiGroup := metav1.APIGroup{
      Name:             apiGroupInfo.PrioritizedVersions[0].Group,
      Versions:         apiVersionsForDiscovery,
      PreferredVersion: preferredVersionForDiscovery,
    }
    s.DiscoveryGroupManager.AddGroup(apiGroup)
    s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
  }
  return nil
}

// installAPIResources
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
  for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
    ...
    // Call InstallREST
    // The argument is the go-restful Container object
    if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
      return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
    }
  }

  return nil
}

// InstallREST
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
  // Define the HTTP request path
  // Format: <apiPrefix>/<group>/<version>
  // apiPrefix is "api" or "apis"
  prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
  // Instantiate the APIInstaller
  installer := &APIInstaller{
    group:             g,
    prefix:            prefix,
    minRequestTimeout: g.MinRequestTimeout,
  }
  // Register the APIs; Install returns a go-restful WebService object
  apiResources, ws, registrationErrors := installer.Install()
  versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
  versionDiscoveryHandler.AddToWebService(ws)
  // Add WebService to Container
  container.Add(ws)
  return utilerrors.NewAggregate(registrationErrors)
}

// installer.Install
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
  // Construct WebService object
  ws := a.newWebService()
  ...
  // Iterate over all paths
  for _, path := range paths {
    // Convert the Storage into routes and register them on the WebService
    apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
    ...
    if apiResource != nil {
      // Add to the list
      apiResources = append(apiResources, *apiResource)
    }
  }
  return apiResources, ws, errors
}

// This method is very long. Its core job is to build a handler from the storage, combine the handler and the path into a go-restful Route object, and finally add the Route to the WebService
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (...) {
  ...
  // Determine which REST interfaces the storage implements
  creater, isCreater := storage.(rest.Creater)
  namedCreater, isNamedCreater := storage.(rest.NamedCreater)
  ...
  // Construct the action list
  actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
  ...
  for _, action := range actions {
    ...
    // Construct a Go-restful RouteBuilder object
    routes := []*restful.RouteBuilder{}
    // Register different handlers depending on the Verb of the action
    switch action.Verb {
    case "GET":
      ...
      // Initialize the handler
      handler = restfulGetResource(getter, exporter, reqScope)
      ...
      // Construct the route
      route := ws.GET(action.Path).To(handler).xxx
      ...
      // route appended to routes
      routes = append(routes, route)
    ...
    case "POST":
      ...
      // Initialize the handler
      handler = restfulCreateResource(creater, reqScope, admit)
      route := ws.POST(action.Path).To(handler).xxx
      routes = append(routes, route)
    ...

    // Walk through all the routes
    for _, route := range routes {
        route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
          Group:   reqScope.Kind.Group,
          Version: reqScope.Kind.Version,
          Kind:    reqScope.Kind.Kind,
        })
        // Add custom extension attributes (all Kubernetes extension attributes start with x-)
        route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
        // Add route to WebService
        ws.Route(route)
      }
  }
}

// Initialize the handler
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
  return func(req *restful.Request, res *restful.Response) {
    handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
  }
}

// Returns a handler that processes the resource
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
  return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}

// Returns a standard net/http handler function that serves the matching route
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
  // Standard net/http handler
  return func(w http.ResponseWriter, req *http.Request) {
    ...
    // Find the appropriate SerializerInfo
    s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
    ...
    // Find the appropriate decoder
    decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)

    // Decode the request body
    obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
    // Process the request
    result, err := finishRequest(timeout, func() (runtime.Object, error) {
      ...
    })
    ...
  }
}

3.4 Creating the KubeAPIServer service

Creating KubeAPIServer follows the same principles as creating APIExtensionsServer. The process is:

  • Map <resource group>/<resource version>/<resource> to a resource storage object and store it in the map held by APIGroupInfo
  • Use installer.Install to register the corresponding handlers (the resource storage objects) for each resource
  • Bind each resource to its handler and construct a Route, which is added to the WebService
  • Finally, add the WebService to the Container
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
  kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
  ...
}

func (c *Config) Complete() CompletedConfig {
  ...
  // Calls createEndpointReconciler internally
  if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
    cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
  }

  return CompletedConfig{&cfg}
}

// createEndpointReconciler
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
  switch c.ExtraConfig.EndpointReconcilerType {
  // there are numerous test dependencies that depend on a default controller
  case "", reconcilers.MasterCountReconcilerType:
    return c.createMasterCountReconciler()
  case reconcilers.LeaseEndpointReconcilerType:
    return c.createLeaseReconciler()
  case reconcilers.NoneEndpointReconcilerType:
    return c.createNoneReconciler()
  default:
    klog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType)
  }
  return nil
}

func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
  ...
  // Initialize the storage
  leaseStorage, _, err := storagefactory.Create(*config)
  ...
  return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
}

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
  switch c.Type {
  ...
  case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
    // Initialize the Storage
    return newETCD3Storage(c)
  ...
  }
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
  // Initialize the generic server for kube-apiserver
  s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
  // Initialize the Master. The core Kubernetes services are managed through the Master object
  // Resources can be registered under KubeAPIServer only after the Master is instantiated
  m := &Master{
    GenericAPIServer:          s,
    ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
  }

  if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
    // Register the resource group that has no group name, with paths prefixed by "/api"
    if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
      return nil, err
    }
  }

  // Register the resource groups that have group names, with paths prefixed by "/apis"
  if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
    return nil, err
  }

  m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
    kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
    // Create an authentication controller
    controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
    ...
    // Start the controller
    go controller.Run(1, hookContext.StopCh)
    return nil
  })

  return m, nil
}
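
The split between InstallLegacyAPI and InstallAPIs is visible in the URL layout: the core group has no group name and lives under "/api", while every named group lives under "/apis". The sketch below builds both kinds of paths. The function `resourcePath` and the two constants are hypothetical helpers written for this illustration, not part of the Kubernetes source.

```go
package main

import (
	"fmt"
	"path"
)

const (
	legacyPrefix = "/api"  // core group, which has no group name
	groupPrefix  = "/apis" // every named group
)

// resourcePath returns the URL path of a resource collection.
// An empty group selects the legacy prefix; an empty namespace
// produces a cluster-scoped path.
func resourcePath(group, version, namespace, resource string) string {
	root := groupPrefix
	if group == "" {
		root = legacyPrefix
	}
	// path.Join ignores empty elements, so the empty group disappears.
	parts := []string{root, group, version}
	if namespace != "" {
		parts = append(parts, "namespaces", namespace)
	}
	parts = append(parts, resource)
	return path.Join(parts...)
}

func main() {
	fmt.Println(resourcePath("", "v1", "default", "pods"))
	fmt.Println(resourcePath("apps", "v1", "default", "deployments"))
	fmt.Println(resourcePath("", "v1", "", "nodes"))
}
```
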
3.4.1 InstallLegacyAPI
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
  // Instantiate APIGroupInfo
  // Generate the storage object for each resource
  legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

  ...
  // Create the bootstrapController
  bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
  // Register the APIs
  if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
    return fmt.Errorf("error in registering group versions: %v", err)
  }
  return nil
}

// Use NewStorage and NewREST to create the storage for each resource and store it in a map
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
  ...
  restStorage := LegacyRESTStorage{}
  podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
  podStorage, err := podstore.NewStorage(...)
  ...
  // Internally, NewStorage operates on etcd through the etcd client
  controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
  ...
  restStorageMap := map[string]rest.Storage{
    "pods":             podStorage.Pod,
    ...
    "replicationControllers": controllerStorage.Controller,
    ...
  }
  ...
  return restStorage, apiGroupInfo, nil
}
3.4.2 InstallLegacyAPIGroup
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
  ...
  // Also calls installAPIResources internally, as described earlier
  if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
    return err
  }

  s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
  return nil
}
3.4.3 InstallAPIs
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ... RESTStorageProvider) error {
  apiGroupsInfo := []*genericapiserver.APIGroupInfo{}

  // Iterate through all providers
  for _, restStorageBuilder := range restStorageProviders {
    ...
    // Obtain the Storage corresponding to the resource
    apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
    ...
  }

  // InstallAPIGroups was analyzed earlier
  if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
    return fmt.Errorf("error in registering group versions: %v", err)
  }
  return nil
}

// RESTStorageProvider interface. Each resource group implements this interface with its own logic
// The implementations are much the same as described above: they call NewStorage and NewREST, which operate on etcd
type RESTStorageProvider interface {
  GroupName() string
  NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}
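
The provider loop in InstallAPIs can be pictured as follows: every provider is asked for the storage of its group, disabled groups are skipped, and the remaining group infos are collected for installation. This is a minimal sketch under stated assumptions; `groupInfo`, `storageProvider`, `staticProvider`, and `collect` are hypothetical names, and the enabled-groups map stands in for the real resource configuration source.

```go
package main

import "fmt"

// groupInfo is a hypothetical, reduced APIGroupInfo.
type groupInfo struct {
	group     string
	resources []string
}

// storageProvider is a simplified take on the RESTStorageProvider idea:
// one provider per group, returning the group info and an "enabled" flag.
type storageProvider interface {
	GroupName() string
	NewStorage(enabledGroups map[string]bool) (groupInfo, bool)
}

// staticProvider returns a fixed resource list for its group.
type staticProvider struct {
	group     string
	resources []string
}

func (p staticProvider) GroupName() string { return p.group }

func (p staticProvider) NewStorage(enabled map[string]bool) (groupInfo, bool) {
	if !enabled[p.group] {
		return groupInfo{}, false
	}
	return groupInfo{group: p.group, resources: p.resources}, true
}

// collect asks every provider for its group info and keeps only the
// groups that are enabled.
func collect(enabled map[string]bool, providers ...storageProvider) []groupInfo {
	var infos []groupInfo
	for _, p := range providers {
		info, ok := p.NewStorage(enabled)
		if !ok {
			fmt.Printf("skipping disabled group %q\n", p.GroupName())
			continue
		}
		infos = append(infos, info)
	}
	return infos
}

func main() {
	enabled := map[string]bool{"apps": true, "batch": false}
	infos := collect(enabled,
		staticProvider{group: "apps", resources: []string{"deployments"}},
		staticProvider{group: "batch", resources: []string{"jobs"}},
	)
	for _, info := range infos {
		fmt.Println(info.group, info.resources)
	}
}
```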

3.5 Creating the Aggregator-Server configuration

func createAggregatorConfig(...) (*aggregatorapiserver.Config, error) {
  ...
  aggregatorConfig := &aggregatorapiserver.Config{
    GenericConfig: &genericapiserver.RecommendedConfig{
      Config:                genericConfig,
      SharedInformerFactory: externalInformers,
    },
    ExtraConfig: aggregatorapiserver.ExtraConfig{
      ProxyClientCertFile: commandOptions.ProxyClientCertFile,
      ProxyClientKeyFile:  commandOptions.ProxyClientKeyFile,
      ServiceResolver:     serviceResolver,
      ProxyTransport:      proxyTransport,
    },
  }
  return aggregatorConfig, nil
}

3.6 Creating the Aggregator-Server Service

Creating AggregatorServer follows a process similar to creating APIExtensionsServer.

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
  // Initialize the delegate
  aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
  ...
  // Create the autoRegistrationController
  autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
  apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
  // Create the crdRegistrationController
  crdRegistrationController := crdregistration.NewCRDRegistrationController(
    apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
    autoRegistrationController)

  err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
    // Start the crdRegistrationController
    go crdRegistrationController.Run(5, context.StopCh)
    go func() {
      if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
        crdRegistrationController.WaitForInitialSync()
      }
      // Start the autoRegistrationController
      autoRegistrationController.Run(5, context.StopCh)
    }()
    return nil
  })
  ...
}

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
  // Create the generic server for kube-aggregator
  genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
  // Initialize APIAggregator
  s := &APIAggregator{
    GenericAPIServer:           genericServer,
    delegateHandler:            delegationTarget.UnprotectedHandler(),
    proxyTransport:             c.ExtraConfig.ProxyTransport,
    proxyHandlers:              map[string]*proxyHandler{},
    handledGroups:              sets.String{},
    lister:                     informerFactory.Apiregistration().V1().APIServices().Lister(),
    APIRegistrationInformers:   informerFactory,
    serviceResolver:            c.ExtraConfig.ServiceResolver,
    openAPIConfig:              openAPIConfig,
    egressSelector:             c.GenericConfig.EgressSelector,
    proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
  }

  // Initialize the storage; the logic is the same as before
  apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
  // Install the APIs, same as before
  if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    return nil, err
  }
  ...
}
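
The "kube-apiserver-autoregistration" hook above encodes an ordering: when the apiextensions group is enabled, the auto-registration controller starts only after the CRD registration controller has completed its initial sync. The sketch below reproduces just that ordering with channels. The types `eventLog` and `crdRegistration` and the function `startControllers` are hypothetical, and the sleep stands in for the real work of listing existing CRDs.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// eventLog records events in order and is safe for concurrent use.
type eventLog struct {
	mu     sync.Mutex
	events []string
}

func (l *eventLog) add(e string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, e)
}

func (l *eventLog) snapshot() []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([]string(nil), l.events...)
}

// crdRegistration is a hypothetical controller that signals when its
// first pass over the existing CRDs has finished.
type crdRegistration struct {
	synced chan struct{}
}

// Run performs the simulated initial sync, then blocks until stopCh closes.
func (c *crdRegistration) Run(events *eventLog, stopCh <-chan struct{}) {
	time.Sleep(10 * time.Millisecond) // stand-in for listing existing CRDs
	events.add("crd-registration: initial sync done")
	close(c.synced)
	<-stopCh
}

// WaitForInitialSync blocks until Run has finished its first pass.
func (c *crdRegistration) WaitForInitialSync() { <-c.synced }

// startControllers mirrors the ordering of the hook: the second
// controller waits for the first one's initial sync when the CRD group
// is enabled. The returned channel closes once the second has started.
func startControllers(crdGroupEnabled bool, events *eventLog, stopCh <-chan struct{}) <-chan struct{} {
	started := make(chan struct{})
	crd := &crdRegistration{synced: make(chan struct{})}
	go crd.Run(events, stopCh)
	go func() {
		if crdGroupEnabled {
			crd.WaitForInitialSync()
		}
		events.add("auto-registration: started")
		close(started)
		<-stopCh
	}()
	return started
}

func main() {
	events := &eventLog{}
	stopCh := make(chan struct{})
	<-startControllers(true, events, stopCh)
	close(stopCh)
	for _, e := range events.snapshot() {
		fmt.Println(e)
	}
}
```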

3.7 Creating the GenericAPIServer

The three services above all depend on GenericAPIServer, which maps Kubernetes resources to REST APIs.

3.7.1 genericConfig Instantiation
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
  ...
  // Construct the handler chain
  handlerChainBuilder := func(handler http.Handler) http.Handler {
    return c.BuildHandlerChainFunc(handler, c.Config)
  }
  apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
  s := &GenericAPIServer{
    ...
  }
  ...
  installAPI(s, c.Config)
  ...
}

// NewAPIServerHandler
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
  ...
  // Create a go-restful Container object
  gorestfulContainer := restful.NewContainer()
  gorestfulContainer.ServeMux = http.NewServeMux()
  gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
  gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
    logStackOnRecover(s, panicReason, httpWriter)
  })
  gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
    serviceErrorHandler(s, serviceErr, request, response)
  })

  director := director{
    name:               name,
    goRestfulContainer: gorestfulContainer,
    nonGoRestfulMux:    nonGoRestfulMux,
  }
  // Create the handler
  return &APIServerHandler{
    FullHandlerChain:   handlerChainBuilder(director),
    GoRestfulContainer: gorestfulContainer,
    NonGoRestfulMux:    nonGoRestfulMux,
    Director:           director,
  }
}
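
NewAPIServerHandler combines two ideas: a director that routes a request to the registered API routes or falls through to another handler, and a handler chain that wraps the director in filters. Here is a rough sketch of that structure with the standard library only. Everything in it is an assumption made for illustration: the `director` here matches exact paths in a map instead of using a go-restful container, the fallback stands in for both nonGoRestfulMux and the delegate's handler, and `withAuth` and `withTrace` are toy filters rather than the real chain.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// director tries the registered API routes first and hands everything
// else to the fallback handler.
type director struct {
	apiRoutes map[string]http.Handler // stands in for the go-restful container
	fallback  http.Handler            // stands in for nonGoRestfulMux and the delegate
}

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if h, ok := d.apiRoutes[req.URL.Path]; ok {
		h.ServeHTTP(w, req)
		return
	}
	d.fallback.ServeHTTP(w, req)
}

// withAuth is a toy filter that rejects requests lacking an
// Authorization header.
func withAuth(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if req.Header.Get("Authorization") == "" {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, req)
	})
}

// withTrace marks every response that passed through the chain.
func withTrace(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		w.Header().Set("X-Chain", "traced")
		next.ServeHTTP(w, req)
	})
}

// buildChain plays the role of handlerChainBuilder: the outermost
// filter runs first.
func buildChain(h http.Handler) http.Handler { return withTrace(withAuth(h)) }

// text returns a handler that writes a fixed string.
func text(s string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, s) })
}

// newServerHandler wires one API route, the delegate, and the chain.
func newServerHandler(delegate http.Handler) http.Handler {
	d := director{
		apiRoutes: map[string]http.Handler{"/apis/example.io/v1": text("api resource list")},
		fallback:  delegate,
	}
	return buildChain(d)
}

// request sends one in-memory GET and returns status code and body.
func request(h http.Handler, target, token string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, target, nil)
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	h := newServerHandler(text("from delegate"))
	fmt.Println(request(h, "/apis/example.io/v1", ""))
	fmt.Println(request(h, "/apis/example.io/v1", "token"))
	fmt.Println(request(h, "/healthz", "token"))
}
```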

3.8 Starting the Service

The last step of CreateServerChain is to start the service through the insecureServingInfo.Serve function.

func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) error {
  // Initialize the HTTP service
  insecureServer := &http.Server{
    Addr:           s.Listener.Addr().String(),
    Handler:        handler,
    MaxHeaderBytes: 1 << 20,
  }
  ...
  // Start the service; internally this calls server.Serve(listener)
  _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
  return err
}

func RunServer(
  server *http.Server,
  ln net.Listener,
  shutDownTimeout time.Duration,
  stopCh <-chan struct{},
) (<-chan struct{}, error) {
  ...
  go func() {
    ...
    // Use the standard library's server.Serve to serve connections accepted on the listener
    // At runtime a goroutine is started for each connection; it reads the request and calls the handler to process it and write the response
    err := server.Serve(listener)
    ...
  }()
  return stoppedCh, nil
}
