In this article we will learn how to debug the KubeSphere back end and study its module architecture, using the SSH remote plug-in of VS Code.

Prerequisites

  • Install VS Code and its Remote - SSH / Remote - Containers extensions;
  • Install the Kubernetes container “operating system” and the KubeSphere >= v3.1.0 cloud “control panel” on the remote host;
  • Install Go >= 1.16;
  • Install the KS components that need debugging on KubeSphere, such as DevOps or KubeEdge. Components that are enabled by default, such as Monitoring, can be left as they are.

Configure the launch file

$ cat .vscode/launch.json
{
    // Use IntelliSense to learn about related attributes.
    // Hover to view descriptions of existing properties.
    // For more information, please visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "ks-apiserver",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}/cmd/ks-apiserver/apiserver.go"
        }
    ]
}

ks-apiserver debugging dependency files

Configure kubesphere.yaml in cmd/ks-apiserver/.

First, look at the CM configuration file in the cluster:

$ kubectl -n kubesphere-system get cm kubesphere-config -oyaml

Because the kubeconfig setting is not present in that ConfigMap, copy the YAML out and add the setting as shown below.

Why add a kubeconfig file?

This is mainly because K8s needs such a file when creating a client outside the cluster; inside the container the in-cluster config is used instead, so nothing needs to be added there.

If you are interested, take a look at the client-go examples:

Github.com/kubernetes/…

Github.com/kubernetes/…
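To make the difference concrete, here is a minimal client-go sketch (my own example, not KubeSphere code): outside the cluster it builds the client from a kubeconfig file, the same path we will put into kubesphere.yaml below, and only falls back to rest.InClusterConfig, which is what a pod would rely on.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Out-of-cluster (our debugging case): build the config from a kubeconfig file.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		// In-cluster (ks-apiserver running as a pod): use the service account instead,
		// which is why the container deployment needs no kubeconfig entry.
		config, err = rest.InClusterConfig()
		if err != nil {
			panic(err)
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("the cluster has %d nodes\n", len(nodes.Items))
}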

So the complete configuration startup file is as follows:

$ cat ./cmd/ks-apiserver/kubesphere.yaml
kubernetes:
  kubeconfig: "/root/.kube/config"
  master: https://192.168.88.6:6443
  qps: 1e+06
  burst: 1000000
authentication:
  authenticateRateLimiterMaxTries: 10
  authenticateRateLimiterDuration: 10m0s
  loginHistoryRetentionPeriod: 168h
  maximumClockSkew: 10s
  multipleLogin: True
  kubectlImage: kubesphere/kubectl:v1.20.0
  jwtSecret: "Xtc8ZWUf9f3cJN89bglrTJhfUPMZR87d"
  oauthOptions:
    clients:
    - name: kubesphere
      secret: kubesphere
      redirectURIs:
      - '*'
network:
  ippoolType: none
monitoring:
  endpoint: http://prometheus-operated.kubesphere-monitoring-system.svc:9090
  enableGPUMonitoring: false
gpu:
  kinds:
  - resourceName: nvidia.com/gpu
    resourceType: GPU
    default: True
notification:
  endpoint: http://notification-manager-svc.kubesphere-monitoring-system.svc:19093
  
kubeedge:
  endpoint: http://edge-watcher.kubeedge.svc/api/

gateway:
  watchesPath: /var/helm-charts/watches.yaml
  namespace: kubesphere-controls-system

Apart from kubernetes, each top-level key corresponds to a KS component that is enabled in our cluster, either manually or by default. Now you can start debugging with F5.

Before debugging, you might ask: why does this configuration file have to live at cmd/ks-apiserver/kubesphere.yaml?

Let’s first explore the startup logic of ks-apiserver.

Start ks-apiserver

cmd/ks-apiserver/app/server.go

// Load configuration from file
conf, err := apiserverconfig.TryLoadFromDisk()

The logic for TryLoadFromDisk is as follows:

viper.SetConfigName(defaultConfigurationName) // kubesphere
viper.AddConfigPath(defaultConfigurationPath) // /etc/kubesphere

// Load from current working directory, only used for debugging
viper.AddConfigPath(".")

// Load from Environment variables
viper.SetEnvPrefix("kubesphere")
viper.AutomaticEnv()
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

// After the previous configuration, the paths ReadInConfig will search are
// v.configPaths: ["/etc/kubesphere", "/root/go/src/kubesphere.io/kubesphere/cmd/ks-apiserver"]
if err := viper.ReadInConfig(); err != nil {
	if _, ok := err.(viper.ConfigFileNotFoundError); ok {
		return nil, err
	} else {
		return nil, fmt.Errorf("error parsing configuration file %s", err)
	}
}

conf := New() // Initialize component configurations

// Deserialize the struct to conf from the actual path configuration file read
if err := viper.Unmarshal(conf); err != nil {
	return nil, err
}

return conf, nil

The comments above explain why kubesphere.yaml must be placed in one of these paths for the ks-apiserver command line to start.
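If you want to verify the lookup in isolation, the following standalone sketch (my own example, mirroring the same viper calls) prints which file was picked up and the kubernetes.kubeconfig value read from it; run it with cmd/ks-apiserver/ as the working directory and it should find the kubesphere.yaml we just created.

package main

import (
	"fmt"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	viper.SetConfigName("kubesphere")      // looks for kubesphere.yaml (among other extensions)
	viper.AddConfigPath("/etc/kubesphere") // the in-cluster location
	viper.AddConfigPath(".")               // the working directory, used when debugging

	viper.SetEnvPrefix("kubesphere")
	viper.AutomaticEnv()
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}

	fmt.Println("config file used:", viper.ConfigFileUsed())
	fmt.Println("kubernetes.kubeconfig:", viper.GetString("kubernetes.kubeconfig"))
}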

Let’s move on to the command-line integration built with the cobra.Command package:

func Run(s *options.ServerRunOptions, ctx context.Context) error {
	// NewAPIServer starts the apiserver instance with the given configuration and binds the clients of the instantiated components
	// This step also registers some custom GVKs with K8s through AddToScheme, eventually exposing them under the apis path
	// Initialize runtimecache and runtimeClient with rest.Config and Scheme
	apiserver, err := s.NewAPIServer(ctx.Done())
	if err != nil {
		return err
	}
	
	// PrepareRun mainly integrates the kapis APIs using go-restful
	// The previous step bound the client of each component. This step can call the client of each component to access the server side of the corresponding component
	// Guess what the 4.0 backend pluggable architecture will look like?
	err = apiserver.PrepareRun(ctx.Done())
	if err != nil {
		return nil
	}
	
	// Run various Informers to synchronize resources and start kS-APiserver listening for requests
	return apiserver.Run(ctx)
}

s.NewAPIServer(ctx.Done()) creates an apiserver instance. Creating the instance also registers the KS custom GVKs with K8s through the Scheme, exposing them under the apis request path.

PrepareRun mainly uses the go-restful framework to integrate the proxy requests and aggregation services of the various sub-modules, exposing their capabilities under the kapis request path.

apiserver.Run(ctx) synchronizes resources and starts the server listening.

Each of these is explained separately below.

NewAPIServer

The first is to bind various clients and Informers:

// Call the component's NewForConfig method to integrate clientset
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
if err != nil {
	return nil, err
}
apiServer.KubernetesClient = kubernetesClient

informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),
	kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
apiServer.InformerFactory = informerFactory
...
// Read kubesphere.yaml or the kubesphere-config ConfigMap to bind the KS component clients
...

After this initial binding, a server will be started to respond to requests, so the address binding is done next:

...
server := &http.Server{
	Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),
}

if s.GenericServerRunOptions.SecurePort != 0 {
	certificate, err := tls.LoadX509KeyPair(s.GenericServerRunOptions.TlsCertFile, s.GenericServerRunOptions.TlsPrivateKey)
	if err != nil {
		return nil, err
	}

	server.TLSConfig = &tls.Config{
		Certificates: []tls.Certificate{certificate},
	}
	server.Addr = fmt.Sprintf(":%d", s.GenericServerRunOptions.SecurePort)
}

sch := scheme.Scheme
if err := apis.AddToScheme(sch); err != nil {
	klog.Fatalf("unable add APIs to scheme: %v", err)
}
...

Note the apis.AddToScheme(sch) step, which registers the GVKs we defined with K8s.

GVK, by the way, stands for Group, Version, Kind (its resource form is GVR: Group, Version, Resource), for example:

{Group: "", Version: "v1", Resource: "namespaces"}
{Group: "", Version: "v1", Resource: "nodes"}
{Group: "", Version: "v1", Resource: "resourcequotas"}... {Group:"tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"}
{Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"}...Copy the code

Scheme manages the relationship between GVKs and Go types: a GVK corresponds to exactly one reflect.Type, while one reflect.Type may correspond to multiple GVKs. In addition, Scheme aggregates a Converter and a Cloner to convert between different versions of a structure and to obtain copies of its values. Space is limited here, so interested readers can dig further on their own.
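As a quick, self-contained illustration of that mapping (my own sketch, not KubeSphere code), registering a type in a Scheme lets you resolve both directions:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	sch := runtime.NewScheme()

	// Register the Pod type under the core/v1 group-version.
	gv := schema.GroupVersion{Group: "", Version: "v1"}
	sch.AddKnownTypes(gv, &corev1.Pod{})

	// Type -> GVK: one reflect.Type may map to several GVKs.
	gvks, _, err := sch.ObjectKinds(&corev1.Pod{})
	if err != nil {
		panic(err)
	}
	for _, gvk := range gvks {
		fmt.Println(gvk) // e.g. "/v1, Kind=Pod"
	}

	// GVK -> Type: a GVK resolves to exactly one Go type.
	obj, err := sch.New(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", obj) // *v1.Pod
}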

Back to the main thread, let’s look at how the scheme is injected:

// AddToSchemes may be used to add all resources defined in the project to a Scheme
var AddToSchemes runtime.SchemeBuilder

// AddToScheme adds all Resources to the Scheme
func AddToScheme(s *runtime.Scheme) error {
	return AddToSchemes.AddToScheme(s)
}

AddToSchemes is an alias for []func(*Scheme) error, so resources can be injected into the Scheme by implementing init() in a registration file inside the apis package.

Here’s an example:

$ cat pkg/apis/addtoscheme_dashboard_v1alpha2.go
package apis
import monitoringdashboardv1alpha2 "kubesphere.io/monitoring-dashboard/api/v1alpha2"
func init() {	
  AddToSchemes = append(AddToSchemes, monitoringdashboardv1alpha2.SchemeBuilder.AddToScheme)
}

In other words, when we develop a plug-in that integrates versioned resources, we must implement its xxx.SchemeBuilder.AddToScheme function so that it can be registered with the scheme and finally exposed as an API under the apis path.
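For reference, the SchemeBuilder on the resource side is typically produced with the controller-runtime scheme.Builder convention. The following is a hypothetical sketch of what such an api/v1alpha2 package roughly looks like; the group name and the Dashboard types mentioned in the comment are assumptions for illustration, not copied from the monitoring-dashboard repository.

// Package v1alpha2 is a hypothetical CRD api package.
package v1alpha2

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/scheme"
)

// GroupVersion identifies the custom resources registered by this module
// (the group name here is an assumption for illustration).
var GroupVersion = schema.GroupVersion{Group: "monitoring.kubesphere.io", Version: "v1alpha2"}

// SchemeBuilder is what pkg/apis appends via xxx.SchemeBuilder.AddToScheme;
// the concrete CR types would be registered with SchemeBuilder.Register(&Dashboard{}, &DashboardList{}).
var SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}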

At this point, all submodule clients are bound to the Apiserver.

PrepareRun

Next, we’ll look at how PrepareRun registers kapis and binds handlers.

This is achieved mainly through the go-restful framework.

In go-restful, a Container holds WebServices, each tied to a specific GVR. A WebService can bind multiple routes, and both Containers and WebServices can add custom interceptors by calling the Filter method.
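Before reading PrepareRun, here is a tiny standalone go-restful example (my own, not KubeSphere code) that shows these primitives: a Container-level Filter acting as an interceptor and a WebService route under a made-up kapis path.

package main

import (
	"log"
	"net/http"

	restful "github.com/emicklei/go-restful/v3"
)

func main() {
	container := restful.NewContainer()

	// A container-level interceptor, analogous to logRequestAndResponse in ks-apiserver.
	container.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
		log.Printf("%s %s", req.Request.Method, req.Request.URL.Path)
		chain.ProcessFilter(req, resp)
	})

	// A WebService groups routes that share a root path, here a made-up group/version.
	ws := new(restful.WebService)
	ws.Path("/kapis/demo.kubesphere.io/v1alpha1").Produces(restful.MIME_JSON)
	ws.Route(ws.GET("/hello").To(func(req *restful.Request, resp *restful.Response) {
		resp.WriteAsJson(map[string]string{"message": "hello"})
	}))

	container.Add(ws)
	log.Fatal(http.ListenAndServe(":8080", container))
}

With these primitives in mind, here is PrepareRun: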

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
  // Container holds the WebService that has a specific GVR
	s.container = restful.NewContainer()
	// Add Request Request log interceptor
	s.container.Filter(logRequestAndResponse)
	s.container.Router(restful.CurlyRouter{})
	
	// Bind a log handler when Recover occurs
	s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(panicReason, httpWriter)
	})
	
	// Each API group builds a WebService and binds the callback function according to the routing rules
  // Use AddToContainer to complete the binding
	s.installKubeSphereAPIs()
	
	// Register metrics: ks_server_request_total and ks_server_request_duration_seconds
	// Bind metrics handler
	s.installMetricsAPI()
	
	// Add the monitor count for valid requests
	s.container.Filter(monitorRequest)

	for _, ws := range s.container.RegisteredWebServices() {
		klog.V(2).Infof("%s", ws.RootPath())
	}
	
	s.Server.Handler = s.container
	
	// Add interceptors for each invocation chain for validation and route distribution
	s.buildHandlerChain(stopCh)

	return nil
}

The code above uses the go-restful framework to bind s.Server.Handler to a container and adds several interceptors.

The GVRs are bound to the kapis proxies in the s.installKubeSphereAPIs() step, as follows:

// Call the AddToContainer method for each API group to register kAPI with the container:
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.KubernetesClient.KubeSphere(), s.Config.OpenPitrixOptions))

// Specifically, the AddToContainer method implemented by each component
// Add a route to the webserver with GroupVersion information. Bind different handlers to different routing paths
ws := runtime.NewWebService(GroupVersion)
// Bind the child route to the callback function
ws.Route(ws.GET("/kubesphere").
    To(h.handleKubeSphereMetricsQuery).
    Doc("Get platform-level metric data.").
    Metadata(restfulspec.KeyOpenAPITags, []string{constants.KubeSphereMetricsTag}).
    Writes(model.Metrics{}).
    Returns(http.StatusOK, respOK, model.Metrics{})).
    Produces(restful.MIME_JSON)

We know that apis paths correspond to K8s requests, whereas in KS the kapis paths correspond to sub-component requests, answered either by ks-apiserver itself or by forwarding to the target component’s server. How does ks-apiserver tell these requests apart?

The answer is distributed via buildHandlerChain.

buildHandlerChain

As mentioned above, buildHandlerChain builds interceptors for various services, listed in the following order.

handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})

if s.Config.AuditingOptions.Enable {
	handler = filters.WithAuditing(handler,
		audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
}

handler = filters.WithAuthorization(handler, authorizers)

if s.Config.MultiClusterOptions.Enable {
	clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
	handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}

handler = filters.WithAuthentication(handler, authn)
handler = filters.WithRequestInfo(handler, requestInfoResolver)

The WithRequestInfo filter defines the following logic:

info, err := resolver.NewRequestInfo(req)
---
func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, error) {
	...
	defer func() {
		prefix := requestInfo.APIPrefix
		if prefix == "" {
			currentParts := splitPath(requestInfo.Path)
			// Proxy discovery API
			if len(currentParts) > 0 && len(currentParts) < 3 {
				prefix = currentParts[0]
			}
		}
		// Requests can be distinguished by whether the routing path carries apis or kapis
		if kubernetesAPIPrefixes.Has(prefix) {
			requestInfo.IsKubernetesRequest = true
		}
	}()
	...
	// URL forms: /clusters/{cluster}/*
	if currentParts[0] == "clusters" {
		if len(currentParts) > 1 {
			requestInfo.Cluster = currentParts[1]
		}
		if len(currentParts) > 2 {
			currentParts = currentParts[2:]
		}
	}
	...
}

There is a lot of code, so I won’t paste it all; the general idea can be seen from the comments:

// NewRequestInfo returns the information from the http request. If error is not nil, RequestInfo holds the information as best it is known before the failure
// It handles both resource and non-resource requests and fills in all the pertinent information for each.
// Valid Inputs:
//
// /apis/{api-group}/{version}/namespaces
// /api/{version}/namespaces
// /api/{version}/namespaces/{namespace}
// /api/{version}/namespaces/{namespace}/{resource}
// /api/{version}/namespaces/{namespace}/{resource}/{resourceName}
// /api/{version}/{resource}
// /api/{version}/{resource}/{resourceName}
//
// Special verbs without subresources:
// /api/{version}/proxy/{resource}/{resourceName}
// /api/{version}/proxy/namespaces/{namespace}/{resource}/{resourceName}
//
// Special verbs with subresources:
// /api/{version}/watch/{resource}
// /api/{version}/watch/namespaces/{namespace}/{resource}
//
// /kapis/{api-group}/{version}/workspaces/{workspace}/{resource}/{resourceName}
// /
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}
// /kapis/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName}
// With workspaces:
// /kapis/clusters/{cluster}/{api-group}/{version}/namespaces/{namespace}/{resource}
// /kapis/clusters/{cluster}/{api-group}/{version}/namespaces/{namespace}/{resource}/{resourceName}

With the information resolved from the route, ks-apiserver can tell which level a request belongs to and which server should handle it.
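As a toy illustration of that dispatch decision (a simplified, hypothetical version of the RequestInfo resolution, not the real ks-apiserver code), the following sketch peels off the /clusters/{cluster} prefix and then checks whether the first path segment marks a Kubernetes request:

package main

import (
	"fmt"
	"strings"
)

// kubernetesAPIPrefixes mirrors the idea in WithRequestInfo: these prefixes mark
// requests that belong to the Kubernetes apiserver rather than to ks-apiserver.
var kubernetesAPIPrefixes = map[string]bool{"api": true, "apis": true}

// classify reports whether a path is a K8s request and which cluster it targets.
func classify(path string) (isKubernetes bool, cluster string) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	// /clusters/{cluster}/* is peeled off first, as in the snippet above.
	if len(parts) > 2 && parts[0] == "clusters" {
		cluster = parts[1]
		parts = parts[2:]
	}
	if len(parts) > 0 && kubernetesAPIPrefixes[parts[0]] {
		isKubernetes = true
	}
	return isKubernetes, cluster
}

func main() {
	for _, p := range []string{
		"/api/v1/namespaces",
		"/kapis/iam.kubesphere.io/v1alpha2/users",
		"/clusters/host/apis/apps/v1/deployments",
	} {
		isK8s, cluster := classify(p)
		fmt.Printf("%-45s kubernetes=%v cluster=%q\n", p, isK8s, cluster)
	}
}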

Let’s add breakpoints in the callback of each filter and run a small experiment to see the order in which the interceptors fire.

Assume the service on the remote cloud host is listening on port 9090, and that you have granted anonymous access to the ClusterDashboard resources in the monitoring.kubesphere.io group. Of course, you can also test directly with an account that has the required permissions.

Next, let’s send a kapis request to see how the link jumps:

curl -d '{"grafanaDashboardUrl":"https://grafana.com/api/dashboards/7362/revisions/5/download", "description":"this is a test dashboard."}' -H "Content-Type: application/json" localhost:9090/kapis/monitoring.kubesphere.io/v1alpha3/clusterdashboards/test1/template

The test results are as follows:

WithRequestInfo -> WithAuthentication -> WithAuthorization -> WithKubeAPIServer

Run

This method does two main things: it starts the informers to synchronize resources, and it starts the ks-apiserver server itself.

func (s *APIServer) Run(ctx context.Context) (err error) {
  // Start the Informer factory, including informers for K8S and KS
	// Synchronize resources, including K8S and KS GVR
	// Check whether GVR exists. If there is no error warning, synchronize if there is
	err = s.waitForResourceSync(ctx)
	if err != nil {
		return err
	}

	shutdownCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		<-ctx.Done()
		_ = s.Server.Shutdown(shutdownCtx)
	}()
	
	// Start the server
	klog.V(0).Infof("Start listening on %s", s.Server.Addr)
	if s.Server.TLSConfig != nil {
		err = s.Server.ListenAndServeTLS("", "")
	} else {
		err = s.Server.ListenAndServe()
	}

	return err
}

At this point, ks-apiserver is started after the Run method is called.

Now let’s do a quick summary:

  • From the configuration file, an instance of KS-Apiserver is created that calls three key methods, NewAPIServer, PrepareRun, and Run.
  • NewAPIServer binds the clients of each module with the given configuration, registers the custom GVKs with the Scheme, and exposes them as apis routes.
  • PrepareRun uses the go-restful framework to register kapis routes and their callback functions, which either respond directly or query the component servers and merge the returned data for clients.
  • Finally, the Run method is called to synchronize resources and start the KS-APiserver service.

Hands-on GVK exploration

Obviously, we just need to focus on the AddToContainer method for each module.

iam.kubesphere.io

pkg/kapis/iam/v1alpha2/register.go

From the code comments, this module manages CRUD for accounts and roles such as Users, ClusterMembers, GlobalRoles, ClusterRoles, WorkspaceRoles, Roles, Workspaces Groups, Workspace Members, and DevOps Members.

We can now set breakpoints in the handlers and request these APIs.

$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/clustermembers"
$ curl "localhost:9090/kapis/iam.kubesphere.io/v1alpha2/users/admin/globalroles".Copy the code

kubeedge.kubesphere.io

pkg/kapis/kubeedge/v1alpha1/register.go

The code uses a generic proxy to forward requests:

func AddToContainer(container *restful.Container, endpoint string) error {
	proxy, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version)
	if err != nil {
		return nil
	}

	return proxy.AddToContainer(container)
}

In other words, requests under kapis/kubeedge.kubesphere.io are forwarded to edge-watcher.kubeedge.svc/api/, i.e. the edge-watcher service in the kubeedge namespace, where the relevant interfaces are integrated.
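The idea of such a generic group proxy can be sketched with the standard library’s reverse proxy; this is my own simplified example (the exact path rewriting done by generic.NewGenericProxy is an assumption), forwarding kapis/kubeedge.kubesphere.io requests to the edge-watcher endpoint configured above:

package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"strings"
)

func main() {
	// The endpoint configured for kubeedge in kubesphere.yaml.
	target, err := url.Parse("http://edge-watcher.kubeedge.svc/api/")
	if err != nil {
		log.Fatal(err)
	}

	proxy := httputil.NewSingleHostReverseProxy(target)
	// Strip the kapis group/version prefix and prepend the endpoint path, so
	// /kapis/kubeedge.kubesphere.io/v1alpha1/nodes/{name} becomes /api/nodes/{name}.
	proxy.Director = func(req *http.Request) {
		req.URL.Scheme = target.Scheme
		req.URL.Host = target.Host
		rest := strings.TrimPrefix(req.URL.Path, "/kapis/kubeedge.kubesphere.io/v1alpha1")
		req.URL.Path = strings.TrimSuffix(target.Path, "/") + rest
		req.Host = target.Host
	}

	log.Fatal(http.ListenAndServe(":9091", proxy))
}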

As for integrating an edge computing platform, besides quickly installing and integrating a mainstream edge framework, an adaptor similar to an edge-shim could also be introduced; it should be considered from the following aspects:

  • Proxy endpoint: Kubeedge now uses proxy mode forwarding;
  • Health check interface: At a minimum, ensure that components in the cloud have been successfully deployed;
  • Support for observables such as events, long-term logging, and auditing;
  • Other edge assist functions, such as file or configuration delivery, etc.

notification.kubesphere.io

pkg/kapis/notification/v2beta1/register.go

The APIs in this group mainly implement CRUD for the global and tenant-level Config and Receiver resources of notifications.

The config resource

Configures the parameters of the notification channels; there are both global and tenant-level Config resources.

The Receiver resource

Configures the recipients of notifications; global and tenant-level Receivers are distinguished.

Let’s pick a callback function to parse:

ws.Route(ws.GET("/{resources}").
		To(h.ListResource).
		Doc("list the notification configs or receivers").
		Metadata(KeyOpenAPITags, []string{constants.NotificationTag}).
		Param(ws.PathParameter("resources"."known values include configs, receivers, secrets")).
		Param(ws.QueryParameter(query.ParameterName, "name used for filtering").Required(false)).
		Param(ws.QueryParameter(query.ParameterLabelSelector, "label selector used for filtering").Required(false)).
		Param(ws.QueryParameter("type"."config or receiver type, known values include dingtalk, email, slack, webhook, wechat").Required(false)).
		Param(ws.QueryParameter(query.ParameterPage, "page").Required(false).DataFormat("page=%d").DefaultValue("page=1")).
		Param(ws.QueryParameter(query.ParameterLimit, "limit").Required(false)).
		Param(ws.QueryParameter(query.ParameterAscending, "sort parameters, e.g. ascending=false").Required(false).DefaultValue("ascending=false")).
		Param(ws.QueryParameter(query.ParameterOrderBy, "sort parameters, e.g. orderBy=createTime")).
		Returns(http.StatusOK, api.StatusOK, api.ListResult{Items: []interface{}{}}))

func (h *handler) ListResource(req *restful.Request, resp *restful.Response) {
	// The name of the tenant or user
	user := req.PathParameter("user")
	// The resource type: configs/receivers/secrets
	resource := req.PathParameter("resources")
	// The notification channel: dingtalk/slack/email/webhook/wechat
	subresource := req.QueryParameter("type")
	q := query.ParseQueryParameter(req)

	if !h.operator.IsKnownResource(resource, subresource) {
		api.HandleBadRequest(resp, req, servererr.New("unknown resource type %s/%s", resource, subresource))
		return
	}

	objs, err := h.operator.List(user, resource, subresource, q)
	handleResponse(req, resp, objs, err)
}

Let’s look at the logic of the list object:

// List objects.
func (o *operator) List(user, resource, subresource string, q *query.Query) (*api.ListResult, error) {
	if len(q.LabelSelector) > 0 {
		q.LabelSelector = q.LabelSelector + ","
	}

	filter := ""
	// If no tenant name is given, the global object is obtained
	if user == "" {
		if isConfig(o.GetObject(resource)) {
		    // type=default marks a global config resource
			filter = "type=default"
		} else {
		    // type=global marks a global receiver resource
			filter = "type=global"
		}
	} else {
	// Otherwise, bind the tenant name to the filter
		filter = "type=tenant,user=" + user
	}
	// Assemble filter labels
	q.LabelSelector = q.LabelSelector + filter
	...
	// Obtain the specified resources under the cluster or namespace by filtering labels
	res, err := o.resourceGetter.List(resource, ns, q)
	if err != nil {
		return nil, err
	}

	if subresource == "" || resource == Secret {
		return res, nil
	}

	results := &api.ListResult{}
    ...
}

In this way, CRUD is implemented for tenant level notification and alarm CR configurations, which are classified as follows:

  • Config has two levels: global (type=default) and tenant (type=tenant);
  • Receiver has two levels: global (type=global) and tenant (type=tenant); see the sketch below for how these labels are used as selectors.
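These type/user labels end up as an ordinary Kubernetes label selector, which is what operator.List assembles into q.LabelSelector above. Here is a small standalone sketch (my own example) of how such a selector matches:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Tenant-level filter for user "alice", as built by operator.List.
	selector, err := labels.Parse("type=tenant,user=alice")
	if err != nil {
		panic(err)
	}

	// Labels on a candidate tenant Receiver object.
	tenantLabels := labels.Set{"type": "tenant", "user": "alice", "app": "notification-manager"}
	fmt.Println(selector.Matches(tenantLabels)) // true

	// A global Receiver does not match the tenant filter.
	globalLabels := labels.Set{"type": "global"}
	fmt.Println(selector.Matches(globalLabels)) // false
}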

How do Config and Receiver bind to each other, and how are alerts delivered to tenants through the channels?

Github.com/kubesphere/…

Github.com/kubesphere/…

Here is a brief answer from the perspective of notification-manager (NM for short).

Functional aspects:

  • A globally configured Receiver sends all alerts to its recipient list through the configured channels, while a Receiver configured with tenant information can only send alerts from its own namespace through its channels;
  • A Receiver can further filter alert messages through the alertSelector parameter;
  • The message templates can be customized by modifying the ConfigMap named notification-manager-template.

The flow from alert to notification:

  • NM receives alerts from Alertmanager on port 19093 at the API path /api/v2/alerts;
  • The callback function converts the alerts into Notification template data and groups the alert data by namespace;
  • All Receivers are iterated, and one goroutine is started per namespace to send the messages; since each namespace may correspond to multiple notification channels, a WaitGroup is used to orchestrate the concurrent tasks, as sketched below.
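The per-namespace fan-out in the last step is a plain WaitGroup pattern; the following is my own simplified sketch of that concurrency shape, not the actual notification-manager code:

package main

import (
	"fmt"
	"sync"
)

// send stands in for "push one alert batch to one channel of one namespace".
func send(ns, channel string) {
	fmt.Printf("namespace=%s channel=%s: notification sent\n", ns, channel)
}

func main() {
	// Alerts grouped by namespace, each namespace with several configured channels.
	receivers := map[string][]string{
		"demo-ns": {"email", "slack"},
		"test-ns": {"dingtalk"},
	}

	var wg sync.WaitGroup
	for ns, channels := range receivers {
		wg.Add(1)
		// One goroutine per namespace, as the NM flow above describes.
		go func(ns string, channels []string) {
			defer wg.Done()
			var inner sync.WaitGroup
			for _, ch := range channels {
				inner.Add(1)
				go func(ch string) {
					defer inner.Done()
					send(ns, ch)
				}(ch)
			}
			inner.Wait()
		}(ns, channels)
	}
	wg.Wait()
}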

monitoring.kubesphere.io

pkg/kapis/monitoring/v1alpha3/register.go

Monitoring metrics are divided into platform, node, workspace, namespace, pod and other levels. You can obtain not only aggregate statistics but also the metrics of every pod/container under a given node/namespace/workspace.

Let’s look at a callback function, taking handleNamedMetricsQuery as an example:

  • The valid metrics at the given metric level are traversed, and the metric names are filtered according to the metricFilter request parameter;
  • Depending on whether it is a range query or an instant query, the relevant method in the monitoring package is called, and the results are obtained by the corresponding client from its backend.

The code is as follows:

func (h handler) handleNamedMetricsQuery(resp *restful.Response, q queryOptions) {
	var res model.Metrics

	var metrics []string
	// q.namedMetrics is the full list of metrics defined by PromQL expressions, grouped by monitoring level
	// The monitoring levels are distinguished earlier in the call stack via monitoring.LevelXXX, e.g. monitoring.LevelPod
	for _, metric := range q.namedMetrics {
		if strings.HasPrefix(metric, model.MetricMeterPrefix) {
			// skip meter metric
			continue
		}
		// Filter by the indicator name in the request parameter
		ok, _ := regexp.MatchString(q.metricFilter, metric)
		if ok {
			metrics = append(metrics, metric)
		}
	}
	if len(metrics) == 0 {
		resp.WriteAsJson(res)
		return
	}
	
	// Determine whether it is a range query or an instant query, then call the relevant function
	// The Prometheus client is mainly used to run PromQL queries; metrics of edge nodes are currently fetched through metrics-server
	if q.isRangeQuery() {
		res = h.mo.GetNamedMetricsOverTime(metrics, q.start, q.end, q.step, q.option)
	} else {
		res = h.mo.GetNamedMetrics(metrics, q.time, q.option)
		if q.shouldSort() {
			res = *res.Sort(q.target, q.order, q.identifier).Page(q.page, q.limit)
		}
	}
	resp.WriteAsJson(res)
}

Now let’s shift our attention to:

pkg/models/monitoring/monitoring.go:156

GetNamedMetricsOverTime, for example, returns a combination of Prometheus and metrics-server query results:

func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics {
    // Obtain the Prometheus client query results; this mainly uses sync.WaitGroup, starting one goroutine per metric, then collecting and returning the results
	ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt)
	// If metrics-server is enabled
	if mo.metricsserver != nil {

		// Merge edge node data
		edgeMetrics := make(map[string]monitoring.MetricData)

		for i, ressMetric := range ress {
			metricName := ressMetric.MetricName
			ressMetricValues := ressMetric.MetricData.MetricValues
			if len(ressMetricValues) == 0 {
				// this metric has no prometheus metrics data
				if len(edgeMetrics) == 0 {
					// start requesting the monitoring metrics API data
					mr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt)
					for _, mrMetric := range mr {
						edgeMetrics[mrMetric.MetricName] = mrMetric.MetricData
					}
				}
				if val, ok := edgeMetrics[metricName]; ok {
					ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
				}
			}
		}
	}

	return Metrics{Results: ress}
}

In addition, the Monitoring package defines interface methods for each monitoring query client, which can be explored on demand:

  • GetMetric(expr string, time time.Time) Metric

  • GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric

  • GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric

  • GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric

  • GetMetadata(namespace string) []Metadata

  • GetMetricLabelSet(expr string, start, end time.Time) []map[string]string

tenant.kubesphere.io

Before talking about these APIs, note that multi-tenancy can be divided into soft multi-tenancy and hard multi-tenancy according to how strongly tenants are isolated from each other.

  • Soft isolation is more for the demand of multi-tenancy within the enterprise;
  • Hard isolation is more targeted at external service providers, requiring stricter isolation as a security guarantee.

The more important part of this group is the implementation of tenant-scoped queries for logs/audits/events.

The following uses querying logs as an example:

func (h *tenantHandler) QueryLogs(req *restful.Request, resp *restful.Response) {
    // Query the tenant information carried in this context
	user, ok := request.UserFrom(req.Request.Context())
	if !ok {
		err := fmt.Errorf("cannot obtain user info")
		klog.Errorln(err)
		api.HandleForbidden(resp, req, err)
		return
	}
	// Parse the query parameters, e.g. which ns/workload/pod/container the query targets, the time range, whether it is a histogram query, etc.
	queryParam, err := loggingv1alpha2.ParseQueryParameter(req)
	if err != nil {
		klog.Errorln(err)
		api.HandleInternalError(resp, req, err)
		return
	}
	// Export data
	if queryParam.Operation == loggingv1alpha2.OperationExport {
		resp.Header().Set(restful.HEADER_ContentType, "text/plain")
		resp.Header().Set("Content-Disposition"."attachment")
		// Verify that the account has permissions
		// The admin account can export all NS logs. The tenant can export only this NS log
		// Assemble loggingClient to export logs
		err := h.tenant.ExportLogs(user, queryParam, resp)
		if err != nil {
			klog.Errorln(err)
			api.HandleInternalError(resp, req, err)
			return
		}
	} else {
		// Verify that the account has permissions
		// The admin account can view all NS logs. Tenants can view only this NS log
		// Assemble loggingClient for log return
		result, err := h.tenant.QueryLogs(user, queryParam)
		if err != nil {
			klog.Errorln(err)
			api.HandleInternalError(resp, req, err)
			return
		}
		resp.WriteAsJson(result)
	}
}

Due to limited space, only the GVRs above are debugged here; readers who are interested can dig deeper on their own.
