Custom annotations enable service registration and discovery
What is RPC
Remote Procedure Call (RPC) lets a program call a remote service as if it were a local one, without the caller having to deal with the details of the network call.
RPC principle
An RPC implementation consists of five parts: the RPC client, the client proxy, the socket, the server proxy, and the RPC server.
Request
- Client: When the RPC client initiates a remote call, the client proxy serializes the interface, method, parameters, and parameter types of the call, and sends the object that encapsulates them to the server through the socket.
- Server: The socket receives the data from the client, deserializes it, and delegates the call to the concrete implementation object.
Response
- Server: After the target method has executed, the result is written back to the socket.
- Client: The socket receives the result and returns it to the RPC client.
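The four steps above can be sketched in a single process. This is a minimal illustration, not the project's code: the class name LoopbackRpcDemo, the plain String payload, and the "handled: " reply are assumptions made for the example, and the server side is just a thread on the loopback interface.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo class: one request/response exchange over a loopback socket. */
class LoopbackRpcDemo {

    /** Sends one serialized request to a local server thread and returns the reply. */
    static Object roundTrip(String request) {
        // Port 0 asks the OS for any free port; the loopback address keeps the demo local.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread serverThread = new Thread(() -> serveOnce(server), "demo-server");
            serverThread.start();
            Object reply;
            try (Socket socket = new Socket(server.getInetAddress(), server.getLocalPort())) {
                // Request, client side: serialize the call and send it.
                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                out.writeObject(request);
                out.flush();
                // Response, client side: read the result from the socket.
                ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                reply = in.readObject();
            }
            serverThread.join();
            return reply;
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Server side: deserialize one request, "execute" it, and write the result back. */
    private static void serveOnce(ServerSocket server) {
        try (Socket socket = server.accept()) {
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            String request = (String) in.readObject();
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject("handled: " + request);
            out.flush();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("queryOrderList"));
    }
}
```

Each side writes and flushes before the other side reads, which matters because the ObjectInputStream constructor blocks until it has read the stream header sent by the peer.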
Technologies used
- Java
- Spring
- serialization
- socket
- reflection
- dynamic proxy
GitHub address of the project
Github.com/autumnqfeng…
Original blog post address
Autumn200.com/2020/06/21/…
Server project
Project structure
The rpc-server project consists of two sub-projects: order-api and order-provider.
order-api holds the request interfaces and RpcRequest (an entity class carrying the class name, method name, and parameters).
order-provider holds the classes for the request interface implementations, the socket, and the proxy.
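RpcRequest itself is not listed in this post. A minimal sketch, assuming the fields implied by the getters and setters used in the later code (className, methodName, types, args), could look like the following. It has to implement Serializable because it is written to an ObjectOutputStream. The helper class RpcRequestDemo and the name com.example.IOrderService in the usage note are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Sketch of the request entity; the field names are inferred, not taken from the project. */
class RpcRequest implements Serializable {
    private static final long serialVersionUID = 1L;

    private String className;   // fully qualified name of the service interface
    private String methodName;  // name of the method to invoke
    private Class<?>[] types;   // parameter types of the method
    private Object[] args;      // arguments; every element must itself be Serializable

    public String getClassName() { return className; }
    public void setClassName(String className) { this.className = className; }
    public String getMethodName() { return methodName; }
    public void setMethodName(String methodName) { this.methodName = methodName; }
    public Class<?>[] getTypes() { return types; }
    public void setTypes(Class<?>[] types) { this.types = types; }
    public Object[] getArgs() { return args; }
    public void setArgs(Object[] args) { this.args = args; }
}

/** Hypothetical helper that serializes a request to bytes and reads it back. */
class RpcRequestDemo {
    static RpcRequest roundTrip(RpcRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(request);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RpcRequest) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A request built with setClassName("com.example.IOrderService"), setMethodName("queryOrderList"), and one String argument comes back from roundTrip as an equal-valued copy, which is what the server sees after deserialization.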
order-api
order-provider
Service registration
The key to making dynamic calls to ServiceImpl is to manage the Service classes. How do we manage these service classes?
We can follow the model of Spring's @Service annotation and define our own registration annotation. We put a custom @RpcRemoteService annotation on each ServiceImpl class and save the annotated class's interface name and method names in a Map, which is then used to locate the implementation class.
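The idea can be tried without Spring. The sketch below is an illustration under assumptions: @Remote stands in for @RpcRemoteService, GreetingService and GreetingServiceImpl are hypothetical, and the map stores the interface Method directly rather than a bean/method pair. It also iterates over the interface methods, which is a variation on scanning the implementation's declared methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, Spring-free sketch of annotation-driven service registration. */
class RegistrySketch {

    /** Stand-in for @RpcRemoteService; RUNTIME retention makes it visible to reflection. */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Remote {
    }

    /** Hypothetical service interface. */
    interface GreetingService {
        String greet(String name);
    }

    /** Hypothetical implementation marked for remote publication. */
    @Remote
    static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    /** Routing table: "interfaceName.methodName" mapped to the method to invoke. */
    static final Map<String, Method> ROUTING = new ConcurrentHashMap<>();

    /** Registers the interface methods of an annotated object; other objects are ignored. */
    static void register(Object bean) {
        Class<?> type = bean.getClass();
        if (!type.isAnnotationPresent(Remote.class)) {
            return;
        }
        for (Class<?> iface : type.getInterfaces()) {
            for (Method method : iface.getMethods()) {
                ROUTING.put(iface.getName() + "." + method.getName(), method);
            }
        }
    }

    public static void main(String[] args) {
        register(new GreetingServiceImpl());
        register("not a service"); // no annotation, so nothing is added
        System.out.println(ROUTING.keySet());
    }
}
```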
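@RpcRemoteService annotation
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;

/**
 * Server service discovery annotation.
 *
 * @author: ****
 * @date: 2020/6/21 16:21
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcRemoteService {
}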
Service registration class InitialMerdiator
After Spring initializes each bean, this post-processor checks whether the bean's class carries @RpcRemoteService and, if so, saves its methods in Mediator.ROUTING.
import java.lang.reflect.Method;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

/**
 * Initializes the intermediate proxy layer object.
 *
 * @author: ****
 * @date: 2020/6/21
 */
@Component
public class InitialMerdiator implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Beans annotated with @RpcRemoteService are published for remote calls
        if (bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                // Routing key: name of the first implemented interface + "." + method name
                String routingKey = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.ROUTING.put(routingKey, beanMethod);
            }
        }
        return bean;
    }
}
Socket listening
The socket listens for client requests.
Socket startup class SocketServer
Once the Spring container has finished loading, start the socket.
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * After the Spring container has started, a ContextRefreshedEvent is published.
 *
 * @author: ****
 * @date: 2020/6/21 16:51
 */
@Component
public class SocketServer implements ApplicationListener<ContextRefreshedEvent> {
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(8888);
            while (true) {
                Socket accept = serverSocket.accept();
                // The thread pool handles each accepted socket
                executorService.execute(new ProcessorHandler(accept));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
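One design point worth noting: Spring delivers ContextRefreshedEvent synchronously by default, so the while (true) loop above keeps the publishing thread inside onApplicationEvent. A possible alternative, sketched here without Spring and under assumptions (the class BackgroundAcceptLoop, the fixed "ok" reply, and the ping helper are all hypothetical), is to run the accept loop on its own thread so that startup can return.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical variant of the accept loop that runs on its own thread. */
class BackgroundAcceptLoop implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService workers = Executors.newCachedThreadPool();

    BackgroundAcceptLoop(int port) {
        try {
            serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Thread acceptThread = new Thread(this::acceptLoop, "rpc-accept");
        acceptThread.setDaemon(true);
        acceptThread.start(); // the constructor returns while the loop keeps running
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket socket = serverSocket.accept();
                workers.execute(() -> handle(socket));
            } catch (IOException e) {
                // accept() fails with an exception once close() has been called
                if (!serverSocket.isClosed()) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Placeholder for the real request handling: replies with one fixed line. */
    private void handle(Socket socket) {
        try (Socket s = socket; OutputStream out = s.getOutputStream()) {
            out.write("ok\n".getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            serverSocket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        workers.shutdown();
    }

    /** Connects to a local server on the given port and returns the line it sends. */
    static String ping(int port) {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        try (BackgroundAcceptLoop server = new BackgroundAcceptLoop(0)) {
            System.out.println(ping(server.port()));
        }
    }
}
```

Closing the ServerSocket is what ends the loop: the blocked accept() call throws, and the isClosed() check lets the thread exit quietly.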
Socket handler class ProcessorHandler
Handles each accepted socket connection.
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class ProcessorHandler implements Runnable {
    private Socket socket;

    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        try {
            inputStream = new ObjectInputStream(socket.getInputStream());
            // Deserialize the request
            RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
            // The intermediate proxy executes the target method
            Mediator mediator = Mediator.getInstance();
            Object response = mediator.processor(rpcRequest);
            System.out.println("Server execution result:" + response);
            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(response);
            outputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeStream(inputStream, outputStream);
        }
    }

    private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
        // Close the streams
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (outputStream != null) {
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Server proxy
Mediator
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The proxy layer between the server socket and the target method.
 *
 * @author: ****
 * @date: 2020/6/21
 */
public class Mediator {
    /** Stores the published service instances (the routing table for service invocations). */
    public static Map<String, BeanMethod> ROUTING = new ConcurrentHashMap<>();

    /** Singleton instance of the proxy layer. */
    private volatile static Mediator instance;

    private Mediator() {
    }

    public static Mediator getInstance() {
        if (instance == null) {
            synchronized (Mediator.class) {
                if (instance == null) {
                    instance = new Mediator();
                }
            }
        }
        return instance;
    }

    public Object processor(RpcRequest rpcRequest) {
        // Routing key
        String routingKey = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
        BeanMethod beanMethod = ROUTING.get(routingKey);
        if (beanMethod == null) {
            return null;
        }
        // Execute the target method
        Object bean = beanMethod.getBean();
        Method method = beanMethod.getMethod();
        try {
            return method.invoke(bean, rpcRequest.getArgs());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
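Because the routing key is only className + "." + methodName, two overloads of the same method would map to one entry and the later registration would overwrite the earlier one. Since the request already carries the parameter types, one possible refinement is to fold them into the key. The sketch below is an assumption-laden illustration, not the project's code: OverloadSafeRouting, Calculator, and CalculatorImpl are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Hypothetical sketch: routing keys that stay unique for overloaded methods. */
class OverloadSafeRouting {

    /** Hypothetical service with two overloads of the same method name. */
    interface Calculator {
        int add(int a, int b);

        String add(String a, String b);
    }

    static class CalculatorImpl implements Calculator {
        @Override
        public int add(int a, int b) {
            return a + b;
        }

        @Override
        public String add(String a, String b) {
            return a + b;
        }
    }

    static final Map<String, Method> ROUTING = new ConcurrentHashMap<>();

    /** Builds a key of the form "className.methodName(type1,type2)". */
    static String routingKey(String className, String methodName, Class<?>[] types) {
        String params = Arrays.stream(types).map(Class::getName).collect(Collectors.joining(","));
        return className + "." + methodName + "(" + params + ")";
    }

    /** Registers every method of the interface under its overload-safe key. */
    static void register(Class<?> iface) {
        for (Method method : iface.getMethods()) {
            ROUTING.put(routingKey(iface.getName(), method.getName(), method.getParameterTypes()), method);
        }
    }

    /** Looks up the method by key and invokes it reflectively on the given bean. */
    static Object invoke(Object bean, String className, String methodName,
                         Class<?>[] types, Object[] args) {
        Method method = ROUTING.get(routingKey(className, methodName, types));
        if (method == null) {
            throw new IllegalArgumentException("no route for " + className + "." + methodName);
        }
        try {
            return method.invoke(bean, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        register(Calculator.class);
        String name = Calculator.class.getName();
        Object bean = new CalculatorImpl();
        System.out.println(invoke(bean, name, "add",
                new Class<?>[]{int.class, int.class}, new Object[]{1, 2}));
        System.out.println(invoke(bean, name, "add",
                new Class<?>[]{String.class, String.class}, new Object[]{"a", "b"}));
    }
}
```

Throwing on a missing route, instead of returning null as processor does, also lets the caller tell "no such service" apart from a method that legitimately returned null.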
BeanMethod
import java.lang.reflect.Method;

/**
 * Holds the target method and the target object for the reflective call made by the middle layer.
 *
 * @author: ****
 * @date: 2020/6/21
 */
public class BeanMethod {
    private Object bean;
    private Method method;
    // Getters and setters omitted
}
Client project
Project structure
Service discovery
Service discovery is also implemented with annotations. Following the principle behind Spring's @Autowired, we define a custom @RpcReference annotation, place it on a field, and inject a proxy for the interface into that field.
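A Spring-free sketch of this injection step may make the mechanism easier to see. Everything named here is hypothetical: @Reference stands in for @RpcReference, EchoService and Controller are illustrative, and the handler simply formats a string instead of sending anything over the network.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical sketch: injecting a dynamic proxy into annotated fields. */
class FieldInjectionSketch {

    /** Stand-in for @RpcReference. */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Reference {
    }

    /** Hypothetical remote interface; no local implementation exists. */
    interface EchoService {
        String echo(String text);
    }

    /** Hypothetical consumer bean with a field waiting for injection. */
    static class Controller {
        @Reference
        private EchoService echoService;

        String call(String text) {
            return echoService.echo(text);
        }
    }

    /** Replaces every @Reference field of the bean with a proxy backed by the handler. */
    static void inject(Object bean, InvocationHandler handler) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(Reference.class)) {
                continue;
            }
            Object proxy = Proxy.newProxyInstance(
                    field.getType().getClassLoader(),
                    new Class<?>[]{field.getType()},
                    handler);
            field.setAccessible(true); // the field is private
            try {
                field.set(bean, proxy);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        Controller controller = new Controller();
        // Stand-in for the network call: report which method was intercepted.
        inject(controller, (proxy, method, callArgs) -> "remote:" + method.getName() + ":" + callArgs[0]);
        System.out.println(controller.call("hi"));
    }
}
```

The consumer code only ever sees the interface type, so swapping the formatting handler for one that performs a real remote call requires no change to Controller.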
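@RpcReference annotation
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;

/**
 * Service injection annotation.
 *
 * @author: ****
 * @date: 2020/6/20 22:41
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcReference {
}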
Service discovery class ReferenceInvokeProxy
Before Spring initializes each bean, this post-processor scans the bean's fields for the @RpcReference annotation.
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

/**
 * Proxy for dynamically invoking remote services.
 *
 * @author: ****
 * @date: 2020/6/20
 */
@Component
public class ReferenceInvokeProxy implements BeanPostProcessor {
    @Autowired
    private RemoteInvocationHandler invocationHandler;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Field[] fields = bean.getClass().getDeclaredFields();
        for (Field field : fields) {
            if (field.isAnnotationPresent(RpcReference.class)) {
                field.setAccessible(true);
                // Create a proxy for the field's interface type; calls on it go to invocationHandler
                Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(),
                        new Class<?>[]{field.getType()}, invocationHandler);
                try {
                    // Assign the proxy to the field annotated with @RpcReference
                    field.set(bean, proxy);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }
        return bean;
    }
}
Client proxy
Client dynamic proxy InvocationHandler
Implementation class RemoteInvocationHandler
Encapsulates the target method name, target class name, and parameter information in an RpcRequest, then hands it to the socket to send to the server.
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: ****
 * @date: 2020/6/20
 */
@Component
public class RemoteInvocationHandler implements InvocationHandler {
    @Autowired
    private RpcNetTransport rpcNetTransport;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest rpcRequest = new RpcRequest();
        // The declaring class of a proxied method is the service interface
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setTypes(method.getParameterTypes());
        rpcRequest.setArgs(args);
        return rpcNetTransport.send(rpcRequest);
    }
}
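The handler's class name and method name must produce the same string that the server used as its routing key. A small sketch, using a hypothetical OrderService interface and the hypothetical class CallCaptureDemo, shows what a handler of this shape sees when a call goes through the proxy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical sketch: what an InvocationHandler observes for a proxied call. */
class CallCaptureDemo {

    /** Hypothetical service interface shared by client and server. */
    interface OrderService {
        String queryOrderList(String userId);
    }

    /** Makes one call through a proxy and returns the key the handler derived from it. */
    static String capturedKey() {
        // Instead of sending a request, return the routing key the server would look up.
        InvocationHandler handler = (proxy, method, args) ->
                method.getDeclaringClass().getName() + "." + method.getName();
        OrderService service = (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[]{OrderService.class},
                handler);
        return service.queryOrderList("u-1");
    }

    public static void main(String[] args) {
        System.out.println(capturedKey());
    }
}
```

For interface methods the declaring class is the interface, which lines up with the server building its key from getInterfaces()[0].getName(). Note that toString, hashCode, and equals on a proxy are also dispatched to the handler with java.lang.Object as the declaring class, so a handler like the one above would turn those calls into remote requests unless it handles them separately.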
Client socket
Network transport class RpcNetTransport
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * RPC socket network transport.
 *
 * @author: ****
 * @date: 2020/6/20
 */
@Component
public class RpcNetTransport {
    @Value("${rpc.host}")
    private String host;
    @Value("${rpc.port}")
    private int port;

    public Object send(RpcRequest rpcRequest) {
        ObjectOutputStream outputStream = null;
        ObjectInputStream inputStream = null;
        try {
            Socket socket = new Socket(host, port);
            // Send the target method information
            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(rpcRequest);
            outputStream.flush();
            // Receive the return value
            inputStream = new ObjectInputStream(socket.getInputStream());
            return inputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            closeStream(inputStream, outputStream);
        }
        return null;
    }

    private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
        // Close the streams
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (outputStream != null) {
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}