All code in this section is based on Kubernetes v1.13.4, the latest version at the time of writing.
Start the analysis
As with the startup code of every Kubernetes component, the apiserver is launched as a cobra command. The command calls Run, which creates the server chain and then runs it.
Server creation
Server creation is centered on the CreateServerChain method, whose code is as follows:
// CreateServerChain creates the apiservers connected via delegation.
// That is, it builds a series of apiservers that hand requests to one another through delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
// 1. Create kubeAPIServerConfig
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
// 2. Check whether the extension API Server is configured and create the apiExtensionsConfig configuration
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
// 3. Create apiExtensionsServer, the extension API server
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
// 4. Create kubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
if err != nil {
return nil, err
}
// otherwise go down the normal path of standing the aggregator up in front of the API server
// this wires up openapi
kubeAPIServer.GenericAPIServer.PrepareRun()
// This will wire up openapi for extension api server
apiExtensionsServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
// 5. Configure aggregatorConfig for the aggregation layer
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 6. Create aggregatorServer
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
// 7. Start the server on an insecure port
if insecureServingInfo != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
return nil, err
}
}
// 8. Return the GenericAPIServer; the caller then starts the server on the secure port
return aggregatorServer.GenericAPIServer, nil
}
The creation process has the following steps:
1. Build the apiserver configuration by calling CreateKubeAPIServerConfig.
2. Build the extension apiserver configuration by calling createAPIExtensionsConfig.
3. Create the servers, both the extension apiserver and the native apiserver, by calling createAPIExtensionsServer and CreateKubeAPIServer. This step registers the route of each handler into a Container, following the go-restful design pattern: handler functions are registered on a Route, routes under the same root path are registered on a WebService, and the WebService is registered in the Container, which is responsible for dispatch. A request flows Container -> WebService -> Route. Refer to the go-restful code for more detail on its use.
4. Configure and create the aggregator server by calling createAggregatorConfig and createAggregatorServer. The main idea is to put one entry point in front of the native apiserver and the extension apiserver and to add some follow-up processing interfaces.
5. Return the resulting server.
Of these steps, the core is how an apiserver is created, that is, how routes and their handlers are added following the go-restful pattern. The CreateKubeAPIServer method is used as the example; createAPIExtensionsServer is similar.
create
The CreateKubeAPIServer method is as follows
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
return kubeAPIServer, nil
}
CreateKubeAPIServer completes the kubeAPIServer configuration and then calls its New method:
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
// In other words: unset options fall back to their defaults, but options such as KubeletClientConfig must be set explicitly.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
// 1. Initialize a go-restful Container and the APIServerHandler
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
m := &Master{
GenericAPIServer: s,
}
// install legacy rest storage
// Register the core APIs served under /api, such as the Pod and Namespace resources, in the Container
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
}
// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
// Register the APIs served under /apis in the Container
restStorageProviders := []RESTStorageProvider{
auditregistrationrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.RESTStorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
if c.ExtraConfig.Tunneler != nil {
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
return m, nil
}
New performs the following operations: 1. Following the go-restful pattern, it calls c.GenericConfig.New to initialize the Container, that is, gorestfulContainer. The initialization function is NewAPIServerHandler. After initialization, installAPI adds the basic routes:
func installAPI(s *GenericAPIServer, c *Config) {
// Add "/" and "/index.html" routes
if c.EnableIndex {
routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
}
// Add the "/swagger-ui/" route
if c.SwaggerConfig != nil && c.EnableSwaggerUI {
routes.SwaggerUI{}.Install(s.Handler.NonGoRestfulMux)
}
// Add "/debug" related routes
if c.EnableProfiling {
routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
if c.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
// so far, only logging related endpoints are considered valid to add for these debug flags.
routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
}
// Add the "/metrics" route
if c.EnableMetrics {
if c.EnableProfiling {
routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
} else {
routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
}
}
// Add the "/version" route
routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
if c.EnableDiscovery {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
}
}
This method adds several routes, including /, /swagger-ui, /debug/*, /metrics, and /version. You can query the apiserver on these paths to view the related information.
2. Add the routes that start with /api.
3. Add the routes that start with /apis.
Adding routes (starting with /api)
Routes that start with /api are added by the InstallLegacyAPI method, which is as follows:
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
klog.Fatalf("Error building core storage: %v", err)
}
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
klog.Fatalf("Error in registering group versions: %v", err)
}
}
The NewLegacyRESTStorage method creates a RESTStorage for each resource. RESTStorage is backed by a struct defined in vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go. The struct contains NewFunc, which returns a new object of the specific resource; NewListFunc, which returns a new list object of that resource; CreateStrategy, UpdateStrategy, and DeleteStrategy, which govern creation, update, and deletion; and other important members. Inside NewLegacyRESTStorage you can see a RESTStorage created for each of many resources with NewREST and then collected in restStorageMap:
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest,
"services/proxy": serviceRestProxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
if serviceAccountStorage.Token != nil {
restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
At this point the RESTStorage for every resource under /api has been built. Next, the routes are installed by calling InstallLegacyAPIGroup. The main call chain is InstallLegacyAPIGroup -> installAPIResources -> InstallREST -> Install -> registerResourceHandlers, and the core route construction happens in registerResourceHandlers. This is a very complex method, about 700 lines long. Its main job is to inspect the RESTStorage to determine which operations (such as create and update) the resource supports and to record them as actions. Each action corresponds to a standard REST operation: for example, create maps to POST and update maps to PUT. Finally, it walks the list of actions and adds a handler for each operation. The handler is registered on a Route and the Route on a WebService, which matches the go-restful design pattern exactly.
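The idea of deriving actions from what a storage object supports can be sketched with plain type assertions. The interfaces getter, creater, and updater below are hypothetical simplifications made up for this example, as are the store types; the real method checks many more capabilities and builds full go-restful routes.

```go
package main

import "fmt"

// Hypothetical capability interfaces; a storage may implement any subset.
type getter interface{ Get(name string) (string, error) }
type creater interface{ Create(obj string) error }
type updater interface{ Update(name, obj string) error }

// action pairs an HTTP verb with the path it is served on.
type action struct{ Verb, Path string }

func (a action) String() string { return fmt.Sprintf("%s %s", a.Verb, a.Path) }

// actionsFor inspects the storage with type assertions and returns one
// action per supported operation.
func actionsFor(resource string, storage interface{}) []action {
	var actions []action
	itemPath := resource + "/{name}"
	if _, ok := storage.(getter); ok {
		actions = append(actions, action{"GET", itemPath})
	}
	if _, ok := storage.(creater); ok {
		actions = append(actions, action{"POST", resource})
	}
	if _, ok := storage.(updater); ok {
		actions = append(actions, action{"PUT", itemPath})
	}
	return actions
}

// readOnlyStore supports Get only.
type readOnlyStore struct{}

func (readOnlyStore) Get(name string) (string, error) { return name, nil }

// fullStore supports Get (promoted from readOnlyStore), Create, and Update.
type fullStore struct{ readOnlyStore }

func (fullStore) Create(obj string) error       { return nil }
func (fullStore) Update(name, obj string) error { return nil }

func main() {
	fmt.Println(actionsFor("componentstatuses", readOnlyStore{}))
	fmt.Println(actionsFor("pods", fullStore{}))
}
```

Because the set of routes is derived from the storage's method set, a read-only resource gets no POST or PUT route without any per-resource configuration.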
Adding routes (starting with /apis)
The /api routes serve the basic resources. Other resources, such as the authentication-related and networking-related API groups, are served under /apis, and the entry point is InstallAPIs. The main difference between InstallAPIs and InstallLegacyAPIGroup is how the RESTStorage is obtained. Routes that start with /api have the single format /api/v1. Routes that start with /apis differ: they contain a group name (groupName in the Kubernetes code), and the group names are irregular, for example /apis/apps and /apis/certificates.k8s.io. To handle this, Kubernetes provides the RESTStorageProvider factory interface:
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
}
Every resource group served under /apis must implement this interface. The GroupName method returns the group name, such as the apps in /apis/apps or the certificates.k8s.io in /apis/certificates.k8s.io, and the NewRESTStorage method returns the group's RESTStorage wrapped in an APIGroupInfo. Each resource group provides its own NewRESTStorage implementation.
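A minimal sketch of this factory pattern follows. The types groupInfo and storageProvider, the two provider structs, and the installAPIs helper are hypothetical simplifications for illustration; in particular, the enabled map stands in for the real APIResourceConfigSource.

```go
package main

import "fmt"

// groupInfo is a simplified stand-in for the real APIGroupInfo.
type groupInfo struct {
	Group     string
	Resources []string
}

// storageProvider mirrors the shape of RESTStorageProvider: a group name
// plus a factory that also reports whether the group is enabled.
type storageProvider interface {
	GroupName() string
	NewRESTStorage(enabled map[string]bool) (groupInfo, bool)
}

type appsProvider struct{}

func (appsProvider) GroupName() string { return "apps" }
func (p appsProvider) NewRESTStorage(enabled map[string]bool) (groupInfo, bool) {
	return groupInfo{Group: p.GroupName(), Resources: []string{"deployments"}}, enabled[p.GroupName()]
}

type batchProvider struct{}

func (batchProvider) GroupName() string { return "batch" }
func (p batchProvider) NewRESTStorage(enabled map[string]bool) (groupInfo, bool) {
	return groupInfo{Group: p.GroupName(), Resources: []string{"jobs"}}, enabled[p.GroupName()]
}

// installAPIs asks every provider for its storage and returns the URL
// prefix of each enabled group, preserving the order of the provider list.
func installAPIs(enabled map[string]bool, providers ...storageProvider) []string {
	var prefixes []string
	for _, p := range providers {
		info, ok := p.NewRESTStorage(enabled)
		if !ok {
			continue // group is disabled: skip it
		}
		prefixes = append(prefixes, fmt.Sprintf("/apis/%s", info.Group))
	}
	return prefixes
}

func main() {
	enabled := map[string]bool{"apps": true, "batch": false}
	fmt.Println(installAPIs(enabled, batchProvider{}, appsProvider{}))
}
```

The caller only iterates over a list of providers, so adding a new API group means adding one more element to that list rather than touching the installation logic.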
Server startup
After the server is created by CreateServerChain, GenericAPIServer's Run method is called to complete startup. Before that, the PrepareRun method finishes the route setup: it mainly registers the Swagger and OpenAPI routes (Swagger and OpenAPI describe all the details and specifications of the Kubernetes API) and the /healthz route. Then the actual server startup begins. The Run method starts the secure HTTP server through the NonBlockingRun method (the insecure server was already started in the CreateServerChain method).
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
// In other words: Run starts the secure HTTP server and returns only when stopCh is closed or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun creates a secure HTTP server
err := s.NonBlockingRun(stopCh)
if err != nil {
return err
}
<-stopCh
// Run the pre-shutdown hooks once stopCh is closed
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
return nil
}
The main startup work configures certificates, timeout parameters, size-limit parameters, and so on, and then starts the server through the net/http library. The code is relatively simple and is not listed here.
Permissions related
Three mechanisms in the apiserver relate to permissions: authentication, authorization, and admission control. The apiserver mainly exposes REST-style interfaces, so every kind of permission check ultimately comes down to a check on an interface. Take the core kubeAPIServerConfig as an example. CreateServerChain calls CreateKubeAPIServerConfig, whose main job is to create the kubeAPIServer configuration. Inside that method, buildGenericConfig creates the generic configuration, and NewConfig sets DefaultBuildHandlerChain as the handler-chain builder. DefaultBuildHandlerChain wraps the apiserver's REST handler in a chain of checks, commonly known as filters. It is noted here first and analyzed later.
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
After the configuration is created, the CreateKubeAPIServer method initializes the go-restful Container. The handlerChainBuilder callback, which here is DefaultBuildHandlerChain, is passed to NewAPIServerHandler. NewAPIServerHandler is as follows:
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
The handler chain for gorestfulContainer is built by passing director as a parameter to the handlerChainBuilder callback. director is a value that implements http.Handler, so the overall logic is: director, as an http.Handler, is passed to the chained filter function DefaultBuildHandlerChain, and each filter in DefaultBuildHandlerChain then performs its step, such as permission control. How to implement filter-like behavior with the net/http package is covered in a separate article. Once the filters are in place, the next step is the startup work, including certificate verification and TLS setup, which is not described further. Next, let's look at how DefaultBuildHandlerChain handles authorization of an interface.
RBAC
Probably the most important authorization mechanism in Kubernetes is RBAC. DefaultBuildHandlerChain calls genericapifilters.WithAuthorization to apply the permission filter to every interface. The WithAuthorization method is as follows:
func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
if a == nil {
klog.Warningf("Authorization is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ae := request.AuditEventFrom(ctx)
attributes, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
authorized, reason, err := a.Authorize(attributes)
// an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
if authorized == authorizer.DecisionAllow {
audit.LogAnnotation(ae, decisionAnnotationKey, decisionAllow)
audit.LogAnnotation(ae, reasonAnnotationKey, reason)
handler.ServeHTTP(w, req)
return
}
if err != nil {
audit.LogAnnotation(ae, reasonAnnotationKey, reasonError)
responsewriters.InternalError(w, req, err)
return
}
klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
audit.LogAnnotation(ae, decisionAnnotationKey, decisionForbid)
audit.LogAnnotation(ae, reasonAnnotationKey, reason)
responsewriters.Forbidden(ctx, attributes, w, req, reason, s)
})
}
1. Call the GetAuthorizerAttributes method to collect the attributes of the request; 2. Call the Authorize method to decide whether the request is allowed. Each authorization mode implements this interface to carry out its own check. If the request is allowed, handler.ServeHTTP passes it on to the next filter; otherwise an error is returned. For RBAC, Authorize ends up calling VisitRulesFor, defined in kubernetes/pkg/registry/rbac/validation/rule.go. VisitRulesFor is as follows:
func (r *DefaultRuleResolver) VisitRulesFor(user user.Info, namespace string, visitor func(source fmt.Stringer, rule *rbacv1.PolicyRule, err error) bool) {
	if clusterRoleBindings, err := r.clusterRoleBindingLister.ListClusterRoleBindings(); err != nil {
		if !visitor(nil, nil, err) {
			return
		}
	} else {
		sourceDescriber := &clusterRoleBindingDescriber{}
		for _, clusterRoleBinding := range clusterRoleBindings {
			subjectIndex, applies := appliesTo(user, clusterRoleBinding.Subjects, "")
			if !applies {
				continue
			}
			rules, err := r.GetRoleReferenceRules(clusterRoleBinding.RoleRef, "")
			if err != nil {
				if !visitor(nil, nil, err) {
					return
				}
				continue
			}
			sourceDescriber.binding = clusterRoleBinding
			sourceDescriber.subject = &clusterRoleBinding.Subjects[subjectIndex]
			for i := range rules {
				if !visitor(sourceDescriber, &rules[i], nil) {
					return
				}
			}
		}
	}
	if len(namespace) > 0 {
		if roleBindings, err := r.roleBindingLister.ListRoleBindings(namespace); err != nil {
			if !visitor(nil, nil, err) {
				return
			}
		} else {
			sourceDescriber := &roleBindingDescriber{}
			for _, roleBinding := range roleBindings {
				subjectIndex, applies := appliesTo(user, roleBinding.Subjects, namespace)
				if !applies {
					continue
				}
				rules, err := r.GetRoleReferenceRules(roleBinding.RoleRef, namespace)
				if err != nil {
					if !visitor(nil, nil, err) {
						return
					}
					continue
				}
				sourceDescriber.binding = roleBinding
				sourceDescriber.subject = &roleBinding.Subjects[subjectIndex]
				for i := range rules {
					if !visitor(sourceDescriber, &rules[i], nil) {
						return
					}
				}
			}
		}
	}
}
Its main work is to walk the clusterRoleBindings and roleBindings that apply to the user and visit the rules of the roles they reference, which is basically consistent with how we use RBAC.
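The visitor pattern used by VisitRulesFor can be reduced to a few lines. In the sketch below, the rule and binding types are hypothetical and heavily simplified (a binding carries its rules directly instead of referencing a role, and a subject is just a user name); only the traversal order and the early-exit convention follow the code above.

```go
package main

import "fmt"

// rule grants one verb on one resource.
type rule struct{ Verb, Resource string }

func (r rule) String() string { return fmt.Sprintf("%s %s", r.Verb, r.Resource) }

// binding attaches rules to a subject. An empty Namespace means the binding
// is cluster-wide.
type binding struct {
	Namespace string
	Subject   string
	Rules     []rule
}

// visitRulesFor calls visitor for every rule bound to user: cluster-wide
// bindings first, then bindings in namespace. It stops as soon as visitor
// returns false.
func visitRulesFor(bindings []binding, user, namespace string, visitor func(rule) bool) {
	scopes := []string{""} // cluster-wide bindings are always checked
	if namespace != "" {
		scopes = append(scopes, namespace)
	}
	for _, scope := range scopes {
		for _, b := range bindings {
			if b.Namespace != scope || b.Subject != user {
				continue
			}
			for _, r := range b.Rules {
				if !visitor(r) {
					return
				}
			}
		}
	}
}

// allowed reports whether any visited rule matches verb and resource.
func allowed(bindings []binding, user, namespace, verb, resource string) bool {
	ok := false
	visitRulesFor(bindings, user, namespace, func(r rule) bool {
		if r.Verb == verb && r.Resource == resource {
			ok = true
			return false // match found: stop visiting
		}
		return true
	})
	return ok
}

func main() {
	bindings := []binding{
		{Namespace: "", Subject: "alice", Rules: []rule{{"get", "nodes"}}},
		{Namespace: "dev", Subject: "alice", Rules: []rule{{"create", "pods"}}},
	}
	fmt.Println(allowed(bindings, "alice", "dev", "create", "pods"))
	fmt.Println(allowed(bindings, "alice", "prod", "create", "pods"))
}
```

Returning a bool from the visitor lets the authorizer stop at the first matching rule instead of collecting every rule up front.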
Database operations
The apiserver's interaction with the database means primarily its interaction with etcd. No other Kubernetes component talks to etcd directly; each one sends requests to the apiserver, and the apiserver talks to etcd to persist the data. As mentioned in the routing section, the data behind each handler is ultimately backed by a Store struct. For routes that start with /api, the Store is created with NewREST or NewStorage. For example, the one for endpoints is created with the following method:
func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
DefaultQualifiedResource: api.Resource("endpoints"),
CreateStrategy: endpoint.Strategy,
UpdateStrategy: endpoint.Strategy,
DeleteStrategy: endpoint.Strategy,
TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}
return &REST{store}
}
The key call is the CompleteWithOptions method. Inside it, the GetRESTOptions method of RESTOptions is called, which leads through StorageWithCacher -> NewRawStorage -> Create to create the backend storage that everything ultimately depends on:
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
As you can see, the Create method selects the storage backend by type: etcd2 is no longer supported, and etcd3 is the default. Once the store has been created, the next thing to do is to bind each handler to the backend store, since the handler ultimately has to persist the data. Recall the long registerResourceHandlers method that builds the handler for each route. Going back to that method, take POST as an example: its handler is createHandler, in kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go. It ends up calling the Create method in kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go, which runs BeforeCreate, Storage.Create, AfterCreate, and Decorator in turn. For POST, the data is written by Storage.Create:
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
		return errors.New("resourceVersion should not be set on objects to be created")
	}
	if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
		return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
	}
	data, err := runtime.Encode(s.codec, obj)
	if err != nil {
		return err
	}
	key = path.Join(s.pathPrefix, key)
	opts, err := s.ttlOpts(ctx, int64(ttl))
	if err != nil {
		return err
	}
	newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}
	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
		clientv3.OpPut(key, string(newData), opts...),
	).Commit()
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return storage.NewKeyExistsError(key, 0)
	}
	if out != nil {
		putResp := txnResp.Responses[0].GetResponsePut()
		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
	return nil
}
The main operations are: 1. Call the Encode method to serialize the object; 2. Call path.Join to build the key; 3. Call TransformToStorage to transform the data for storage; 4. Call the etcd client to write the data in a transaction that succeeds only if the key does not exist yet. At this point the handler is bound to the etcd operation, which completes the backend path of a route. More detail on etcd operations is covered in a separate article.
The above is a summary of my own study; if you find any errors, corrections are welcome!