Most frameworks offer something called middleware, sometimes called an interceptor, which works much like a hook.

gRPC is no exception. So how do we use interceptors from Python, and what can an interceptor do?

1: What can a gRPC interceptor do?

An interceptor is conceptually the same as middleware, so anything middleware can do in a web framework such as FastAPI, an interceptor can do too:

  • Authentication
  • Request logging
  • Global context processing, etc.
  • Multiple interceptors, like multiple middleware, handle a request according to the onion model
  • An interceptor must return a value, normally whatever the next stage in the chain returned (a method handler on the server, a response on the client)

PS: gRPC offers hooks not only on the server side but also on the client side, similar to the event hooks provided by the HTTPX library. In other words, there are both client interceptors and server interceptors.

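2: Types of gRPC interceptor

  • Unary interceptor, grpc.UnaryUnaryClientInterceptor – on the client
  • Streaming interceptors, grpc.UnaryStreamClientInterceptor, grpc.StreamUnaryClientInterceptor and grpc.StreamStreamClientInterceptor – on the client
  • On the server, Python has a single class to implement: grpc.ServerInterceptor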

3: Implementing gRPC interceptors in Python

First, look at the interceptors parameter that grpc.server accepts.

3.1 The built-in server interceptor

Main points to note:

  • Each interceptor is passed as an instantiated object, not a class
  • The interceptors argument can be either a tuple or a list
  • Multiple interceptors are applied according to the onion model

A server-side interceptor must implement the abstract method of grpc.ServerInterceptor, which is intercept_service(self, continuation, handler_call_details).

Complete server-side sample code:

```python
from concurrent import futures
import signal

import grpc

import hello_pb2
import hello_pb2_grpc


# Implement the GreeterServicer service
class Greeter(hello_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # The return value is the response message object we defined.
        # To set an error status code instead:
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("you don't have the access permissions")
        print("Received request header metadata", context.invocation_metadata())
        # Set the trailing metadata of the response
        context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
        # Three compression options:
        # NoCompression = _compression.NoCompression
        # Deflate = _compression.Deflate
        # Gzip = _compression.Gzip
        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


class MyUnaryServerInterceptor1(grpc.ServerInterceptor):

    def intercept_service(self, continuation, handler_call_details):
        print("I am interceptor 1: start ----1")
        respn = continuation(handler_call_details)
        print("I am interceptor 1: end ----2", respn)
        return respn


class MyUnaryServerInterceptor2(grpc.ServerInterceptor):

    def intercept_service(self, continuation, handler_call_details):
        print("I am interceptor 2: start ----1")
        respn = continuation(handler_call_details)
        print("I am interceptor 2: end ----2", respn)
        return respn


def serve():
    # Instantiate an RPC server
    options = [
        ('grpc.max_send_message_length', 60 * 1024 * 1024),     # limit on the size of sent messages
        ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # limit on the size of received messages
    ]
    # Enable gzip compression globally for data sent by the server
    compression = grpc.Compression.Gzip
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=options,
        compression=compression,
        interceptors=[MyUnaryServerInterceptor1(), MyUnaryServerInterceptor2()],
    )
    # Register our service
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # Configure the listening port
    server.add_insecure_port('[::]:50051')
    # Start the server
    server.start()

    def stop_serve(signum, frame):
        print("Process terminated !!!!")
        # sys.exit(0)
        raise KeyboardInterrupt

    # Register the signal handlers
    # SIGINT corresponds to CTRL + C on Windows
    # SIGTERM corresponds to the Linux kill command
    signal.signal(signal.SIGINT, stop_serve)
    # signal.signal(signal.SIGTERM, stop_serve)

    # wait_for_termination blocks so that the process keeps running
    server.wait_for_termination()


if __name__ == '__main__':
    serve()
```

The key part of the configuration is the interceptors argument passed to grpc.server.

Now, when we use the client to call the server, the server prints the following:

```
I am interceptor 1: start ----1
I am interceptor 2: start ----1
I am interceptor 2: end ----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
I am interceptor 1: end ----2 RpcMethodHandler(request_streaming=False, response_streaming=False, request_deserializer=<built-in method FromString of GeneratedProtocolMessageType object at 0x00000175D2988558>, response_serializer=<method 'SerializeToString' of 'google.protobuf.pyext._message.CMessage' objects>, unary_unary=<bound method Greeter.SayHelloAgain of <__main__.Greeter object at 0x00000175D46167B8>>, unary_stream=None, stream_unary=None, stream_stream=None)
Received request header metadata (_Metadatum(key='mesasge', value='1010'), _Metadatum(key='error', value='No Error'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (Windows; chttp2)'))
```
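In the output above, both interceptors finish before the service method prints its metadata line. With the built-in grpc.ServerInterceptor, continuation returns the method handler rather than the response, so the onion is wrapped around looking up the handler, and the handler itself runs afterwards. The following is a minimal sketch of that ordering in plain Python. It does not use gRPC: LoggingInterceptor, resolve_handler, lookup, and the method path string are hypothetical names made up for the illustration.

```python
from typing import Callable, List

# Stand-ins for illustration only: in gRPC, "details" would be a
# HandlerCallDetails object and the handler an RpcMethodHandler.
Handler = Callable[[str], str]
Continuation = Callable[[str], Handler]


class LoggingInterceptor:
    """Hypothetical interceptor with the same shape as intercept_service."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    def intercept_service(self, continuation: Continuation, details: str) -> Handler:
        self.log.append(f"{self.name}: start")
        handler = continuation(details)  # ask the next inner layer for the handler
        self.log.append(f"{self.name}: end")
        return handler


def resolve_handler(interceptors, lookup: Continuation, details: str) -> Handler:
    """Chain the interceptors so that the first one in the list is outermost."""
    continuation = lookup
    for interceptor in reversed(interceptors):
        # Default arguments bind the current values (avoids late binding).
        continuation = (
            lambda d, _i=interceptor, _next=continuation: _i.intercept_service(_next, d)
        )
    return continuation(details)


log: List[str] = []


def say_hello(name: str) -> str:
    log.append("handler runs")
    return f"hello {name}"


def lookup(details: str) -> Handler:
    # Innermost step: find the service method for this call.
    return say_hello


handler = resolve_handler(
    [LoggingInterceptor("interceptor 1", log), LoggingInterceptor("interceptor 2", log)],
    lookup,
    "/Greeter/SayHelloAgain",  # hypothetical method path
)
reply = handler("world")  # the server invokes the handler only after the chain returns
print(log)
```

The log shows interceptor 1 starting first and ending last, with the handler running only after both interceptors have returned.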

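3.2 The built-in client interceptor

A client interceptor implements a different class from the server one. For unary-unary calls that class is grpc.UnaryUnaryClientInterceptor, whose method to implement is intercept_unary_unary(self, continuation, client_call_details, request).

When we use client interceptors, the channel our RPC goes through also changes: the original channel is wrapped with grpc.intercept_channel, and the stub is created from the wrapped channel.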

Complete client sample code:

```python
import json

import grpc

import hello_pb2
import hello_pb2_grpc


class ClientServerInterceptor1(grpc.UnaryUnaryClientInterceptor):

    def intercept_unary_unary(self, continuation, client_call_details, request):
        print("Client interceptor 1: ---- start 1")
        resp = continuation(client_call_details, request)
        print("Client interceptor 1: ---- end 2", resp)
        return resp


class ClientServerInterceptor2(grpc.UnaryUnaryClientInterceptor):

    def intercept_unary_unary(self, continuation, client_call_details, request):
        print("Client interceptor 2: ---- start 1")
        resp = continuation(client_call_details, request)
        print("Client interceptor 2: ---- end 2", resp)
        return resp


def run():
    # Retry policy. In a gRPC service config, retryPolicy belongs inside a
    # methodConfig entry; the empty name object is meant to match every method.
    service_config = json.dumps({
        "methodConfig": [{
            "name": [{}],
            "retryPolicy": {
                "maxAttempts": 4,
                "initialBackoff": "0.1s",
                "maxBackoff": "1s",
                "backoffMultiplier": 2,
                "retryableStatusCodes": ["UNAVAILABLE"],
            },
        }]
    })
    options = [
        ('grpc.max_send_message_length', 100 * 1024 * 1024),
        ('grpc.max_receive_message_length', 100 * 1024 * 1024),
        ('grpc.enable_retries', 1),
        ('grpc.service_config', service_config),
    ]
    # Three compression options:
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # Enable gzip compression globally for data sent by the client
    compression = grpc.Compression.Gzip

    try:
        with grpc.insecure_channel(target='localhost:50051',
                                   options=options,
                                   compression=compression) as channel:
            # Wrap the channel with the interceptors via intercept_channel
            interceptor_channel = grpc.intercept_channel(
                channel, ClientServerInterceptor1(), ClientServerInterceptor2())
            stub = hello_pb2_grpc.GreeterStub(interceptor_channel)
            # The request body is defined in hello_pb2: hello_pb2.HelloRequest
            reest_header = (
                ('mesasge', '1010'),
                ('error', 'No Error'),
            )
            response, callbask = stub.SayHelloAgain.with_call(
                request=hello_pb2.HelloRequest(name='welcome next time'),
                metadata=reest_header,
            )
            print("SayHelloAgain result: " + response.message)
            print("SayHelloAgain trailing metadata: ", callbask.trailing_metadata())
    except grpc.RpcError as e:
        # grpc.RpcError is the public base class of grpc._channel._InactiveRpcError
        print(e.code())
        print(e.details())


if __name__ == '__main__':
    run()
```

4: Passing context through gRPC interceptors

Middleware usually has business scenarios in which it needs to carry or process the request context, but the built-in interceptor seems to have no access at all to the request and context arguments that a service method receives.

So what if we need to pass context along? With the built-in interceptor, this cannot be done!

To write interceptors that can pass context, use a third-party library:

```
$ pip install grpc-interceptor
```

The library also comes with testing helpers, installed as an extra:

```
$ pip install grpc-interceptor[testing]
```

4.1 Modifying the server interceptor

Complete server example, reworked to use the third-party library:

```python
from concurrent import futures
import signal
from typing import Any, Callable

import grpc
from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException

import hello_pb2
import hello_pb2_grpc


# Implement the GreeterServicer service
class Greeter(hello_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # The return value is the response message object we defined.
        # To set an error status code instead:
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("you don't have the access permissions")
        print("Received request header metadata", context.invocation_metadata())
        # Set the trailing metadata of the response
        context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
        # Three compression options:
        # NoCompression = _compression.NoCompression
        # Deflate = _compression.Deflate
        # Gzip = _compression.Gzip
        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


class MyUnaryServerInterceptor1(ServerInterceptor):

    def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
    ) -> Any:
        rsep = None
        try:
            print("I am interceptor 1: start ----1")
            rsep = method(request, context)
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise
        finally:
            print("I am interceptor 1: end ----2", rsep)
        # Return outside finally so that a re-raised exception is not swallowed
        return rsep


class MyUnaryServerInterceptor2(ServerInterceptor):

    def intercept(
        self,
        method: Callable,
        request: Any,
        context: grpc.ServicerContext,
        method_name: str,
    ) -> Any:
        rsep = None
        try:
            print("I am interceptor 2: start ----1")
            rsep = method(request, context)
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise
        finally:
            print("I am interceptor 2: end ----2", rsep)
        return rsep


def serve():
    # Instantiate an RPC server
    options = [
        ('grpc.max_send_message_length', 60 * 1024 * 1024),     # limit on the size of sent messages
        ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # limit on the size of received messages
    ]
    # Enable gzip compression globally for data sent by the server
    compression = grpc.Compression.Gzip
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=options,
        compression=compression,
        interceptors=[MyUnaryServerInterceptor1(), MyUnaryServerInterceptor2()],
    )
    # Register our service
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # Configure the listening port
    server.add_insecure_port('[::]:50051')
    # Start the server
    server.start()

    def stop_serve(signum, frame):
        print("Process terminated !!!!")
        # sys.exit(0)
        raise KeyboardInterrupt

    # Register the signal handlers
    # SIGINT corresponds to CTRL + C on Windows
    # SIGTERM corresponds to the Linux kill command
    signal.signal(signal.SIGINT, stop_serve)
    # signal.signal(signal.SIGTERM, stop_serve)

    # wait_for_termination blocks so that the process keeps running
    server.wait_for_termination()


if __name__ == '__main__':
    serve()
```

This way, the interceptor can work with both the request and the context, which makes it almost the same as the middleware in a web framework!
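As an illustration of what having the context makes possible, here is a minimal sketch of a token check written against the same intercept(method, request, context, method_name) signature as the example above. It is deliberately standalone and makes several assumptions: FakeContext and AbortError are hypothetical stand-ins for grpc.ServicerContext and for the exception its abort() raises, plain strings replace grpc.StatusCode values, and the "token" metadata key is made up. In real use the class would subclass grpc_interceptor.ServerInterceptor.

```python
from typing import Any, Callable, Dict, List, Tuple


class AbortError(Exception):
    """Hypothetical error; the real context.abort() also raises to end the RPC."""


class FakeContext:
    """Hypothetical stand-in for grpc.ServicerContext, just enough for this sketch."""

    def __init__(self, metadata: List[Tuple[str, str]]) -> None:
        self._metadata = metadata

    def invocation_metadata(self) -> List[Tuple[str, str]]:
        return self._metadata

    def abort(self, code: str, details: str) -> None:
        raise AbortError(f"{code}: {details}")


class TokenCheckInterceptor:
    """Uses the intercept() signature from the example above (no real base class)."""

    def __init__(self, expected_token: str) -> None:
        self._expected_token = expected_token

    def intercept(self, method: Callable, request: Any, context: Any, method_name: str) -> Any:
        # The context is available here, so the request metadata can be inspected.
        metadata: Dict[str, str] = dict(context.invocation_metadata())
        if metadata.get("token") != self._expected_token:
            # Stop the call before the service method runs.
            context.abort("UNAUTHENTICATED", f"invalid token for {method_name}")
        return method(request, context)


def say_hello(request: str, context: Any) -> str:
    # Stand-in for a service method such as Greeter.SayHello.
    return f"hello {request}"


interceptor = TokenCheckInterceptor(expected_token="secret")
ok_reply = interceptor.intercept(
    say_hello, "world", FakeContext([("token", "secret")]), "/Greeter/SayHello")

try:
    interceptor.intercept(say_hello, "world", FakeContext([]), "/Greeter/SayHello")
    rejected = False
except AbortError as exc:
    rejected = True
    error_text = str(exc)

print(ok_reply, rejected)
```

The first call carries the expected token and reaches the service method; the second carries no token and is rejected before the method is invoked.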

4.2 A brief look at the third-party library's source code

Looking into the source code of this third-party library, we find that it implements grpc.ServerInterceptor itself and then adds one more layer of abstraction on top of it:

  • The first step is the same as in our own implementation: call the continuation to get the next handler, next_handler = continuation(handler_call_details)
  • It then determines which type of RPC this next_handler serves:

    • unary_unary
    • unary_stream
    • stream_unary
    • stream_stream
  • Once the type is determined, it gets back the pair handler_factory, next_handler_method
  • Finally, it calls handler_factory and returns the resulting object. handler_factory takes the following arguments:

    • invoke_intercept_method: the function that invokes the interceptor
    • request_deserializer: deserializes the request
    • response_serializer: serializes the response
  • invoke_intercept_method itself is defined to accept the parameters request: Any and context: grpc.ServicerContext
  • What it then returns is the result of the intercept method that we implement ourselves. Working through it left me dizzy ~
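The steps above can be sketched in plain Python. This is a simplified simulation written for illustration, not the library's actual source: Handler is a hypothetical stand-in for gRPC's method handler object, only the unary_unary case is covered, and AdaptingInterceptor, make_unary_unary_handler, Details, and PrefixInterceptor are names made up for the sketch.

```python
from typing import Any, Callable, NamedTuple, Optional


class Handler(NamedTuple):
    """Hypothetical stand-in for gRPC's method handler (unary_unary only)."""
    unary_unary: Callable[[Any, Any], Any]
    request_deserializer: Optional[Callable] = None
    response_serializer: Optional[Callable] = None


class Details(NamedTuple):
    """Hypothetical stand-in for handler_call_details."""
    method: str


def make_unary_unary_handler(behavior, request_deserializer=None, response_serializer=None):
    """Plays the role of handler_factory for the unary_unary case."""
    return Handler(behavior, request_deserializer, response_serializer)


class AdaptingInterceptor:
    """Adapts intercept_service(...) to intercept(method, request, context, method_name)."""

    def intercept(self, method, request, context, method_name):
        raise NotImplementedError

    def intercept_service(self, continuation, handler_call_details):
        # Step 1: get the next handler, as a hand-written interceptor would.
        next_handler = continuation(handler_call_details)
        # Step 2: choose the factory and the wrapped method for this RPC type.
        handler_factory = make_unary_unary_handler
        next_handler_method = next_handler.unary_unary

        # Step 3: a new behavior that receives request and context at call time.
        def invoke_intercept_method(request, context):
            return self.intercept(
                next_handler_method, request, context, handler_call_details.method)

        # Step 4: build a new handler that reuses the original (de)serializers.
        return handler_factory(
            invoke_intercept_method,
            request_deserializer=next_handler.request_deserializer,
            response_serializer=next_handler.response_serializer,
        )


class PrefixInterceptor(AdaptingInterceptor):
    def intercept(self, method, request, context, method_name):
        # request and context are both visible here.
        return f"{method_name}: {method(request, context)}"


original = Handler(lambda request, context: f"hello {request}")
wrapped = PrefixInterceptor().intercept_service(
    lambda details: original, Details("/Greeter/SayHello"))
result = wrapped.unary_unary("world", None)
print(result)
```

The point of the design is that the wrapper handler is created at lookup time, while the user's intercept method only runs later, when the server calls the handler with the real request and context.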

4.3 A further note on handler_call_details

If we only need to read the request header metadata submitted with the RPC request, we can get it from handler_call_details:

```python
print("handler call details: ", handler_call_details.invocation_metadata)
```

It is an instance of grpc._server._HandlerCallDetails.
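As a small illustration of reading that metadata, the sketch below collects the key/value pairs into a dict and looks one key up. To run without a server it uses hypothetical namedtuple stand-ins (Metadatum, HandlerCallDetails) shaped like the objects gRPC passes in, and the metadata_to_dict helper and the method path are also made up for the example.

```python
from collections import namedtuple
from typing import Dict

# Hypothetical stand-ins shaped like the objects gRPC passes to intercept_service.
Metadatum = namedtuple("Metadatum", ["key", "value"])
HandlerCallDetails = namedtuple("HandlerCallDetails", ["method", "invocation_metadata"])


def metadata_to_dict(handler_call_details) -> Dict[str, str]:
    """Collect the request metadata into a dict (for duplicate keys, the last wins)."""
    return {item.key: item.value for item in handler_call_details.invocation_metadata}


details = HandlerCallDetails(
    method="/Greeter/SayHelloAgain",  # hypothetical method path
    invocation_metadata=(
        Metadatum("mesasge", "1010"),
        Metadatum("error", "No Error"),
    ),
)

headers = metadata_to_dict(details)
print(details.method, headers.get("error"))
```

Inside a real interceptor the same helper could be applied directly to the handler_call_details argument of intercept_service.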

Conclusion

These are just my personal study and practice notes, written around my own actual needs. If you spot any mistakes, criticism and corrections are welcome. Thank you!

Jianshu: www.jianshu.com/u/d6960089b…

Juejin: juejin.cn/user/296393…

WeChat public account: search WeChat for [children to a pot of wolfberry wine tea]

QQ: 308711822 (you are welcome to get in touch to learn and exchange ideas)