Why do we need interceptors
Let's get the motivation out of the way first.
- Panics inside handler code are a headache.
- Parameter-validation code sprouts up everywhere and is painful to read.
- Requests have no timeout, so the server gets overloaded with pending requests.
- Error handling is not unified; every handler reports errors its own way.
- Logging and business code are intertwined.
- ...and nobody wants to hand-write any of the above in every single handler 🤮.
Program was created to panic
func (g greeterImpl) OutOfIndex(ctx context.Context, request *pb.OutOfIndexRequest) (*pb.OutOfIndexResponse, error) {
request.Ids = make([]int64.0)
request.Ids[1] = 1
return &pb.OutOfIndexResponse{Data: "ok"}, nil
}
Copy the code
Once the code above runs, the whole process goes down. How do we deal with that? Setting `restart: always` in docker-compose brings the container back up, and at first glance that seems good enough: the dead program is simply resumed. But what about the other requests that were partially processed and not yet completed when the process died? That is the real problem.
A better option is to intercept the panic in the code so that it never propagates to the top of the call stack. Let's change the code to look like this.
func (g greeterImpl) OutOfIndex(ctx context.Context, request *pb.OutOfIndexRequest) (*pb.OutOfIndexResponse, error) {
defer func(a) {
// Interception error
if err:=recover(a); err ! =nil {
glog.Errorf("panic:%s\n".string(debug.Stack()))
}
}() // Basically add the processing of the defer function
request.Ids = make([]int64.0)
request.Ids[1] = 1
return &pb.OutOfIndexResponse{Data: "ok"}, nil
}
Copy the code
If every handler really had to be written like this, it would be a real headache. With 10 such handlers, do you want to repeat the same deferred block 10 times? Sure, you have mastered Ctrl/Command+C and Ctrl/Command+V, so it is doable, and maybe that is how it is usually done. But things do not need to be this repetitive.
How to gracefully intercept panic
Use the interceptors provided by the gRPC server. gRPC provides two kinds of interceptor, corresponding to request-response (unary) mode and stream mode (generally used for requests such as large file transfers).
// UnaryServerInterceptor is the hook signature for request-response (unary)
// calls. It receives the call's context, the request, information about the
// invoked method, and the handler that runs the service method; it returns the
// response and error for the call.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
// StreamServerInterceptor is the corresponding hook signature for stream-mode
// calls; it returns only an error.
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
Copy the code
How to use it?
Use the UnaryInterceptor for an example
// A single unary interceptor recovers panics for every service method, so the
// handlers themselves no longer need their own deferred recover.
srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		// Intercept a panic raised by the downstream handler.
		if r := recover(); r != nil {
			glog.Errorf("method:%s, time:%s, err:%v, fatal%s", info.FullMethod, time.Now().Format("20060102-15:04:05:06"), r, string(debug.Stack()))
			// Report the panic to the caller as an Internal error instead
			// of silently returning (nil, nil).
			resp, err = nil, status.New(codes.Internal, fmt.Sprintf("errors:%v", r)).Err()
		}
	}()
	// Execute the corresponding business method.
	return handler(ctx, req)
}))
Copy the code
Of course, you can add other operations as well, such as logging requests to Elasticsearch (ES) or validating parameters. To avoid polluting the surrounding scope, the variables used for parameter validation are kept inside a closure.
// The interceptor is produced by an immediately-invoked closure so that the
// validator and its translator stay private to it.
srv := grpc.NewServer(grpc.UnaryInterceptor(func() grpc.UnaryServerInterceptor {
	var (
		validate = validator.New()
		uni      = ut.New(zh.New())
		trans, _ = uni.GetTranslator("zh")
	)
	// Register the default zh translations on the validator; setup failure
	// aborts start-up.
	if err := zh_translations.RegisterDefaultTranslations(validate, trans); err != nil {
		panic(err)
	}
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		fmt.Printf("ctx %#v, req:%v, info:%#v", ctx, req, info)
		defer func() {
			if r := recover(); r != nil {
				glog.Errorf("method:%s, time:%s, err:%v, fatal%s", info.FullMethod, time.Now().Format("20060102-15:04:05:06"), r, string(debug.Stack()))
				// Surface the panic as an Internal error instead of
				// returning (nil, nil).
				resp, err = nil, status.New(codes.Internal, fmt.Sprintf("errors:%v", r)).Err()
			}
		}()
		// Check parameters before the service method runs.
		if vErr := validate.Struct(req); vErr != nil {
			if transErr, ok := vErr.(validator.ValidationErrors); ok {
				// Field errors: concatenate their translated messages.
				var buf bytes.Buffer
				for _, msg := range transErr.Translate(trans) {
					buf.WriteString(msg)
				}
				return nil, status.New(codes.InvalidArgument, buf.String()).Err()
			}
			// Any other validation failure is reported as Unknown.
			return nil, status.New(codes.Unknown, fmt.Sprintf("error%s", vErr)).Err()
		}
		return handler(ctx, req)
	}
}()))
Copy the code
Expand the writing of middleware
If the task is simple enough, the above notation is sufficient, but if it is not, such as the following:
// One interceptor doing everything: panic recovery, a default timeout, request
// logging, parameter validation and latency logging.
// NOTE(review): validate and trans are not defined in this snippet; presumably
// they are the validator and translator built in the previous example.
srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	fmt.Printf("ctx %#v, req:%v, info:%#v", ctx, req, info)
	defer func() {
		if r := recover(); r != nil {
			glog.Errorf("method:%s, time:%s, err:%v, fatal%s", info.FullMethod, time.Now().Format("20060102-15:04:05:06"), r, string(debug.Stack()))
			// Surface the panic as an Internal error instead of returning
			// (nil, nil).
			resp, err = nil, status.New(codes.Internal, fmt.Sprintf("errors:%v", r)).Err()
		}
	}()
	// If the incoming context carries no deadline, cap the call at 6 s.
	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 6*time.Second)
		defer cancel()
	}
	// Serialize the request for the log line.
	data, err := json.Marshal(req)
	if err != nil {
		return nil, status.New(codes.Internal, err.Error()).Err()
	}
	// A routine request log belongs at info level, not error level.
	glog.Infof("method:%s, request:%v", info.FullMethod, string(data))
	// Check parameters.
	if vErr := validate.Struct(req); vErr != nil {
		if transErr, ok := vErr.(validator.ValidationErrors); ok {
			var buf bytes.Buffer
			for _, msg := range transErr.Translate(trans) {
				buf.WriteString(msg)
			}
			return nil, status.New(codes.InvalidArgument, buf.String()).Err()
		}
		return nil, status.New(codes.Unknown, fmt.Sprintf("error%s", vErr)).Err()
	}
	start := time.Now()
	resp, err = handler(ctx, req)
	glog.Infof("method:%s, request:%#v, resp:%#v, latency:%v, status:%v", info.FullMethod, req, resp, time.Since(start), status.Convert(err))
	return resp, err
}))
Copy the code
There is actually nothing wrong with this either; it gets the job done. But it can be done much better. For example:
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net"
"os"
"time"
"github.com/go-playground/locales/zh"
ut "github.com/go-playground/universal-translator"
"github.com/go-playground/validator/v10"
zh_translations "github.com/go-playground/validator/v10/translations/zh"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"mio_grpc/pb"
)
type greeterImpl struct{}// Call the chain handler
type HandlerFunc func(*Context)// RepackagedContext;
type Context struct {
req interface{} // Enter parameters
resp interface{} // Output parameters
info *grpc.UnaryServerInfo // Service information
ctx context.Context // Context information for the service method
handler grpc.UnaryHandler // Request processing for the corresponding service
err error / / error
reqJsData string // Enter the parameter as a string in js format
respJsData string // The input parameter is a string in js format
handlerFunc []HandlerFunc // There is a handler by default
index int // The level of the current callback
data map[string]interface{} // 设置的 data
}
Create a new context
// @param context. context The context requested by the GRPC method
// @param req requests the input parameter of the method
// @param resp request method output parameters
// @param info GRPC method name information, which contains the requested method name
// @param handler this argument is a function that calls the corresponding GRPC method
//
// @ret *Context returns the rewrapped Context
func newContext(ctx context.Context, req, resp interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) *Context {
data, _ := json.Marshal(req) // Serialize the request parameters
return &Context{
req: req,
resp: resp,
info: info,
err: nil,
ctx: ctx,
reqJsData: string(data),
respJsData: "",
handlerFunc: make([]HandlerFunc, 0),
handler: handler,
data: make(map[string]interface{}, 16),}}// Check parameters
func validate(a) HandlerFunc {
var (
validate = validator.New() / / validator
uni = ut.New(zh.New())
trans, _ = uni.GetTranslator("zh") // Noon translator
)
// Associate validator and translator
err := zh_translations.RegisterDefaultTranslations(validate, trans)
iferr ! =nil {
panic(err)
}
return func(c *Context) {
// Check parameters
iferr := validate.Struct(c.req); err ! =nil {
// Check whether the error is a validation field error
// If the parameter verification fails, translate the error message into the corresponding description
if transErr, ok := err.(validator.ValidationErrors); ok {
translations := transErr.Translate(trans)
var buf bytes.Buffer
for _, s2 := range translations {
buf.WriteString(s2)
}
// The GRPC returned an error
err = status.New(codes.InvalidArgument, buf.String()).Err()
// Abort the call early
c.AbortWith(err)
return
}
// If an error is encountered while validating GRPC input parameters, but the error is not translated as an error, return unknown error
err = status.New(codes.Unknown, fmt.Sprintf("error%s", err)).Err()
// Abort the call early
c.AbortWith(err)
return}}}// Get the request parameters
// GetReq returns the request message of the call, or nil when the receiver is
// nil (matching the nil-receiver behaviour of the other accessors).
func (c *Context) GetReq() interface{} {
	if c == nil {
		return nil
	}
	return c.req
}
// GetReqJsData returns the JSON form of the request, or "" when the receiver
// is nil.
func (c *Context) GetReqJsData() string {
	if c == nil {
		return ""
	}
	return c.reqJsData
}
/ / set the JsData
func (c *Context) SetReqJsData(str string) {
if c == nil {
return
}
if json.Valid([]byte(str)) {
c.reqJsData = str
}
}
// SetRespJsData stores str as the JSON form of the response. Nothing is
// stored when the receiver is nil or str is not valid JSON.
func (c *Context) SetRespJsData(str string) {
	if c == nil || !json.Valid([]byte(str)) {
		return
	}
	c.respJsData = str
}
// FullMethod returns the full name of the gRPC method being invoked, or ""
// when the receiver or its method info is nil.
func (c *Context) FullMethod() string {
	if c == nil || c.info == nil {
		return ""
	}
	return c.info.FullMethod
}
// SetData stores value under key in the Context's data map. A nil receiver is
// ignored.
func (c *Context) SetData(key string, value interface{}) {
	if c != nil {
		c.data[key] = value
	}
}
// GetData looks up key in the Context's data map. It yields nil for a nil
// receiver or a key that was never set.
func (c *Context) GetData(key string) interface{} {
	if c != nil {
		return c.data[key]
	}
	return nil
}
// Next runs the remaining handlers of the chain, starting with the one after
// the current handler, and returns once they have all finished. Code placed
// after a call to Next therefore executes after the downstream handlers. A nil
// receiver is ignored.
func (c *Context) Next() {
	if c == nil {
		return
	}
	// A handler may move c.index itself (nested Next, AbortWith), so the
	// condition re-reads it on every iteration.
	for c.index++; c.index < len(c.handlerFunc); c.index++ {
		c.handlerFunc[c.index](c)
	}
}
// AbortWith stops the handler chain early and records err as the error of the
// call. Handlers that have not started yet are skipped. A nil receiver is
// ignored.
func (c *Context) AbortWith(err error) {
	// abortLevel is an index far beyond any real chain length. 1 << 30 still
	// fits in a 32-bit int (1 << 32 does not compile there) and leaves room
	// for the index++ performed by the loops that observe it.
	const abortLevel = 1 << 30
	if c == nil {
		return
	}
	c.err = err
	c.index = abortLevel
}
// Simulate log output to es
func log2es(a) HandlerFunc {
// Simulate the input log to es
file, err := os.OpenFile("./my.txt", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0766)
iferr ! =nil {
panic(err)
}
w := bufio.NewWriter(file)
defer w.Flush()
return func(c *Context) {
start := time.Now()
c.Next() // Request the next method
_, _ = file.WriteString(
fmt.Sprintf(
"method:%s, status:%v, latency:%v, req:%s, resp:%s\n", c.FullMethod(), status.Convert(c.err).Code().String(), time.Now().Sub(start), c.GetReqJsData(), ""))
//fmt.Println("log2es after ", time.Now(), writeString, err2, c.FullMethod())}}// Call the handler function, which is called by default, when all calls are complete
// procHandler invokes the actual service method with the stored context and
// request, and saves its response and error on the chain Context.
func procHandler(ctx *Context) {
	resp, err := ctx.handler(ctx.ctx, ctx.req)
	ctx.resp = resp
	ctx.err = err
}
// The package handles multiple handler funcs
func WrapperHandler(handFunc ... HandlerFunc) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// Generate a new Context by taking the argument from the GRPC interceptor's callback
c := newContext(ctx, req, resp, info, handler)
// Construct a default callback handler that intercepts Panic as the first business handler
c.handlerFunc = append(c.handlerFunc, func(c *Context) {
defer func(a) {
if err := recover(a); err ! =nil {
c.AbortWith(status.New(codes.Internal, fmt.Sprintf("errors:%v", err)).Err())
}
}()
c.Next()
})
// Take the processing of user input as the intermediate processing
c.handlerFunc = append(c.handlerFunc, handFunc...)
// When the user is done, the real service method is called
c.handlerFunc = append(c.handlerFunc, procHandler)
// Start invoking the service
for c.index = 0; c.index < len(c.handlerFunc); c.index++ {
c.handlerFunc[c.index](c)
}
// Return to services RESP and ERR
return c.resp, c.err
}
}
func (g greeterImpl) OutOfIndex(ctx context.Context, request *pb.OutOfIndexRequest) (resp *pb.OutOfIndexResponse, err error) {
fmt.Println("OutOfIndex", request)
time.Sleep(time.Second * 4)
//defer func() {
// // interception error
// if err := recover(); err ! = nil {
// glog.Errorf("panic:%s\n", string(debug.Stack()))
// }
/ /} ()
resp = &pb.OutOfIndexResponse{Data: "ok"}
request.Ids = make([]int64.0)
request.Ids[1] = 1
return
}
// NilPointer writes "work man" into request.Data.Data and answers "ok".
// NOTE(review): going by the method name, this presumably demonstrates a
// nil-pointer panic when request.Data is unset — confirm against the pb types.
func (g greeterImpl) NilPointer(ctx context.Context, request *pb.NilPointerRequest) (*pb.NilPointerResponse, error) {
	request.Data.Data = "work man"
	reply := &pb.NilPointerResponse{Data: "ok"}
	return reply, nil
}
// Hello prints the incoming request and replies with a fixed ErrCode of
// "err_code".
func (g greeterImpl) Hello(ctx context.Context, request *pb.HelloRequest) (*pb.HelloResponse, error) {
	fmt.Println("Hello:", request)
	reply := &pb.HelloResponse{ErrCode: "err_code"}
	return reply, nil
}
func main(a) {
flag.Parse()
srv := grpc.NewServer(grpc.UnaryInterceptor(WrapperHandler(log2es(), validate())))
listen, err := net.Listen("tcp".": 8086")
iferr ! =nil {
panic(err)
}
pb.RegisterGreeterServer(srv, &greeterImpl{})
iferr = srv.Serve(listen); err ! =nil {
panic(err)
}
}
Copy the code
It looks complicated. Well, it does, it looks complicated, mainly because it’s got more lines of code, but
srv := grpc.NewServer(grpc.UnaryInterceptor(WrapperHandler(func(c *Context) {
c.Next()
}, log2es(), validate(), func(c *Context) {
c.Next()
}, func(c *Context) {
fmt.Println("hello ")
c.Next()
})))
Copy the code
This is simpler, and you can handle more business logic
The additional tool protoc-go-inject-tag is used to rewrite the struct tags in the generated pb.go file; here it injects the `validate` tags used for parameter validation.
// HelloRequest is the request message whose fields carry @inject_tag comments.
// Per the surrounding text, protoc-go-inject-tag turns each one into a
// `validate` struct tag in the generated pb.go. book_time has no tag.
// NOTE(review): with go-playground/validator, `required` on a numeric field
// presumably rejects the zero value, so id = 0 would fail despite `gte=0` —
// confirm this is intended.
message HelloRequest {
//@inject_tag: validate:"required,gte=0"
int64 id = 1;
//@inject_tag:validate:"required"
string user_name = 2;
//@inject_tag:validate:"required"
string user_address = 3;
int64 book_time = 4;
//@inject_tag:validate:"required"
string random_str = 5;
}
Copy the code
The complete code is here, poke me straight up 🦀🦀