
Suppose I want to run something before or after every RPC method, apply uniform special handling to the RPC methods of one business module, authenticate RPC calls, enforce context timeouts on them, or log every RPC request. How do I do all that?

The answer to all of these requirements, and more, is the interceptor, which lets you implement a great deal of custom functionality without intruding directly into business code.

1 Types of interceptors

In gRPC, interceptors can be classified into the following two categories according to the type of RPC calls intercepted by interceptors:

  • Unary Interceptor: Intercepts and handles Unary RPC calls.
  • Stream Interceptor: Intercepts and handles Stream RPC calls.

Although there are only two interceptor categories, drilling down further, the client and the server each have their own unary and stream interceptors. As a result, there are four specific interceptor types in gRPC.

2 Client and server interceptors

2.1 The client

2.1.1 Unary interceptor

The client’s unary interceptor type is UnaryClientInterceptor, and the method prototype is as follows:

type UnaryClientInterceptor func(
    ctx context.Context,
    method string,
    req, reply interface{},
    cc *ClientConn,
    invoker UnaryInvoker,
    opts ...CallOption,
) error

The implementation of a unary interceptor can generally be divided into three parts: preprocessing, invoking the RPC method, and post-processing. The type takes seven parameters: the RPC context, the method being called, the request parameters and response of the RPC method, the client connection handle, the invoker for the RPC method, and the call options.

2.1.2 Stream interceptor

The client stream interceptor type is StreamClientInterceptor, and the method prototype is as follows:

type StreamClientInterceptor func(
    ctx context.Context,
    desc *StreamDesc,
    cc *ClientConn,
    method string,
    streamer Streamer,
    opts ...CallOption,
) (ClientStream, error)

The implementation of a stream interceptor includes preprocessing and stream-operation interception. Unlike the unary case, it cannot simply make the RPC call and then do post-processing afterwards; instead, it intercepts the user's operations on the stream.

2.2 The server

2.2.1 Unary interceptor

The unary interceptor type on the server side is UnaryServerInterceptor, and the method prototype is as follows:

type UnaryServerInterceptor func(
    ctx context.Context, 
    req interface{}, 
    info *UnaryServerInfo, 
    handler UnaryHandler,
) (resp interface{}, err error)

It takes four parameters: the RPC context, the request parameters of the RPC method, the information (metadata) of the RPC method, and the RPC method handler itself.

2.2.2 Stream interceptor

The stream interceptor type on the server side is StreamServerInterceptor, and the method prototype is as follows:

type StreamServerInterceptor func(
    srv interface{}, 
    ss ServerStream, 
    info *StreamServerInfo, 
    handler StreamHandler,
) error

3 Implementing an interceptor

Now that we know the basic concepts of gRPC interceptors, let's return to the RPC service we implemented earlier and run a simple experiment: open the main.go file and add the interceptor-related code:

func runGrpcServer() *grpc.Server {
    opts := []grpc.ServerOption{
        grpc.UnaryInterceptor(HelloInterceptor),
    }
    s := grpc.NewServer(opts...)
    pb.RegisterTagServiceServer(s, server.NewTagServer())
    reflection.Register(s)
    return s
}

func HelloInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    log.Println("hello")
    resp, err := handler(ctx, req)
    log.Println("goodbye")
    return resp, err
}

In the above code, besides implementing a simple unary interceptor, we also use grpc.ServerOption for the first time. gRPC server properties, such as credentials and keepalive parameters, can be set here. Server-side interceptors are also registered here, but they must be wrapped in the matching option type; for example, unary interceptors use grpc.UnaryInterceptor.

To verify, restart the service after the modification, call an RPC interface, and check whether the console outputs the two strings "hello" and "goodbye"; if so, the implementation is correct.

4 How many interceptors can be used?

Now that we have implemented one interceptor, a real application will surely need more than one. Since one interceptor is supported, registering and using several should not be asking too much. Let's try it; the code is as follows:

func runGrpcServer() *grpc.Server {
    opts := []grpc.ServerOption{
        grpc.UnaryInterceptor(HelloInterceptor),
        grpc.UnaryInterceptor(WorldInterceptor),
    }
    s := grpc.NewServer(opts...)
    pb.RegisterTagServiceServer(s, server.NewTagServer())
    reflection.Register(s)
    return s
}

func HelloInterceptor(...) (interface{}, error) {
    log.Println("hello, fried fish")
    resp, err := handler(ctx, req)
    log.Println("goodbye, fried fish")
    return resp, err
}

func WorldInterceptor(...) (interface{}, error) {
    log.Println("hi, fried fish")
    resp, err := handler(ctx, req)
    log.Println("bye, fried fish")
    return resp, err
}

Rerun the service and view the following output:

panic: The unary server interceptor was already set and may not be reset.

The service fails at startup with an error telling you that the "unary server interceptor was already set and may not be reset" — in other words, only one interceptor of each type may be registered.

5 When multiple interceptors are really needed

gRPC-go officially allows only one interceptor per type, but that doesn't mean we are stuck with just one.

In practice, we often want to implement different features as different interceptors. Besides hand-rolling the chaining logic ourselves (simply calling interceptors from within interceptors), we can directly use the Chain methods provided by go-grpc-middleware from the grpc-ecosystem, wrapped inside grpc.UnaryInterceptor and grpc.StreamInterceptor, to achieve this.

5.1 Installation

Run the following install command in the project root directory:

$ go get -u github.com/grpc-ecosystem/go-grpc-middleware

5.2 Usage

Modify the gRPC server code to register multiple interceptors, as follows:

func runGrpcServer() *grpc.Server {
    opts := []grpc.ServerOption{
        grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
            HelloInterceptor,
            WorldInterceptor,
        )),
    }
    s := grpc.NewServer(opts...)
    pb.RegisterTagServiceServer(s, server.NewTagServer())
    reflection.Register(s)
    return s
}

After nesting grpc_middleware.ChainUnaryServer inside grpc.UnaryInterceptor, restart the service and check the output:

hello, fried fish
hi, fried fish
bye, fried fish
goodbye, fried fish

Both interceptors were invoked successfully, fulfilling the common requirement for multiple interceptors.

5.3 How is it implemented

Being able to use go-grpc-middleware is not enough — let's dig into its source code to find out how it works.

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
    n := len(interceptors)
    if n > 1 {
        lastI := n - 1
        return func(...) error {
            var (
                chainHandler grpc.UnaryInvoker
                curI         int
            )
            chainHandler = func(...) error {
                if curI == lastI {
                    return invoker(...)
                }
                curI++
                err := interceptors[curI](...)
                curI--
                return err
            }
            return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
        }
    }
    ...
}

When there is more than one interceptor, the chain starts from interceptors[0], which is given chainHandler as its next handler; each call to chainHandler advances curI and invokes the next interceptor recursively, until the last interceptor is reached and the invoker that actually executes the RPC method is called.

6 Server: common interceptors

When a project is running, certain interceptors are all but indispensable, so we can put together a simple and effective set of "common" interceptors. In this section we will implement interceptors modeled on real usage scenarios.

Create the server_interceptor.go file under the project's internal/middleware directory to store server-side interceptors. Subsequent server interceptor registrations are all handled in the runGrpcServer method, for example:

func runGrpcServer() *grpc.Server {
    opts := []grpc.ServerOption{
        grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
            middleware.XXXXX,
        )),
    }
    ...
}

6.1 Logging

While an application is running, we often need information to help us troubleshoot and trace problems, so recording and handling log information promptly is essential. Next we will output two common kinds of logs: access logs and error logs. In practice, you can replace the default log instance used in these examples with the file logger actually used in your application (for example, the logger from Chapter 2).

6.1.1 Access Logs

Open the server_interceptor.go file and add a log interceptor for access records as follows:

func AccessLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    requestLog := "access request log: method: %s, begin_time: %d, request: %v"
    beginTime := time.Now().Local().Unix()
    log.Printf(requestLog, info.FullMethod, beginTime, req)

    resp, err := handler(ctx, req)

    responseLog := "access response log: method: %s, begin_time: %d, end_time: %d, response: %v"
    endTime := time.Now().Local().Unix()
    log.Printf(responseLog, info.FullMethod, beginTime, endTime, resp)
    return resp, err
}

Once the AccessLog interceptor is written, register it with the gRPC server and restart the service to verify: when an RPC method is called, two log records are printed.

Some readers may wonder why a similar but not identical log line is printed both before and after the RPC method executes. What is the point? Why not just output one log line after the RPC method finishes?

In fact, if a log line were only output after the RPC method executes, consider two scenarios:

  1. The RPC method runs into an unexpected problem and executes for a very long time, and you have no idea when (if ever) it will return.
  2. Under extreme circumstances an OOM occurs during execution, and the system kills the process before the RPC method finishes.

In both scenarios you will go to the logs, usually because the application is already having problems. The first case is very common: if the log line is only written after the RPC method finishes, the access log you need may simply not be there yet, because the method is still executing. In the second case, the log never gets written at all.

As a result, the partial loss of logs may cause you to misjudge the cause of the incident, hamper your full-link tracing, and cost you extra effort in troubleshooting.

6.1.2 Error Logs

Open the server_interceptor.go file and add a log interceptor for common error logging. The code is as follows:

func ErrorLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    resp, err := handler(ctx, req)
    if err != nil {
        errLog := "error log: method: %s, code: %v, message: %v, details: %v"
        s := errcode.FromError(err)
        log.Printf(errLog, info.FullMethod, s.Code(), s.Err().Error(), s.Details())
    }
    return resp, err
}

Once written and registered like the previous interceptor, it logs the error returns of all RPC methods, making it easy to standardize and observe errors at the error-log level.

6.2 Exception Catching

Next we deal with catching exceptions. Before we start writing interceptors, let’s try:

func (t *TagServer) GetTagList(ctx context.Context, r *pb.GetTagListRequest) (*pb.GetTagListReply, error) {
    panic("test panic!")
    ...
}

In the GetTagList method of the gRPC server, we add a panic call to simulate an exception being thrown, then run the service again to see what happens without any "protection":

$ go run main.go
access request log: method: /proto.TagService/GetTagList, begin_time: 159999999, request:
panic: test panic!

goroutine 40 [running]:
github.com/go-programming-tour-book/tag-service/server.(*TagServer).GetTagList(0x1a4f728, 0x169af00, 0xc00016e720, 0xc00019a400, 0x1a4f728, 0xc0001c1790, 0x102eac1)
    /Users/eddycjy/go-programming-tour-book/tag-service/server/tag.go:21 +0x39
github.com/go-programming-tour-book/tag-service/proto._TagService_GetTagList_Handler.func1(0x169af00, 0xc00016e720, 0x1594260, 0xc00019a400, 0xc0001c17a0, 0xc0001c1870, 0x10ec347, 0xc0000b2000)
    /Users/eddycjy/go-programming-tour-book/tag-service/proto/tag.pb.go:265 +0x86
...

You read that right: the service crashed outright because of the thrown exception, which is a very bad situation — it means the service can no longer serve any responses. To fix this, we need to add a custom exception catcher. Open the middleware.go file and add the following code:

func Recovery(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    defer func() {
        if e := recover(); e != nil {
            recoveryLog := "recovery log: method: %s, message: %v, stack: %s"
            log.Printf(recoveryLog, info.FullMethod, e, string(debug.Stack()[:]))
        }
    }()
    return handler(ctx, req)
}

Written and registered like the previous interceptors, this interceptor catches and records the exceptions thrown by all RPC methods, ensuring the service will not be brought down by an unknown panic. In a real project you can extend it with custom handling based on your company's observability stack, which makes it even more complete.

7 Client: common interceptors

We'll create a new client_interceptor.go file in the project's internal/middleware directory to store client-side interceptors, and write interceptors for some common scenarios. Subsequent client interceptor registrations go into the DialOption configuration passed to grpc.Dial or grpc.DialContext, for example:

var opts []grpc.DialOption
opts = append(opts, grpc.WithUnaryInterceptor(
    grpc_middleware.ChainUnaryClient(
        middleware.XXXXX(),
    ),
))
opts = append(opts, grpc.WithStreamInterceptor(
    grpc_middleware.ChainStreamClient(
        middleware.XXXXX(),
    ),
))
clientConn, err := grpc.DialContext(ctx, target, opts...)
...

7.1 Timeout Control (Context)

Setting and properly controlling timeouts is one of the most important safeguards in a microservices architecture.

Let's assume a scenario: you have services A, B, C, and D with the simplest chain of dependencies, A => B => C => D. One day you ship a change whose modified code touches only service D, and the release happens to coincide with a surge of business traffic. Suddenly, for no apparent reason, services A, B, C, and D all start responding slowly, and the application system as a whole begins to avalanche... What on earth is going on?

Essentially, a problem in service D set off a chain reaction through its upstream and downstream services. If no default timeout is set for service calls, or the timeout is set too long, the whole call chain across multiple services can avalanche, causing a very serious incident. Setting a default timeout for every call is therefore essential; the gRPC documentation makes the same point (TL;DR: always set a deadline).

Therefore, in this section we set a default timeout for internal RPC calls. In the client_interceptor.go file, add the following code:

func defaultContextTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
    var cancel context.CancelFunc
    if _, ok := ctx.Deadline(); !ok {
        defaultTimeout := 60 * time.Second
        ctx, cancel = context.WithTimeout(ctx, defaultTimeout)
    }
    return ctx, cancel
}

In the above code, we inspect the incoming context by calling the ctx.Deadline method; if no deadline has been set, it returns false, and we then call context.WithTimeout to set the default timeout of 60 seconds. (This timeout applies to the entire call chain; adjust it in application code if needed.)

Next, we write corresponding client interceptors for unary and streaming calls of gRPC respectively, as follows:

func UnaryContextTimeout() grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        ctx, cancel := defaultContextTimeout(ctx)
        if cancel != nil {
            defer cancel()
        }
        return invoker(ctx, method, req, resp, cc, opts...)
    }
}

func StreamContextTimeout() grpc.StreamClientInterceptor {
    return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        ctx, cancel := defaultContextTimeout(ctx)
        if cancel != nil {
            defer cancel()
        }
        return streamer(ctx, desc, cc, method, opts...)
    }
}

Once the interceptors are written and registered, the timeout takes effect before each RPC call.

7.2 Retry Operations

While a service is running, there are occasionally "odd" network jitters, rate limits, or brief spikes in server resource usage (which quickly subside) where simply waiting a moment and trying again succeeds. We therefore often adopt a strategy of retrying after a short wait to ensure the application ultimately succeeds, so a basic retry capability is necessary for our gRPC client. If there are no custom requirements, we can use the grpc_retry interceptor from the gRPC ecosystem to implement basic retries, as follows:

var opts []grpc.DialOption
opts = append(opts, grpc.WithUnaryInterceptor(
    grpc_middleware.ChainUnaryClient(
        grpc_retry.UnaryClientInterceptor(
            grpc_retry.WithMax(2),
            grpc_retry.WithCodes(
                codes.Unknown,
                codes.Internal,
                codes.DeadlineExceeded,
            ),
        ),
    ),
))
...

In the grpc_retry interceptor, we set the maximum number of retries to 2 and retry only when the gRPC error code is Unknown, Internal, or DeadlineExceeded.

Note two things here. First, retries are decided by error-code rules, so before retrying you need clear status-code conventions in your microservice design, ensuring the status codes of the many services are consistent (this can be enforced through infrastructure or shared libraries). Second, make interface design as idempotent as possible, so that allowing retries cannot cause catastrophic problems — such as deducting stock twice.
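grpc_retry also ships options beyond WithMax and WithCodes. A configuration sketch, assuming go-grpc-middleware v1's retry API (the concrete durations are our own choices):

```go
grpc_retry.UnaryClientInterceptor(
    grpc_retry.WithMax(2),
    // Wait a little between attempts instead of hammering the server.
    grpc_retry.WithBackoff(grpc_retry.BackoffLinear(100*time.Millisecond)),
    // Bound each individual attempt, so one slow try cannot eat the
    // whole call deadline.
    grpc_retry.WithPerRetryTimeout(time.Second),
)
```

Per-retry timeouts combine well with the timeout interceptor from section 7.1: the overall deadline caps the call, while the per-retry timeout caps each attempt.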

8 Putting it into practice

In the timeout-control interceptor we set up a default timeout. Does a similar risk exist elsewhere in our system? Of course it does, and it is a classic problem in Go programming. When we implemented the GetTagList method of the gRPC server, the data source was the blog-service from Chapter 2, as follows:

func (t *TagServer) GetTagList(ctx context.Context, r *pb.GetTagListRequest) (*pb.GetTagListReply, error) {
    api := bapi.NewAPI("http://127.0.0.1:8000")
    body, err := api.GetTagList(ctx, r.GetName())
    ...
}

Let's simulate the scenario: suppose the blog backend application has a problem and hangs without returning, as follows:

func (t Tag) List(c *gin.Context) {
time.Sleep(time.Hour)
...
}

We open the blog backend application from Chapter 2, add the new sleep to the "get tag list" interface, and then call the gRPC server's interface — for example, with your gRPC server on port 8000, call http://127.0.0.1:8010/api/v1/tags.

We will see the gRPC Server output access log as follows:

access request log: method: /proto.TagService/GetTagList, begin_time: XXXXX, request:
(nothing follows — the response log will not appear no matter how long you wait)

You will find that there is no response — only the access request log. You can leave the request hanging, and it will not return no matter how long you wait, until the sleep time is over.

Thinking about real-world scenarios, we typically call HTTP APIs precisely because we depend on third-party interfaces. Suppose that third-party interface has a problem: it is extremely slow, or even hangs and never responds at all. Your application, however, is healthy, so traffic keeps flowing in, forming a vicious cycle — more and more goroutines block waiting, the overhead grows and grows, eventually the upstream service fails, your downstream services gradually collapse in turn, and a chain reaction forms.

8.1 Why

Let's look at the HTTP API SDK code to see why a single sleep can cause such a big problem:

func (a *API) httpGet(ctx context.Context, path string) ([]byte, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s", a.URL, path))
...
}

We use the http.Get method by default, its internal source code:

func Get(url string) (resp *Response, err error) {
    return DefaultClient.Get(url)
}
Copy the code

In fact, it uses the package-level variable DefaultClient defined in the standard library, and DefaultClient's Timeout defaults to its zero value, 0. A Timeout of 0 is treated as "no timeout limit": the client waits indefinitely until it gets a response, and that is the root cause of the problem.

8.2 Solution

We have at least two ways to solve this problem: customize the HTTPClient, or use our timeout control:

func (a *API) httpGet(ctx context.Context, path string) ([]byte, error) {
resp, err := ctxhttp.Get(ctx, http.DefaultClient, fmt.Sprintf("%s/%s", a.URL, path))
...
}

We change the http.Get call to ctxhttp.Get and pass the context (ctx) into the method, so the call is subject to the context's timeout control. This approach, however, requires that the client register the timeout-control interceptor at call time, as follows:

func main() {
    ctx := context.Background()
    clientConn, err := GetClientConn(ctx, "tag-service", []grpc.DialOption{
        grpc.WithUnaryInterceptor(
            grpc_middleware.ChainUnaryClient(middleware.UnaryContextTimeout()),
        ),
    })
    ...
}

Verify again as follows:

tagServiceClient.GetTagList err: rpc error: code = DeadlineExceeded desc = context deadline exceeded
exit status 1

After the deadline is exceeded, the client disconnects automatically, reporting DeadlineExceeded. In terms of results, if the upstream service fails, your service's calls to it no longer drag on, because the timeout cut the loss in time. So setting a default timeout — and choosing its value carefully — is both meaningful and elegant.

At this point, however, the server itself may still be blocked indefinitely; the client only disconnects itself. It is therefore advisable for the server to also set a default maximum execution time, to guarantee its own availability and to guard against the worst case: a client that forgot to set timeout controls.

8.3 How would you discover it?

I have presented the problem and solution by working backwards from the result. How would you discover and solve it yourself?

The simplest way is through logs and distributed tracing. In the problem above, the access log contains only the request entry and never the response entry. In a distributed tracing system this shows up very clearly as one span in the call chain taking an abnormally long time — a dangerous smell. Better still, by analyzing these metrics you can decide automatically whether to alert or even self-heal when such situations occur, which is more appropriate still.

9 Summary

In this chapter, we first introduce the types of interceptors in gRPC and explain and analyze their use through simple examples. After introducing the basics of interceptors, we wrote a series of common interceptors based on common problems in real projects to protect our application.

For multiple interceptors we recommended go-grpc-middleware, but note that since gRPC-go v1.28.0 the official library has merged built-in support for chaining interceptors (see issue #935). You can choose based on your project's actual situation.