About the authors
Li Tengfei, container technology R&D engineer at Tencent, works on Tencent Cloud TKE backend development and is a core developer of SuperEdge.
Du Yanghao, senior engineer at Tencent Cloud, is passionate about open source, containers, and Kubernetes. He currently works on image registries, Kubernetes cluster high availability, backup and restore, and edge computing.
SuperEdge introduction
SuperEdge is a Kubernetes-native edge container solution that extends Kubernetes' powerful container management capabilities to edge computing scenarios. It addresses common technical challenges at the edge, such as a single cluster whose nodes span multiple regions, unreliable cloud-edge networks, and edge nodes located behind NAT. With these capabilities, applications can be easily deployed to edge compute nodes and run reliably, and distributed compute resources scattered across locations, including but not limited to edge cloud resources, private cloud resources, and field devices, can be managed within one Kubernetes cluster, turning the edge into your own PaaS platform. SuperEdge supports all Kubernetes resource types, APIs, usage patterns, and operation tools with no additional learning cost, and it is compatible with other cloud-native projects such as Prometheus, which can be used alongside it. The project is co-sponsored by Tencent, Intel, VMware, Huya Live, Cambricon, Capital Online, and Meituan.
Architecture and principles of the cloud-edge tunnel
In edge scenarios, the network is often one-way: only edge nodes can actively access the cloud. The cloud-edge tunnel is mainly used to proxy requests from the cloud to components on edge nodes, solving the problem that the cloud cannot directly access edge nodes.
The architecture diagram is as follows:
The implementation principle is as follows:
- On the edge node, tunnel-edge actively connects to the tunnel-cloud Service, and the Service forwards the request to a tunnel-cloud Pod according to its load balancing policy
- Once the gRPC connection between tunnel-edge and tunnel-cloud is established, tunnel-cloud writes the mapping between its own podIp and the nodeName of the node hosting tunnel-edge into tunnel-dns; when the gRPC connection is closed, tunnel-cloud deletes that mapping
The proxy forwarding process of the whole request is as follows:
- When the apiserver or another cloud application accesses the kubelet or another application on an edge node, tunnel-dns hijacks DNS to forward the request to a tunnel-cloud Pod, resolving the node name in the HTTP request's host field to the podIp of tunnel-cloud
- tunnel-cloud forwards the request information over the gRPC connection established with the tunnel-edge of that node, based on the node name
- tunnel-edge requests the application on the edge node according to the received request information
Data exchange within the Tunnel module
Having covered the Tunnel configuration, the following describes the data flow inside the Tunnel module:
The figure above marks the HTTPS proxy data flow; the TCP proxy data flow is similar. The key steps are:
- HTTPS Server -> StreamServer: the HTTPS Server sends a StreamMsg to the Stream Server through a channel, namely the node.Channel obtained from nodeContext by StreamMsg.Node
- StreamServer -> StreamClient: each cloud-edge tunnel is allocated a node object; writing a StreamMsg into that node's Channel sends the data to the StreamClient
- StreamServer -> HTTPS Server: the StreamServer sends a StreamMsg to the HTTPS Server through a channel; it obtains the node from nodeContext by StreamMsg.Node, and then the conn.Channel of the HTTPS module by matching StreamMsg.Topic against conn.uid
Both nodeContext and connContext manage connections, but the lifecycles of what they manage differ: nodeContext tracks the cloud-edge gRPC tunnels, while connContext tracks the upper-layer TCP and HTTPS connections, whose lifetimes are different, so the two are managed separately.
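As a rough mental model, the relationship between the two contexts can be pictured as below. This is only a sketch: the type and field names are assumptions made for illustration, not the actual SuperEdge definitions.
type nodeContext struct {
    nodes map[string]*node // key: edge node name (StreamMsg.Node); one entry per cloud-edge gRPC tunnel
}

type connContext struct {
    conns map[string]*conn // key: connection uid (StreamMsg.Topic); one entry per upper-layer HTTPS/TCP connection
}

type node struct {
    name     string
    ch       chan *proto.StreamMsg // written by the HTTPS/TCP modules, drained by the Stream module onto the tunnel
    connUIDs []string              // uids of the upper-layer connections currently bound to this node
}

type conn struct {
    uid string
    ch  chan *proto.StreamMsg // written by the Stream module, drained by the HTTPS/TCP modules
}
A node lives as long as the gRPC tunnel to its edge node, while a conn lives only for the duration of a single proxied HTTPS/TCP connection.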
Tunnel connection management
Connections managed by the Tunnel can be divided into underlying connections (the cloud-edge gRPC tunnel connections) and upper-layer application connections (HTTPS and TCP connections). Connection anomalies are handled in the following scenarios:
The gRPC connection is normal, but the upper-layer connection is abnormal
Take an HTTPS connection as an example: when the HTTPS Client of tunnel-edge disconnects abnormally from the Server on the edge node, tunnel-edge sends a StreamMsg (StreamMsg.Type=CLOSE). After receiving it, tunnel-cloud closes the connection between its HTTPS Server and the cloud HTTPS Client.
The gRPC connection is abnormal
If the gRPC connection becomes abnormal, the Stream module sends a StreamMsg (StreamMsg.Type=CLOSE) to the HTTPS and TCP modules based on the Node.connContext bound to that gRPC connection, and the HTTPS or TCP module disconnects the corresponding connections after receiving the message.
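As a minimal illustration (the helper name and the map argument are hypothetical, not the actual SuperEdge code), the fan-out on a broken tunnel looks roughly like this: every upper-layer connection bound to the node receives a CLOSE message, and the module owning the connection then closes it.
// Hypothetical sketch: notify all upper-layer connections of a broken tunnel.
func notifyClose(nodeName string, connChannels map[string]chan *proto.StreamMsg) {
    for uid, ch := range connChannels {
        ch <- &proto.StreamMsg{
            Node:  nodeName,
            Type:  util.CLOSED,
            Topic: uid,
        }
    }
}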
Stream module (gRPC cloud-edge tunnel)
func (stream *Stream) Start(mode string) {
    context.GetContext().RegisterHandler(util.STREAM_HEART_BEAT, util.STREAM, streammsg.HeartbeatHandler)
    if mode == util.CLOUD {
        ...
        // Start the gRPC server
        go connect.StartServer()
        ...
        go connect.SynCorefile()
    } else {
        // Start the gRPC client
        go connect.StartSendClient()
        ...
    }
    ...
}
Here HeartbeatHandler handles heartbeat messages, while SynCorefile synchronizes the tunnel-coredns configuration file for tunnel-cloud: it executes checkHosts once a minute (to allow for the time it takes the configmap to propagate to tunnel-cloud's pod mount file), as follows:
func SynCorefile() {
    for {
        ...
        err := coreDns.checkHosts()
        ...
        time.Sleep(60 * time.Second)
    }
}
checkHosts is responsible for refreshing the configmap:
func (dns *CoreDns) checkHosts() error {
    nodes, flag := parseHosts()
    if !flag {
        return nil
    }
    ...
    _, err = dns.ClientSet.CoreV1().ConfigMaps(dns.Namespace).Update(cctx.TODO(), cm, metav1.UpdateOptions{})
    ...
}
checkHosts first calls parseHosts to read the local hosts file and obtain the mapping list of edge node names to the corresponding tunnel-cloud podIp, then compares it with the node names held in memory. If anything has changed, it overwrites the configmap with the new content and updates it:
In addition, tunnel-cloud mounts the configmap as a local file in order to optimize performance when, in hosted mode, many clusters synchronize tunnel-coredns at the same time.
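For illustration, the data kept in the tunnel-coredns hosts plug-in follows the standard CoreDNS hosts format of "<podIp> <nodeName>" lines; the IPs and node names below are made up, not taken from a real deployment.
// Made-up example of the node-name -> tunnel-cloud podIp mapping maintained by checkHosts.
// Several edge nodes may map to the same tunnel-cloud pod, one per established gRPC tunnel.
const exampleHosts = `10.244.0.15 edge-node-1
10.244.0.15 edge-node-2
10.244.0.23 edge-node-3`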
tunnel-edge first calls StartClient to establish a gRPC connection with tunnel-cloud, which returns a grpc.ClientConn:
func StartClient() (*grpc.ClientConn, ctx.Context, ctx.CancelFunc, error) {
    ...
    opts := []grpc.DialOption{grpc.WithKeepaliveParams(kacp), grpc.WithStreamInterceptor(ClientStreamInterceptor), grpc.WithTransportCredentials(creds)}
    conn, err := grpc.Dial(conf.TunnelConf.TunnlMode.EDGE.StreamEdge.Client.ServerName, opts...)
    ...
}
The call to grpc.Dial passes the grpc.WithStreamInterceptor(ClientStreamInterceptor) DialOption, so that ClientStreamInterceptor is attached to the grpc.ClientConn as a StreamClientInterceptor. tunnel-edge then waits for the gRPC connection state to become Ready and executes the Send function. streamClient.TunnelStreaming invokes the StreamClientInterceptor and returns a wrappedClientStream object.
func ClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    ...
    opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{AccessToken: clientToken})))
    ...
    return newClientWrappedStream(s), nil
}
ClientStreamInterceptor packs the edge node name and token into an oauth2.Token.AccessToken for authentication, and wraps the stream in a wrappedClientStream.
stream.Send then concurrently calls wrappedClientStream.SendMsg and wrappedClientStream.RecvMsg, used respectively for sending from and receiving to tunnel-edge, and blocks waiting for either to return.
Note: tunnel-edge registers its node information with tunnel-cloud when the gRPC Stream is established, not when the grpc.ClientConn is created.
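A minimal sketch of what the client-side Send amounts to (an assumed shape, not the actual SuperEdge source): once the connection is Ready, tunnel-edge obtains the wrappedClientStream from streamClient.TunnelStreaming and runs SendMsg and RecvMsg on it concurrently, blocking until either direction fails.
// Sketch only: the stream parameter stands for the wrappedClientStream.
func send(stream grpc.ClientStream) error {
    errCh := make(chan error, 2)
    go func() {
        // wrappedClientStream.SendMsg(nil): loop forwarding StreamMsg from the node channel onto the tunnel
        errCh <- stream.SendMsg(nil)
    }()
    go func() {
        // wrappedClientStream.RecvMsg(nil): loop receiving StreamMsg from the tunnel and dispatching to handlers
        errCh <- stream.RecvMsg(nil)
    }()
    return <-errCh
}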
The whole process is shown in the figure below:
Correspondingly, when tunnel-cloud initializes, grpc.StreamInterceptor(ServerStreamInterceptor) is built into the gRPC ServerOptions, so that ServerStreamInterceptor is attached to the grpc.Server as a StreamServerInterceptor:
func StartServer() {
    ...
    opts := []grpc.ServerOption{grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.StreamInterceptor(ServerStreamInterceptor), grpc.Creds(creds)}
    s := grpc.NewServer(opts...)
    proto.RegisterStreamServer(s, &stream.Server{})
    ...
}
When the cloud gRPC service receives a tunnel-edge request to establish a Stream, it calls ServerStreamInterceptor. ServerStreamInterceptor parses the edge node name and token for this gRPC connection out of the gRPC metadata and verifies the token; it then invokes the handler function, which calls stream.TunnelStreaming and passes it a wrappedServerStream (which implements the proto.Stream_TunnelStreamingServer interface) as the object used to communicate with that edge node.
func ServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    ...
    tk := strings.TrimPrefix(md["authorization"][0], "Bearer ")
    auth, err := token.ParseToken(tk)
    ...
    if auth.Token != token.GetTokenFromCache(auth.NodeName) {
        klog.Errorf("invalid token node = %s", auth.NodeName)
        return ErrInvalidToken
    }
    err = handler(srv, newServerWrappedStream(ss, auth.NodeName))
    if err != nil {
        ctx.GetContext().RemoveNode(auth.NodeName)
        klog.Errorf("node disconnected node = %s err = %v", auth.NodeName, err)
    }
    return err
}
When the TunnelStreaming method returns, ServerStreamInterceptor executes the node-removal logic ctx.GetContext().RemoveNode.
TunnelStreaming concurrently invokes wrappedServerStream.SendMsg and wrappedServerStream.RecvMsg, used respectively for sending from and receiving to tunnel-cloud, and blocks waiting:
func (s *Server) TunnelStreaming(stream proto.Stream_TunnelStreamingServer) error {
    errChan := make(chan error, 2)
    go func(sendStream proto.Stream_TunnelStreamingServer, sendChan chan error) {
        sendErr := sendStream.SendMsg(nil)
        ...
        sendChan <- sendErr
    }(stream, errChan)
    go func(recvStream proto.Stream_TunnelStreamingServer, recvChan chan error) {
        recvErr := stream.RecvMsg(nil)
        ...
        recvChan <- recvErr
    }(stream, errChan)
    e := <-errChan
    return e
}
SendMsg keeps receiving StreamMsg destined for the edge node corresponding to this wrappedServerStream and calls ServerStream.SendMsg to send them to tunnel-edge:
func (w *wrappedServerStream) SendMsg(m interface{}) error {
    if m != nil {
        return w.ServerStream.SendMsg(m)
    }
    node := ctx.GetContext().AddNode(w.node)
    ...
    for {
        msg := <-node.NodeRecv()
        ...
        err := w.ServerStream.SendMsg(msg)
        ...
    }
}
RecvMsg continuously receives StreamMsg from tunnel-edge and invokes the handler registered for that StreamMsg type to process it.
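The receive loop has roughly the following shape (a sketch; the dispatch helper here is a hypothetical stand-in for looking up the handler registered via RegisterHandler, not the exact SuperEdge API):
func (w *wrappedServerStream) RecvMsg(m interface{}) error {
    for {
        msg := &proto.StreamMsg{}
        if err := w.ServerStream.RecvMsg(msg); err != nil {
            return err
        }
        // dispatch(msg) stands in for invoking the handler registered for msg.Category/msg.Type
        // (heartbeat, CONNECTING, CLOSED, ...).
        if err := dispatch(msg); err != nil {
            return err
        }
    }
}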
Summary:
- The Stream module is responsible for establishing the gRPC connection and communicating over the cloud-edge tunnel
- On the edge node, tunnel-edge actively connects to the tunnel-cloud Service in the cloud, and the Service forwards the request to a tunnel-cloud Pod according to its load balancing policy
- Once the gRPC connection between tunnel-edge and tunnel-cloud is established, tunnel-cloud writes the mapping between its own podIp and the nodeName of the node hosting tunnel-edge into tunnel-coredns; when the gRPC connection is closed, tunnel-cloud deletes that mapping
- tunnel-edge uses the edge node name and token to establish the gRPC connection, while tunnel-cloud identifies the edge node behind each gRPC connection from the authentication information and builds a wrappedServerStream for each edge node (one tunnel-cloud can handle multiple tunnel-edge connections)
- Once a minute (allowing for the time it takes the configmap to propagate to tunnel-cloud's pod mount file), tunnel-cloud synchronizes the mapping of edge node names to tunnel-cloud podIp into the configmap backing the hosts plug-in configuration file of tunnel-coredns; in addition, the configmap is mounted as a local file to optimize tunnel-coredns synchronization performance when many clusters run in hosted mode
- tunnel-edge sends a heartbeat StreamMsg to tunnel-cloud once a minute, and tunnel-cloud responds after receiving it (the heartbeat detects whether the gRPC Stream is still healthy)
- StreamMsg carries messages of different types, such as heartbeats, TCP proxy data, and HTTPS requests (see the sketch of its fields below); tunnel-cloud uses context.node to distinguish the gRPC tunnels of different edge nodes
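For reference, the StreamMsg fields used throughout the code excerpts in this article are collected below. This is a simplified Go view of the message, not its full generated definition, and the comments are interpretive:
type StreamMsg struct {
    Node     string // edge node name; selects the cloud-edge gRPC tunnel that carries the message
    Category string // owning module, e.g. HTTPS or TCP
    Type     string // message type: heartbeat, CONNECTING, CONNECTED, TRANSNMISSION, CLOSED, ...
    Topic    string // uid of the upper-layer connection (matched against conn.uid)
    Data     []byte // payload being proxied (request/response bytes)
    Addr     string // target address on the edge used by the HTTPS/TCP proxies
}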
The HTTPS proxy
The HTTPS module is responsible for establishing the cloud-edge HTTPS proxy and forwarding HTTPS requests from cloud components (such as kube-apiserver) to edge services (such as kubelet).
func (https *Https) Start(mode string) {
    context.GetContext().RegisterHandler(util.CONNECTING, util.HTTPS, httpsmsg.ConnectingHandler)
    context.GetContext().RegisterHandler(util.CONNECTED, util.HTTPS, httpsmsg.ConnectedAndTransmission)
    context.GetContext().RegisterHandler(util.CLOSED, util.HTTPS, httpsmsg.ConnectedAndTransmission)
    context.GetContext().RegisterHandler(util.TRANSNMISSION, util.HTTPS, httpsmsg.ConnectedAndTransmission)
    if mode == util.CLOUD {
        go httpsmng.StartServer()
    }
}
The Start function first registers handlers for StreamMsg, where the CLOSED handler processes connection-close messages, and then starts the HTTPS Server on the cloud. When a cloud component sends an HTTPS request to tunnel-cloud, ServerHandler first parses the node name from the request.Host field. If the client first establishes a TLS connection and then writes the HTTP request object into that connection, request.Host may not be set; in that case the node name must be resolved from request.TLS.ServerName. The HTTPS Server then reads request.Body and request.Header to build an HttpsMsg structure, serializes it into a StreamMsg, and sends it via Send2Node, which puts the StreamMsg into the Channel of the node given by StreamMsg.Node so that the Stream module forwards it to tunnel-edge.
func (serverHandler *ServerHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
    var nodeName string
    nodeinfo := strings.Split(request.Host, ":")
    if context.GetContext().NodeIsExist(nodeinfo[0]) {
        nodeName = nodeinfo[0]
    } else {
        nodeName = request.TLS.ServerName
    }
    ...
    node.Send2Node(StreamMsg)
}
Tunnel-edge receives StreamMsg and calls ConnectingHandler to handle it:
func ConnectingHandler(msg *proto.StreamMsg) error {
    go httpsmng.Request(msg)
    return nil
}

func Request(msg *proto.StreamMsg) {
    httpConn, err := getHttpConn(msg)
    ...
    rawResponse := bytes.NewBuffer(make([]byte, 0, util.MaxResponseSize))
    rawResponse.Reset()
    respReader := bufio.NewReader(io.TeeReader(httpConn, rawResponse))
    resp, err := http.ReadResponse(respReader, nil)
    ...
    node.BindNode(msg.Topic)
    ...
    if resp.StatusCode != http.StatusSwitchingProtocols {
        handleClientHttp(resp, rawResponse, httpConn, msg, node, conn)
    } else {
        handleClientSwitchingProtocols(httpConn, rawResponse, msg, node, conn)
    }
}
ConnectingHandler calls Request to process the StreamMsg. Request first establishes a TLS connection to the Server on the edge node via getHttpConn, then parses the data returned over the TLS connection into an HTTP Response. If the Status Code is 200, it sends the response content to tunnel-cloud; if the Status Code is 101, it sends the raw binary response data read from the TLS connection to tunnel-cloud. In both cases StreamMsg.Type is CONNECTED.
Upon receipt of the StreamMsg, Tunnel-Cloud calls ConnectedAndTransmission for processing:
func ConnectedAndTransmission(msg *proto.StreamMsg) error {
    conn := context.GetContext().GetConn(msg.Topic)
    ...
    conn.Send2Conn(msg)
    return nil
}
The conn is obtained by msg.Topic (the conn uid), and the message is pushed into the conn's channel through Send2Conn.
After the cloud HTTPS Server receives the CONNECTED message from the edge, it considers the HTTPS proxy to be successfully established, and data transmission continues through handleClientHttp or handleClientSwitchingProtocols. Here we only analyze the data transmission path without protocol upgrade (handleClientHttp); the processing logic on the HTTPS Client side is as follows:
func handleClientHttp(resp *http.Response, rawResponse *bytes.Buffer, httpConn net.Conn, msg *proto.StreamMsg, node context.Node, conn context.Conn) {
    ...
    go func(read chan *proto.StreamMsg, response *http.Response, buf *bytes.Buffer, stopRead chan struct{}) {
        rrunning := true
        for rrunning {
            bbody := make([]byte, util.MaxResponseSize)
            n, err := response.Body.Read(bbody)
            respMsg := &proto.StreamMsg{
                Node:     msg.Node,
                Category: msg.Category,
                Type:     util.CONNECTED,
                Topic:    msg.Topic,
                Data:     bbody[:n],
            }
            ...
            read <- respMsg
        }
        ...
    }(readCh, resp, rawResponse, stop)
    running := true
    for running {
        select {
        case cloudMsg := <-conn.ConnRecv():
            ...
        case respMsg := <-readCh:
            ...
            node.Send2Node(respMsg)
            ...
        }
    }
    ...
}
Here handleClientHttp keeps trying to read data packets from the edge component and sends them to tunnel-cloud as StreamMsg of type TRANSNMISSION. After receiving the StreamMsg, tunnel-cloud calls ConnectedAndTransmission, which puts the StreamMsg into the conn.Channel of the HTTPS module corresponding to StreamMsg.Topic.
func handleServerHttp(rmsg *HttpsMsg, writer http.ResponseWriter, request *http.Request, node context.Node, conn context.Conn) {
    for k, v := range rmsg.Header {
        writer.Header().Add(k, v)
    }
    flusher, ok := writer.(http.Flusher)
    if ok {
        running := true
        for running {
            select {
            case <-request.Context().Done():
                ...
            case msg := <-conn.ConnRecv():
                ...
                _, err := writer.Write(msg.Data)
                flusher.Flush()
                ...
            }
        }
        ...
    }
}
handleServerHttp, after receiving a StreamMsg, writes msg.Data (the data packet from the edge component) to the cloud component. The whole data flow is one-way, from the edge to the cloud, as shown below:
For a request like kubectl exec, the data flow is bidirectional. In this case the edge component (kubelet) returns a packet with StatusCode 101, indicating a protocol upgrade, after which tunnel-cloud and tunnel-edge switch to handleServerSwitchingProtocols and handleClientSwitchingProtocols respectively to read from and write to the underlying connection of the HTTPS module, completing the bidirectional transmission of the data stream.
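The sketch below is a simplified, self-contained illustration of what each side does after the 101 response; it is not the actual handleServerSwitchingProtocols / handleClientSwitchingProtocols code, and the byte channels stand in for the node/conn channels that carry StreamMsg.Data across the cloud-edge tunnel.
// Bytes read from the local connection are shipped across the tunnel, and
// bytes arriving from the tunnel are written back to the local connection.
func pump(local net.Conn, fromTunnel <-chan []byte, toTunnel chan<- []byte) {
    // local connection -> cloud-edge tunnel
    go func() {
        buf := make([]byte, 32*1024)
        for {
            n, err := local.Read(buf)
            if n > 0 {
                toTunnel <- append([]byte(nil), buf[:n]...)
            }
            if err != nil {
                return
            }
        }
    }()
    // cloud-edge tunnel -> local connection
    for data := range fromTunnel {
        if _, err := local.Write(data); err != nil {
            return
        }
    }
}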
The architecture is as follows:
The HTTPS module is summarized as follows:
Summary
- HTTPS: responsible for establishing the cloud-edge HTTPS proxy (e.g., cloud kube-apiserver <-> edge kubelet) and transmitting data
- tunnel-cloud reads the edge node name carried in the cloud component's HTTPS request and establishes the HTTPS proxy with that specific edge node, rather than randomly picking a cloud-edge tunnel for forwarding the way the TCP proxy does
- When the cloud apiserver or another cloud application accesses the kubelet or another application on an edge node, tunnel-dns hijacks DNS to forward the request to a tunnel-cloud Pod (the node name in the request host is resolved to the podIp of tunnel-cloud); tunnel-cloud encapsulates the request information as a StreamMsg and sends it to tunnel-edge through the cloud-edge tunnel corresponding to that node name; tunnel-edge establishes a TLS connection with the Server on the edge using the Addr field of the received StreamMsg and the certificate in the configuration file, and writes the request information from the StreamMsg into the TLS connection; tunnel-edge then reads the data returned by the edge Server from the TLS connection, encapsulates it as a StreamMsg, and sends it to tunnel-cloud; tunnel-cloud writes the received data into the connection established between the cloud component and tunnel-cloud
TCP
The TCP module is responsible for establishing a TCP proxy between the cloud control-plane cluster and independent edge clusters in multi-cluster management:
func (tcp *TcpProxy) Start(mode string) {
    context.GetContext().RegisterHandler(util.TCP_BACKEND, tcp.Name(), tcpmsg.BackendHandler)
    context.GetContext().RegisterHandler(util.TCP_FRONTEND, tcp.Name(), tcpmsg.FrontendHandler)
    context.GetContext().RegisterHandler(util.CLOSED, tcp.Name(), tcpmsg.ControlHandler)
    if mode == util.CLOUD {
        ...
        for front, backend := range Tcp.Addr {
            go func(front, backend string) {
                ln, err := net.Listen("tcp", front)
                ...
                for {
                    rawConn, err := ln.Accept()
                    ...
                    fp := tcpmng.NewTcpConn(uuid, backend, node)
                    fp.Conn = rawConn
                    fp.Type = util.TCP_FRONTEND
                    go fp.Write()
                    go fp.Read()
                }
            }(front, backend)
        }
    }
}
The Start function first registers handlers for StreamMsg, where the CLOSED handler mainly processes connection-close messages, and then starts the TCP Server on the cloud. After receiving a request from a cloud component, the TCP Server encapsulates the request into a StreamMsg and sends it to the StreamServer; the message is dispatched to FrontendHandler on the edge, and StreamMsg.Node is set by randomly selecting one node from the established cloud-edge tunnels. Upon receiving the StreamMsg, tunnel-edge calls the FrontendHandler function:
func FrontendHandler(msg *proto.StreamMsg) error {
    c := context.GetContext().GetConn(msg.Topic)
    if c != nil {
        c.Send2Conn(msg)
        return nil
    }
    tp := tcpmng.NewTcpConn(msg.Topic, msg.Addr, msg.Node)
    tp.Type = util.TCP_BACKEND
    tp.C.Send2Conn(msg)
    tcpAddr, err := net.ResolveTCPAddr("tcp", tp.Addr)
    if err != nil {
        ...
    }
    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    ...
    tp.Conn = conn
    go tp.Read()
    go tp.Write()
    return nil
}
FrontendHandler first establishes a TCP connection with the edge Server using StreamMsg.Addr, then starts goroutines to asynchronously Read from and Write to that TCP connection, creates a Conn object (conn.uid = StreamMsg.Topic), and writes StreamMsg.Data into the TCP connection. After receiving the data returned by the edge Server, tunnel-edge encapsulates it as a StreamMsg (handled on the cloud side by BackendHandler, sketched below) and sends it to tunnel-cloud.
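For completeness, the cloud side plausibly handles the returned data much like ConnectedAndTransmission above: look up the frontend connection by Topic and push the message into its channel so the TCP Server can write it back to the cloud component. This is an assumed shape, not the actual BackendHandler source.
// Assumed sketch of the cloud-side handler for data coming back from the edge.
func BackendHandler(msg *proto.StreamMsg) error {
    conn := context.GetContext().GetConn(msg.Topic)
    if conn != nil {
        conn.Send2Conn(msg)
    }
    return nil
}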
The whole process is shown in the figure below:
Summary
- TCP: responsible for establishing the cloud-edge TCP proxy in multi-cluster management
- The cloud component accesses the Server on the edge through the TCP module. When the cloud TCP Server receives a request, it encapsulates it into a StreamMsg and sends it through a cloud-edge tunnel (randomly selected among the connected tunnels, so the TCP proxy is recommended only when a single tunnel-edge is connected); tunnel-edge establishes a TCP connection with the edge Server using the Addr field of the received StreamMsg and writes the request into that TCP connection; tunnel-edge then reads the data returned by the edge Server from the TCP connection and sends it to tunnel-cloud through the cloud-edge tunnel; after receiving it, tunnel-cloud writes the data into the connection established between the cloud component and the TCP Server
Looking forward to
- Support for more network protocols (HTTPS and TCP already supported)
- Support cloud access to Servers running in business Pods on edge nodes
- When multiple edge nodes join the cluster at the same time, multiple tunnel-cloud Pod replicas update the configmap holding the tunnel-coredns hosts plug-in configuration file without locking; although the probability is low, write conflicts are theoretically possible
Refs
- kubernetes-reading-notes