Source code for version 1.0 of the project:
https://github.com/wephone/MeiZhuoRPC/tree/1.0
In the previous post I walked through the ideas behind implementing RPC. Those were only ideas, so this article uses the source code to explain the concrete problems that came up during implementation.
If you are not yet comfortable with the background knowledge this article assumes, please review it first:
- Java dynamic proxy
- Basic use of the Netty framework
- Basic Spring configuration
The finished project is used as follows:
/**
 * Calling-side code and Spring configuration
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"file:src/test/java/rpcTest/ClientContext.xml"})
public class Client {
    @Test
    public void start(){
        Service service= (Service) RPC.call(Service.class);
        System.out.println("Test Integer,Double input return String object :"+ service.stringMethodIntegerArgsTest(233,666.66));
    }
}

/**
 * Abstract interface shared by both sides
 */
public interface Service {
    String stringMethodIntegerArgsTest(Integer a,Double b);
}

/**
 * Implementation on the implementation side
 */
public class ServiceImpl implements Service {
    @Override
    public String stringMethodIntegerArgsTest(Integer a, Double b) {
        return "String"+a+b;
    }
}
Version 1.0 is split into 3 packages:
- Client: the calling side
- Server: the implementation side
- Core: the core methods
Look at this line first.
The calling side only needs the interface definition. It passes the interface's class type to RPC.call, and every method of that interface is actually carried out by the implementation side.
Service service= (Service) RPC.call(Service.class);
All this does is generate a dynamic proxy on the calling side
/**
 * Static method exposed to the caller; generates a dynamic proxy object for the abstract interface.
 * TODO: consider optimizing later so the caller no longer needs an explicit cast
 * @param cls the class type of the abstract interface
 * @return
 */
public static Object call(Class cls){
    RPCProxyHandler handler=new RPCProxyHandler();
    Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},handler);
    return proxyObj;
}
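To make the proxy mechanism concrete, here is a minimal, self-contained sketch using only the JDK. The names Greeter, DescribingHandler and ProxySketch are hypothetical and not part of MeiZhuoRPC. The handler simply describes the intercepted call instead of sending it over the network. The generic proxyFor helper is one possible way to address the cast mentioned in the TODO above; it is an assumption, not the project's code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical interface standing in for the article's Service.
interface Greeter {
    String greet(String name, Integer times);
}

// Hypothetical handler: instead of sending a request over the network,
// it returns a description of the call it intercepted.
class DescribingHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        return method.getDeclaringClass().getSimpleName() + "." + method.getName()
                + Arrays.toString(args);
    }
}

class ProxySketch {
    // Generic variant of call(): the cast happens once, inside the helper.
    static <T> T proxyFor(Class<T> cls) {
        Object proxy = Proxy.newProxyInstance(
                cls.getClassLoader(), new Class<?>[]{cls}, new DescribingHandler());
        return cls.cast(proxy);
    }

    public static void main(String[] args) {
        Greeter greeter = proxyFor(Greeter.class);
        // The call never reaches a real implementation; the handler answers it.
        System.out.println(greeter.greet("rpc", 2)); // prints Greeter.greet[rpc, 2]
    }
}
```

Every call on the proxy, whatever the method, ends up in the single invoke method. That is the hook an RPC framework uses to turn a local call into a network request.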
RPCProxyHandler is the proxy's invocation handler. Its invoke method runs every time a method is called on the dynamic proxy object.
/**
 * @param proxy
 * @param method
 * @param args
 * @return
 * @throws Throwable
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    RPCRequest request=new RPCRequest();
    request.setRequestID(buildRequestID(method.getName()));
    // getDeclaringClass returns the class or interface that declares this method
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    // request.setParameterTypes(method.getParameterTypes()); // parameter types
    request.setParameters(args); // input arguments
    RPCRequestNet.requestLockMap.put(request.getRequestID(),request);
    RPCRequestNet.connect().send(request);
    // once the call has completed, remove the corresponding mapping
    RPCRequestNet.requestLockMap.remove(request.getRequestID());
    return request.getResult(); // result of the target method
}
invoke gathers the information about the interface method being called and sends it to the implementation side. The requestLockMap is there for the following reasons:
- Our network calls are all asynchronous.
- An RPC call, however, has to block synchronously until the remote method has fully returned.
- So the request object of each call is used as an object lock. After a request is sent, the calling thread waits on that lock until the asynchronous network call returns.
- Each request gets a generated ID. Here I use a random number combined with a timestamp.
- The request ID and the request object are kept in a static global map, and the response from the implementation side is matched to its request by ID.
- When the asynchronous call returns, the request object is looked up by ID and notified, which wakes the thread waiting on it. The Netty callback that releases the object lock when the response arrives is as follows:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String responseJson= (String) msg;
    RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson);
    synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) {
        // wake the thread waiting on this object
        RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID());
        request.setResult(response.getResult());
        request.notifyAll();
    }
}
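The request-as-lock idea described above can be tried out without Netty. The following is a minimal sketch under stated assumptions. PendingRequest, RequestLockSketch and the ID format are hypothetical, and a background thread stands in for the asynchronous network reply. Unlike the listing above, it adds a done flag checked in a loop. This is a common guard against spurious wake-ups and against a response that arrives before the caller starts waiting.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for RPCRequest: carries an ID, a result and a completion flag.
class PendingRequest {
    final String id;
    private Object result;
    private boolean done; // guards against spurious wake-ups and early responses

    PendingRequest(String id) { this.id = id; }

    synchronized void complete(Object value) {
        result = value;
        done = true;
        notifyAll();
    }

    synchronized Object await() throws InterruptedException {
        while (!done) {
            wait(); // releases the monitor while blocked
        }
        return result;
    }
}

class RequestLockSketch {
    // Plays the role of requestLockMap: request ID -> request object.
    static final Map<String, PendingRequest> PENDING = new ConcurrentHashMap<>();

    // Timestamp plus random number; the exact format is an assumption.
    static String buildRequestId(String methodName) {
        return methodName + "-" + System.currentTimeMillis() + "-"
                + ThreadLocalRandom.current().nextInt(1_000_000);
    }

    // Simulates the network callback: looks up the request by ID and wakes the caller.
    static void onResponse(String requestId, Object result) {
        PendingRequest request = PENDING.get(requestId);
        if (request != null) {
            request.complete(result);
        }
    }

    static Object call(String methodName) {
        PendingRequest request = new PendingRequest(buildRequestId(methodName));
        PENDING.put(request.id, request);
        // A background thread stands in for the asynchronous network reply.
        new Thread(() -> onResponse(request.id, "result of " + methodName)).start();
        try {
            return request.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for response", e);
        } finally {
            PENDING.remove(request.id); // drop the mapping once the call is over
        }
    }

    public static void main(String[] args) {
        System.out.println(call("stringMethodIntegerArgsTest"));
    }
}
```

The map is only needed because the reply arrives on a different thread that knows nothing but the request ID. The caller itself already holds a reference to its own request object.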
Now for RPCRequestNet.connect().send(request). The RPCRequestNet constructor uses Netty to open a TCP connection to the implementation side. The send method is as follows:
try {
    // Check whether the connection is complete; this only blocks while the connection is still being established
    if(RPCRequestHandler.channelCtx==null){
        connectlock.lock();
        System.out.println("Waiting to connect to the implementation side");
        connectCondition.await();
        connectlock.unlock();
    }
    String requestJson= null;
    try {
        requestJson = RPC.requestEncode(request);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());
    RPCRequestHandler.channelCtx.writeAndFlush(requestBuf);
    System.out.println("Call"+request.getRequestID()+"Sent");
    synchronized (request) {
        // give up the object lock and block until notified
        request.wait();
    }
    System.out.println("Call"+request.getRequestID()+"Received complete");
} catch (InterruptedException e) {
    e.printStackTrace();
}
A Condition and a lock are used here as well, so that send waits synchronously for the asynchronous connection to complete. Beyond that, send mainly encodes the request as JSON and sends it to the implementation side.
That is essentially the whole calling side: the proxy, sending the request, and the synchronization lock described above.
Next comes the usage and implementation of the implementation (server) side.
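The "wait until connected" step can be sketched on its own with java.util.concurrent.locks. This is a hedged illustration, not the project's code. ConnectionGate and its method names are hypothetical, and a plain Object stands in for the Netty channel context. Compared with the listing above, the null check happens while holding the lock and is repeated in a loop, and the lock is released in a finally block.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper that lets senders block until a connection exists.
class ConnectionGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition connected = lock.newCondition();
    private Object channel; // stands in for RPCRequestHandler.channelCtx

    // Called by the network thread once the connection is established.
    void markConnected(Object newChannel) {
        lock.lock();
        try {
            channel = newChannel;
            connected.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Called by senders: blocks until a channel is available, then returns it.
    Object awaitChannel() {
        lock.lock();
        try {
            while (channel == null) { // re-check after every wake-up
                connected.awaitUninterruptibly();
            }
            return channel;
        } finally {
            lock.unlock();
        }
    }
}

class ConnectionGateDemo {
    public static void main(String[] args) {
        ConnectionGate gate = new ConnectionGate();
        // A background thread stands in for the asynchronous connect callback.
        new Thread(() -> gate.markConnected("channel-1")).start();
        System.out.println(gate.awaitChannel()); // prints channel-1
    }
}
```

Checking the condition under the lock matters. If the connection completes between an unlocked null check and the call to await, the signal is missed and the sender can block forever.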
/**
 * Implementation-side code and Spring configuration
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"file:src/test/java/rpcTest/ServerContext.xml"})
public class Server {
    @Test
    public void start(){
        // start once Spring has finished loading the container
        RPC.start();
    }
}
RPC.start() starts the Netty server. The server side handles client messages in the following callback:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
    String requestJson= (String) msg;
    System.out.println("receive request:"+requestJson);
    RPCRequest request= RPC.requestDeocde(requestJson);
    Object result=InvokeServiceUtil.invoke(request);
    // ctx.write(resp);
    RPCResponse response=new RPCResponse();
    response.setRequestID(request.getRequestID());
    response.setResult(result);
    String respStr=RPC.responseEncode(response);
    ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes());
    ctx.writeAndFlush(responseBuf);
}
This mostly decodes and encodes JSON and reflectively invokes the corresponding method. Let's look at the reflection utility class:
/**
 * Reflectively invokes the corresponding implementation class and returns the result
 * @param request
 * @return
 */
public static Object invoke(RPCRequest request){
    Object result=null;
    String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName());
    try {
        Class implClass=Class.forName(implClassName);
        Object[] parameters=request.getParameters();
        int parameterNums=request.getParameters().length;
        Class[] parameterTypes=new Class[parameterNums];
        for (int i = 0; i <parameterNums ; i++) {
            parameterTypes[i]=parameters[i].getClass();
        }
        Method method=implClass.getDeclaredMethod(request.getMethodName(),parameterTypes);
        Object implObj=implClass.newInstance();
        result=method.invoke(implObj,parameters);
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (NoSuchMethodException e) {
        e.printStackTrace();
    } catch (InstantiationException e) {
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    } catch (InvocationTargetException e) {
        e.printStackTrace();
    }
    return result;
}
The utility calls getClass on each element of parameters to get its class type, then uses those types to look up the corresponding method and invoke it reflectively.
There is one thing to note here:
- I originally used Gson to handle JSON, but by default Gson converts an int to a double (2 becomes 2.0). That does not suit this scenario, and I did not want to write a special adapter for it.
- So I switched to Jackson.
- When common JSON frameworks deserialize into Object, primitive values such as int and long come back as their wrapper classes Integer and Long.
- So in this example, the parameters of remotely invoked interface methods cannot use primitive types such as int.
- Otherwise the corresponding method cannot be found: getDeclaredMethod throws NoSuchMethodException, so method.invoke(implObj, parameters) is never reached.
- This is because the elements of parameters are already wrapper-class instances, so the lookup asks for Integer while the method is declared with int, and no matching method exists.
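The wrapper-versus-primitive mismatch described above is easy to reproduce with plain reflection. In this small sketch, Calculator and WrapperLookupSketch are hypothetical names. It derives the parameter types from the runtime arguments, as the utility class does, and shows that only the method declared with wrapper types is found.

```java
import java.lang.reflect.Method;

// Hypothetical implementation class with one wrapper-typed and one primitive-typed method.
class Calculator {
    public String boxed(Integer a, Double b) { return "boxed" + a + b; }
    public String primitive(int a, double b) { return "primitive" + a + b; }
}

class WrapperLookupSketch {
    // Mirrors the article's approach: derive parameter types from the runtime arguments.
    static Class<?>[] typesOf(Object[] args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        return types;
    }

    // Returns true if a method with that name matches the argument-derived types.
    static boolean canFind(String methodName, Object[] args) {
        try {
            Method method = Calculator.class.getDeclaredMethod(methodName, typesOf(args));
            return method != null;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object[] callArgs = {233, 666.66}; // autoboxed to Integer and Double
        System.out.println("boxed found: " + canFind("boxed", callArgs));         // true
        System.out.println("primitive found: " + canFind("primitive", callArgs)); // false
    }
}
```

An Object[] can never hold a primitive, so getClass always yields Integer.class and never int.class. One way around the restriction would be to send the declared parameter types with the request, as the commented-out setParameterTypes call in the proxy handler hints. Whether later versions of the project do this is not stated here.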
Finally, the basic configuration is done through Spring. I wrote two classes, ServerConfig and ClientConfig, as the configuration for the implementation side and the calling side. Just declare these two beans in Spring and start the IoC container.
Calling side:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean class="org.meizhuo.rpc.client.ClientConfig">
        <property name="host" value="127.0.0.1"></property>
        <property name="port" value="9999"></property>
    </bean>
</beans>
Implementation side:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean class="org.meizhuo.rpc.server.ServerConfig">
        <property name="port" value="9999"></property>
        <property name="serverImplMap">
            <map>
                <!-- Configure each abstract interface and its implementation -->
                <entry key="rpcTest.Service" value="rpcTest.ServiceImpl"></entry>
            </map>
        </property>
    </bean>
</beans>
One last small problem remains.
Our framework is pulled in as a dependency package, so it cannot read a specific Spring XML file from inside the framework. That would take away all of the framework's flexibility. The framework therefore needs another way to reach the IoC container it is running in. The answer is the ApplicationContextAware interface provided by Spring.
/**
 * Created by wephone on 17-12-26.
 */
public class ClientConfig implements ApplicationContextAware {

    private String host;
    private int port;
    // call timeout
    private long overtime;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public long getOvertime() {
        return overtime;
    }

    public void setOvertime(long overtime) {
        this.overtime = overtime;
    }

    /**
     * If a bean class defined in the Spring configuration file
     * implements the ApplicationContextAware interface,
     * this method is called automatically when the configuration file is loaded
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RPC.clientContext=applicationContext;
    }
}
So the RPC class keeps a static reference to the IoC container context, and the configuration can be read simply with RPC.getServerConfig().getPort():
public static ServerConfig getServerConfig() {
    return serverContext.getBean(ServerConfig.class);
}
That concludes the core of the RPC framework
This example is only version 1.0. Later posts will add exception handling, ZooKeeper support, load-balancing strategies and more. Stars and issues are welcome.