Preface
Recap of the previous article
In the last article we briefly covered what RPC is, its three procedures, why we need it, its features and usage scenarios, the RPC flow and protocol definition, and a little about RPC frameworks. Theory alone tends to leave people drowsy and confused; it is much easier to understand when paired with some code.
Previous articles
High concurrency from scratch (1) – Basic concepts of ZooKeeper
High concurrency from scratch (2) – Implementing a distributed lock with ZooKeeper
High concurrency from scratch (3) – Setting up a ZooKeeper cluster and leader election
High concurrency from scratch (4) – Distributed queues with ZooKeeper
High concurrency from scratch (5) – ZooKeeper as a configuration center
High concurrency from scratch (6) – ZooKeeper's Master election
High concurrency from scratch (7) – Introduction, protocol, and framework of RPC
Content 1: The RPC process and tasks
1. The RPC process
This was already covered in 2 – ① of the last article. In case you have forgotten, here it is again
Stub: in distributed computing, a stub is a piece of code that converts the parameters passed between client and server during a remote procedure call
1. The client calls the client stub just like a local method, passing in parameters
2. The client stub marshals the parameters into a message and sends the message to the server through a system call
3. The client's local operating system sends the message from the client machine to the server machine
4. The server's operating system passes the received packets to the server stub
5. The server stub unmarshals the message back into parameters
6. The server stub invokes the procedure on the server. The execution result is sent back to the client through the same steps in the opposite direction
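Steps 2 and 5 above (marshalling on the client, unmarshalling on the server) can be sketched in isolation, without any network. This is only a minimal illustration that assumes JDK serialization as the wire format; `CallMessage` and `StubMarshalling` are hypothetical names and are not part of the framework built later in this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

/** Hypothetical message type: what a client stub would put on the wire. */
class CallMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final String methodName;
    final Object[] args;

    CallMessage(String methodName, Object[] args) {
        this.methodName = methodName;
        this.args = args;
    }
}

class StubMarshalling {
    /** Step 2: the client stub packs the call into a byte message. */
    static byte[] marshal(CallMessage msg) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(msg);
        }
        return bytes.toByteArray();
    }

    /** Step 5: the server stub turns the byte message back into a call. */
    static CallMessage unmarshal(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (CallMessage) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = marshal(new CallMessage("add", new Object[] {1, 2}));
        CallMessage received = unmarshal(wire);
        System.out.println(received.methodName + Arrays.toString(received.args));
    }
}
```

In a real call the byte array would travel through a socket between the two steps; everything else stays the same.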
2. From the user's perspective
1. Define a procedure interface
2. Implement the interface's entire procedure on the server
Content 2: Design and implementation of the RPC framework
1. Prepare a Student entity class and a base interface
The client generates proxy objects for the procedure interface: we design a client proxy factory that uses JDK dynamic proxies to create a proxy object for the interface
① Define a StudentService interface
The Student class has the fields name (String), age (int), and sex (String), plus getters, setters, and toString
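The Student class itself is not listed in this article, so here is a minimal sketch of what it might look like based on the description above. Two things are assumptions: the class implements Serializable (the RpcClient shown later sends arguments and results with ObjectOutputStream, so it has to), and the exact toString format is made up. It is declared without `public` only so that the sketch compiles from a file of any name.

```java
import java.io.Serializable;

/** Sketch of the Student entity: three fields, accessors, and toString. */
class Student implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;
    private String sex;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }

    @Override
    public String toString() {
        // The format is illustrative; the article does not show the original
        return "Student{name='" + name + "', age=" + age + ", sex='" + sex + "'}";
    }
}
```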
public interface StudentService {
    /**
     * getInfo
     * @return
     */
    public Student getInfo();

    public boolean printInfo(Student student);
}
We also provide a simple implementation, which just prints out a Student's information
@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {
    public Student getInfo() {
        Student person = new Student();
        person.setAge(25);
        person.setName("Say what you wish ~");
        person.setSex("male");
        return person;
    }

    public boolean printInfo(Student person) {
        if (person != null) {
            System.out.println(person);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        new Thread(() -> {
            System.out.println("111");
        }).start();
    }
}
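The @Service annotation used above is a custom one whose definition is not shown in the article. A plausible sketch follows, assuming it only carries the service interface as its value; it must be retained at runtime, because the server later reads it through reflection with getAnnotation(Service.class).value(). GreetingService, GreetingServiceImpl, and ServiceAnnotationDemo are hypothetical names used only to exercise the annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Marks a class as the implementation of an RPC service interface. */
@Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
@Target(ElementType.TYPE)
@interface Service {
    /** The service interface that the annotated class implements. */
    Class<?> value();
}

interface GreetingService {
    String greet(String name);
}

@Service(GreetingService.class)
class GreetingServiceImpl implements GreetingService {
    public String greet(String name) {
        return "hello " + name;
    }
}

class ServiceAnnotationDemo {
    /** Returns the interface name used as the registry key, or null if unmarked. */
    static String registryKey(Class<?> impl) {
        Service marker = impl.getAnnotation(Service.class);
        return marker == null ? null : marker.value().getName();
    }

    public static void main(String[] args) {
        System.out.println(registryKey(GreetingServiceImpl.class));
    }
}
```

Keying the service by interface name rather than implementation name is what lets the client ask for a service knowing only the interface.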
2. Set up the client
① Work out what we need from the test class
First, the client obtains a proxy for StudentService from our local proxy factory. The client has no local implementation of StudentService, so we simply give it the server's address
public class ClientTest {
    @Test
    public void test() {
        // Local call that has no interface implementation; it goes through the proxy
        RpcClientProxy proxy = new RpcClientProxy("192.168.80.1", 9998);
        StudentService service = proxy.getProxy(StudentService.class);
        System.out.println(service.getInfo());
        Student student = new Student();
        student.setAge(23);
        student.setName("hashmap");
        student.setSex("male");
        System.out.println(service.printInfo(student));
    }
}
Our focus now turns to how the client creates this proxy for us
② RpcClientProxy implements the InvocationHandler interface
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * RpcClientProxy
 */
public class RpcClientProxy implements InvocationHandler {
    private String host;
    private int port;

    public RpcClientProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Generates a proxy object for the business interface; what the proxy does is defined in the invoke method.
     * @param clazz proxy type (interface)
     * @return
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        // clazz must be an interface
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] { clazz }, RpcClientProxy.this);
    }

    /**
     * What the dynamic proxy does: the implementation of the interface is not local but in another
     * process on the network, so we invoke the remote service through an object that implements the RPC client.
     */
    public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
        // Before the call
        System.out.println("You can do something before executing a remote method");
        // Wrap the call information in a request
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParamTypes(method.getParameterTypes());
        request.setParams(params);
        // Connect to the server and make the call
        RpcClient client = new RpcClient();
        Object rst = client.start(request, host, port);
        // After the call
        System.out.println("After executing a remote method, you can also do something");
        return rst;
    }
}
The JDK provides the Proxy class to implement dynamic proxies: Proxy.newProxyInstance(ClassLoader var0, Class<?>[] var1, InvocationHandler var2) instantiates a proxy object. clazz must be an interface; if it is not, JDK dynamic proxies cannot be used
The third argument, RpcClientProxy.this, is the InvocationHandler: newProxyInstance() creates the proxy instance for us, but what the proxy actually does must be supplied by the InvocationHandler
The handler's method is invoke(Object var1, Method var2, Object[] var3) throws Throwable: the first parameter is the proxy object, the second is the method being executed, and the third is the array of arguments it needs
Going back to our code: when System.out.println(service.getInfo()) executes, control jumps to our invoke() implementation, whose steps are described in its comments. First we wrap the information needed to call the remote service in a request object; then we need a network connection to send it to the server, and for that we use RpcClient
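To see the mechanism on its own, here is a small self-contained sketch of a JDK dynamic proxy with no networking involved. Echo, EchoHandler, and ProxyDemo are hypothetical names; the only real API used is java.lang.reflect.Proxy and InvocationHandler. Note that the interface has no implementation class at all: every call lands in invoke().

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** An interface with no implementation class anywhere. */
interface Echo {
    String say(String text);
}

/** Decides what every call on the proxy returns. */
class EchoHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // An RPC framework would build and send a request here instead
        return method.getName() + ":" + args[0];
    }
}

class ProxyDemo {
    /** Creates a proxy for the given interface backed by the handler. */
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> iface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] { iface }, handler);
    }

    public static void main(String[] args) {
        Echo echo = proxyFor(Echo.class, new EchoHandler());
        System.out.println(echo.say("hi")); // prints "say:hi"
    }
}
```

RpcClientProxy follows the same shape, except that its invoke() forwards the method name and arguments to a remote server rather than answering locally.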
③ RpcClient
In the start() method, because RpcRequest implements the Serializable interface, the wrapped request is converted to binary by writeObject() and then sent with flush(). At that point the message is on its way and we wait for the server's response, which we read from the server socket's input stream through an ObjectInputStream
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
 * RpcClient
 * RPC client: acts as the client on behalf of the business code and sends requests to the remote service.
 */
public class RpcClient {
    /**
     * Through network IO, opens the remote service connection, writes the request data to the network, and gets the response result.
     *
     * @param request request data to be sent
     * @param host remote service domain name or IP address
     * @param port remote service port number
     * @return server response result
     * @throws Throwable
     */
    public Object start(RpcRequest request, String host, int port) throws Throwable {
        // Open the remote service connection
        Socket server = new Socket(host, port);
        ObjectInputStream oin = null;
        ObjectOutputStream oout = null;
        try {
            // 1. Server output stream: write the request data and send it
            oout = new ObjectOutputStream(server.getOutputStream());
            oout.writeObject(request);
            oout.flush();
            // 2. Server input stream: read the returned data (the deserialization step)
            oin = new ObjectInputStream(server.getInputStream());
            Object res = oin.readObject();
            RpcResponse response = null;
            if (!(res instanceof RpcResponse)) {
                throw new InvalidClassException("Incorrect return type, should be: " + RpcResponse.class + " type");
            } else {
                response = (RpcResponse) res;
            }
            // 3. The server raised an exception: rethrow it on the client
            if (response.getError() != null) {
                throw response.getError();
            }
            return response.getResult();
        } finally {
            try {
                // Clean up the resources, close the streams
                if (oin != null) oin.close();
                if (oout != null) oout.close();
                if (server != null) server.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
④ RpcRequest for parameter encapsulation
import java.io.Serializable;

/**
 * RpcRequest
 * RPC request object: the content of a remote service request, transmitted over the network.
 */
public class RpcRequest implements Serializable {
    // Name of the requested class
    private String className;
    // Name of the requested method
    private String methodName;
    // Parameter types of the requested method
    private Class<?>[] paramTypes;
    // Parameters of the requested method
    private Object[] params;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }
}
⑤ RpcResponse wraps the result returned by the RPC server
It also implements the JDK's default Serializable
import java.io.Serializable;

/**
 * RpcResponse
 * Wrapper class for the RPC server's response result, transmitted over the network.
 */
public class RpcResponse implements Serializable {
    // Exception that may have been thrown
    private Throwable error;
    // Response result
    private Object result;

    public Throwable getError() {
        return error;
    }

    public void setError(Throwable error) {
        this.error = error;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}
3. Set up the server
① Server simulation ServerTest
public class ServerTest {
    @Test
    public void startServer() {
        RpcServer server = new RpcServer();
        server.start(9998, "rpc.simple.RpcServer");
    }

    public static void main(String[] args) {
    }
}
start() is given a port number and a package name; the package is scanned for the services it contains
② Implementation of the start() method
It creates a Map named services to hold the scanned classes that provide RPC services. There is no service addressing here because the services are not in a registry yet; they will be placed in the ZooKeeper registry later
getService() calls getClasses() to scan the package: the package name is converted into a directory path, and the directory is located through the class loader. If there is a problem with the directory, an exception is thrown. Otherwise we traverse all files in the directory. If a file turns out to be a .class file, we load the class and check whether it carries the custom annotation @Service; a class marked with this annotation is the implementation of an RPC service. If the annotation is present, this is an RPC service we are looking for, so we put it in a result list called classes. If the entry is itself a directory, getClasses() calls itself recursively until it reaches class files
Once all the classes have been found, we return to getService(), where they are collected into one list. We then instantiate each class and put it into the map, using the interface name as the key and the instance as the value (services.put(cla.getAnnotation(Service.class).value().getName(), obj)).
Finally, back in start(): after the service scan, each accepted connection is handed to an RpcServerHandler
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * RpcServer
 * RPC service provider
 */
public class RpcServer {
    /**
     * Starts a service on the specified network port and listens for request data on that port.
     * Once request data arrives, the request is delegated to a service handler that runs in the thread pool.
     * @param port listening port
     * @param clazz service class package name(s), separated by commas
     */
    public void start(int port, String clazz) {
        ServerSocket server = null;
        try {
            // 1. Create the server socket
            server = new ServerSocket(port);
            // 2. Scan for the services
            Map<String, Object> services = getService(clazz);
            // 3. Create the thread pool
            Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            while (true) {
                // 4. Accept a client connection and hand it to a handler
                Socket client = server.accept();
                RpcServerHandler service = new RpcServerHandler(client, services);
                executor.execute(service);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close the listener
            if (server != null)
                try {
                    server.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }
    }

    /**
     * Instantiates all RPC service classes; this is also where service information could be exposed to a registry.
     * @param clazz service class package name(s), separated by commas
     * @return
     */
    public Map<String, Object> getService(String clazz) {
        try {
            Map<String, Object> services = new HashMap<String, Object>();
            // Collect all service classes
            String[] clazzes = clazz.split(",");
            List<Class<?>> classes = new ArrayList<Class<?>>();
            for (String cl : clazzes) {
                List<Class<?>> classList = getClasses(cl);
                classes.addAll(classList);
            }
            // Instantiate each class, keyed by its service interface name
            for (Class<?> cla : classes) {
                Object obj = cla.newInstance();
                services.put(cla.getAnnotation(Service.class).value().getName(), obj);
            }
            return services;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Gets all classes under the package that carry the @Service annotation.
     * @param pckgname
     * @return
     * @throws ClassNotFoundException
     */
    public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
        // List of classes to return
        List<Class<?>> classes = new ArrayList<Class<?>>();
        // Find the directory of the specified package
        File directory = null;
        try {
            ClassLoader cld = Thread.currentThread().getContextClassLoader();
            if (cld == null)
                throw new ClassNotFoundException("failed to get ClassLoader");
            String path = pckgname.replace('.', '/');
            URL resource = cld.getResource(path);
            if (resource == null)
                throw new ClassNotFoundException("no such resource: " + path);
            directory = new File(resource.getFile());
        } catch (NullPointerException x) {
            throw new ClassNotFoundException(pckgname + " (" + directory + ") not a valid resource");
        }
        if (directory.exists()) {
            // Walk every entry in the package directory
            File[] fileList = directory.listFiles();
            for (int i = 0; fileList != null && i < fileList.length; i++) {
                File file = fileList[i];
                if (file.isFile() && file.getName().endsWith(".class")) {
                    // Strip the ".class" suffix (6 characters) to get the class name
                    String name = file.getName();
                    Class<?> clazz = Class.forName(pckgname + '.' + name.substring(0, name.length() - 6));
                    if (clazz.getAnnotation(Service.class) != null) {
                        classes.add(clazz);
                    }
                } else if (file.isDirectory()) {
                    // Recurse into sub-packages
                    List<Class<?>> result = getClasses(pckgname + "." + file.getName());
                    if (result != null && result.size() != 0) {
                        classes.addAll(result);
                    }
                }
            }
        } else {
            throw new ClassNotFoundException(pckgname + " not a valid package name");
        }
        return classes;
    }
}
③ RpcServerHandler for processing
It is very similar to RpcClient: it is also a serialization and deserialization process. The main work is in step 3: after obtaining the service instance, the method, and its parameters, call invoke() and put the result into the response
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

/**
 * RpcServerHandler
 * Server-side request processing: handles the service request coming from network IO and writes the result back to network IO.
 */
public class RpcServerHandler implements Runnable {
    private Socket clientSocket;
    private Map<String, Object> serviceMap;

    /**
     * @param client client socket
     * @param services all services
     */
    public RpcServerHandler(Socket client, Map<String, Object> services) {
        this.clientSocket = client;
        this.serviceMap = services;
    }

    /**
     * Reads the client's request from the network, finds the requested method, executes the local method to obtain the result, and writes the output to network IO.
     */
    public void run() {
        ObjectInputStream oin = null;
        ObjectOutputStream oout = null;
        RpcResponse response = new RpcResponse();
        try {
            // 1. Get the input and output streams
            oin = new ObjectInputStream(clientSocket.getInputStream());
            oout = new ObjectOutputStream(clientSocket.getOutputStream());
            // 2. Read the request data
            Object param = oin.readObject();
            RpcRequest request = null;
            if (!(param instanceof RpcRequest)) {
                response.setError(new Exception("parameter error"));
                oout.writeObject(response);
                oout.flush();
                return;
            } else {
                // Deserialize the RpcRequest
                request = (RpcRequest) param;
            }
            // 3. Find and execute the service method
            Object service = serviceMap.get(request.getClassName());
            Class<?> clazz = service.getClass();
            Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
            Object result = method.invoke(service, request.getParams());
            // 4. Return the RPC response, serializing the RpcResponse
            response.setResult(result);
            // Serialize the result
            oout.writeObject(response);
            oout.flush();
            return;
        } catch (Exception e) {
            try {
                // Exception handling: send the error back to the client
                if (oout != null) {
                    response.setError(e);
                    oout.writeObject(response);
                    oout.flush();
                }
            } catch (Exception e1) {
                e1.printStackTrace();
            }
            return;
        } finally {
            try {
                // Close the streams
                if (oin != null) oin.close();
                if (oout != null) oout.close();
                if (clientSocket != null) clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
4. Running result
Start ServerTest first and then ClientTest. It is quick and easy; just run the tests rather than right-clicking to run the main method
Content 3: Client optimization measures
1. Introducing a service discoverer
In the client design, ClientStubInvocationHandler has two things to do: marshal the request into a message and send the network request. Organizing the request content into a message is the job of the client stub proxy. Besides handling the message protocol and the network layer, it may also need service information discovery. In addition, the message protocol may change, and we need to support multiple protocols; this comes down to how broad the framework's protocol support is. Dubbo, for example, has more flexible protocol support than Spring Cloud
At this point we need to know which protocol a service uses, so we need to introduce a service discoverer
2. The protocol layer
We want to support multiple protocols, so how should the classes be designed? (Program to interfaces, strategy pattern, composition)
Our protocol therefore needs to be abstracted, with the protocol content marshalled and unmarshalled behind an interface. For example, two different implementations, JSON and HTTP, are provided above. The client stub then needs not only the service discoverer but also this protocol support
① Supplementary: How to obtain registration information from ZooKeeper
The regist() method builds the service information string at registration time and creates an ephemeral node under a persistent parent node. servicePath is a directory structure similar to Dubbo's: the root directory /rpc, then the service name serviceName, then /service. loadServiceResouces() is not difficult either: it gets the child nodes under that path and loads all the URLs for the caller
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;

public class RegistCenter {
    ZkClient client = new ZkClient("localhost:2181");
    private String centerRootPath = "/rpc";

    public RegistCenter() {
        client.setZkSerializer(new MyZkSerializer());
    }

    public void regist(ServiceResource serviceResource) {
        String serviceName = serviceResource.getServiceName();
        String uri = JsonMapper.toJsonString(serviceResource);
        try {
            uri = URLEncoder.encode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        // Persistent parent node, ephemeral child node per service instance
        String servicePath = centerRootPath + "/" + serviceName + "/service";
        if (!client.exists(servicePath)) {
            client.createPersistent(servicePath, true);
        }
        String uriPath = servicePath + "/" + uri;
        client.createEphemeral(uriPath);
    }

    /**
     * Loads the ServiceResource information held in the configuration center
     * @param serviceName
     * @return
     */
    public List<ServiceResource> loadServiceResouces(String serviceName) {
        String servicePath = centerRootPath + "/" + serviceName + "/service";
        List<String> children = client.getChildren(servicePath);
        List<ServiceResource> resources = new ArrayList<ServiceResource>();
        for (String ch : children) {
            try {
                String deCh = URLDecoder.decode(ch, "UTF-8");
                ServiceResource r = JsonMapper.fromJsonString(deCh, ServiceResource.class);
                resources.add(r);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return resources;
    }

    private void sub(String serviceName, ChangeHandler handler) {
        /*
        String path = centerRootPath + "/" + serviceName + "/service";
        client.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                handler();
            }
        });
        client.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                handler();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                handler();
            }
        });
        */
    }

    interface ChangeHandler {
        /**
         * @param resource
         */
        void itemChange(ServiceResource resource);
    }
}
② ClientStubProxyFactory
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

/**
 * ClientStubProxyFactory
 */
public class ClientStubProxyFactory {
    private ServiceInfoDiscoverer sid;
    private Map<String, MessageProtocol> supportMessageProtocols;
    private NetClient netClient;
    private Map<Class<?>, Object> objectCache = new HashMap<>();

    /**
     * @param <T>
     * @param interf
     * @return
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> interf) {
        T obj = (T) this.objectCache.get(interf);
        if (obj == null) {
            obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
                    new ClientStubInvocationHandler(interf));
            this.objectCache.put(interf, obj);
        }
        return obj;
    }

    public ServiceInfoDiscoverer getSid() {
        return sid;
    }

    public void setSid(ServiceInfoDiscoverer sid) {
        this.sid = sid;
    }

    public Map<String, MessageProtocol> getSupportMessageProtocols() {
        return supportMessageProtocols;
    }

    public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
        this.supportMessageProtocols = supportMessageProtocols;
    }

    public NetClient getNetClient() {
        return netClient;
    }

    public void setNetClient(NetClient netClient) {
        this.netClient = netClient;
    }

    /**
     * ClientStubInvocationHandler
     * The client stub proxy's call handling
     * @date April 12, 2019, 2:38:30 PM
     */
    private class ClientStubInvocationHandler implements InvocationHandler {
        private Class<?> interf;

        public ClientStubInvocationHandler(Class<?> interf) {
            super();
            this.interf = interf;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 1. Obtain the service information
            String serviceName = this.interf.getName();
            ServiceInfo sinfo = sid.getServiceInfo(serviceName);
            if (sinfo == null) {
                throw new Exception("Remote service does not exist!");
            }
            // 2. Build the request
            Request req = new Request();
            req.setServiceName(sinfo.getName());
            req.setMethod(method.getName());
            req.setPrameterTypes(method.getParameterTypes());
            req.setParameters(args);
            // 3. Protocol layer marshalling
            // Get the protocol that corresponds to this service
            MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
            // Marshal the request
            byte[] data = protocol.marshallingRequest(req);
            // 4. Call the network layer to send the request
            byte[] repData = netClient.sendRequest(data, sinfo);
            // 5. Unmarshal the response message
            Response rsp = protocol.unmarshallingResponse(repData);
            // 6. Process the result
            if (rsp.getException() != null) {
                throw rsp.getException();
            }
            return rsp.getReturnValue();
        }
    }
}
ServiceInfoDiscoverer is an interface whose ServiceInfo getServiceInfo(String name) method obtains remote service information by service name. supportMessageProtocols maps protocol names to MessageProtocol, for which we also define an interface. This interface needs to be more detailed: marshalling a Request into binary, unmarshalling binary back into a Request, and so on. The same process applies to Response
/**
 * MessageProtocol
 */
public interface MessageProtocol {
    /**
     * Marshals the request message
     * @param req
     * @return
     */
    byte[] marshallingRequest(Request req);

    /**
     * Unmarshals the request message
     * @param data
     * @return
     */
    Request unmarshallingRequest(byte[] data);

    /**
     * Marshals the response message
     * @param rsp
     * @return
     */
    byte[] marshallingResponse(Response rsp);

    /**
     * Unmarshals the response message
     * @param data
     * @return
     */
    Response unmarshallingResponse(byte[] data);
}
A problem appears here: marshalling and unmarshalling methods alone are not enough. They operate on requests and responses, whose contents differ, so we need to define the framework's standard Request and Response classes
Request has the service name, service method, message headers, parameter types, and parameters. Similarly, Response has the status (an enumeration), message headers, the return value and its type, and the exception, if any.
With these defined, the protocol layer expands to four methods
Separate the message protocol into its own layer, because both the client and the server need to use it
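The Request and Response classes described above are not listed in the article, so the following is only a sketch of what they might look like. The accessor names follow the calls that appear in the article's code (including the original spelling setPrameterTypes); the header type Map<String, String> and the Status values are assumptions, and the return type field is left out because the listed code never sets one.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical status values; the article only says the status is an enum. */
enum Status { SUCCESS, ERROR }

/** Framework-standard request: which service, which method, with what arguments. */
class Request implements Serializable {
    private static final long serialVersionUID = 1L;
    private String serviceName;
    private String method;
    private Map<String, String> headers = new HashMap<>(); // header type is assumed
    private Class<?>[] prameterTypes; // spelling follows the article's setPrameterTypes
    private Object[] parameters;

    public String getServiceName() { return serviceName; }
    public void setServiceName(String serviceName) { this.serviceName = serviceName; }
    public String getMethod() { return method; }
    public void setMethod(String method) { this.method = method; }
    public Map<String, String> getHeaders() { return headers; }
    public void setHeaders(Map<String, String> headers) { this.headers = headers; }
    public Class<?>[] getPrameterTypes() { return prameterTypes; }
    public void setPrameterTypes(Class<?>[] prameterTypes) { this.prameterTypes = prameterTypes; }
    public Object[] getParameters() { return parameters; }
    public void setParameters(Object[] parameters) { this.parameters = parameters; }
}

/** Framework-standard response: status, headers, and either a value or an exception. */
class Response implements Serializable {
    private static final long serialVersionUID = 1L;
    private Status status;
    private Map<String, String> headers = new HashMap<>();
    private Object returnValue;
    private Exception exception;

    public Status getStatus() { return status; }
    public void setStatus(Status status) { this.status = status; }
    public Map<String, String> getHeaders() { return headers; }
    public void setHeaders(Map<String, String> headers) { this.headers = headers; }
    public Object getReturnValue() { return returnValue; }
    public void setReturnValue(Object returnValue) { this.returnValue = returnValue; }
    public Exception getException() { return exception; }
    public void setException(Exception exception) { this.exception = exception; }
}
```

Because every protocol marshals these same two classes, a new protocol only has to implement the four MessageProtocol methods; the client stub and the server do not change.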
3. The network layer
The main job of the network layer is to send the request and get the response. To initiate a network request we must first know the service address, so the ServiceInfo object is a required dependency, as the figure in the original article shows. Both BIO and Netty implementations are provided
So the client stub needs three dependencies: the service discoverer, the protocol support, and our network layer, NetClient
4. Overview
In the overall diagram, purple represents the client proxy portion, light green service discovery, and light blue the protocol layer
5. Code (can be skipped)
This code has little to do with the main idea; it is just supporting functional code, so it can be skipped. If you really want to run it yourself, you can ask me for a demo.
① Back to our ClientStubProxyFactory
Compared with RpcClientProxy in Content 2, it adds three dependencies: ServiceInfoDiscoverer, supportMessageProtocols, and netClient
ClientStubProxyFactory caches the proxy objects. If a proxy already exists in the cache, it is returned directly; if not, a new one is created, added to the cache, and returned.
② The invoke() method is changed
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // 1. Obtain the service information
    String serviceName = this.interf.getName();
    ServiceInfo sinfo = sid.getServiceInfo(serviceName);
    if (sinfo == null) {
        throw new Exception("Remote service does not exist!");
    }
    // 2. Build the request
    Request req = new Request();
    req.setServiceName(sinfo.getName());
    req.setMethod(method.getName());
    req.setPrameterTypes(method.getParameterTypes());
    req.setParameters(args);
    // 3. Protocol layer marshalling
    // Get the protocol that corresponds to this service
    MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
    // Marshal the request
    byte[] data = protocol.marshallingRequest(req);
    // 4. Call the network layer to send the request
    byte[] repData = netClient.sendRequest(data, sinfo);
    // 5. Unmarshal the response message
    Response rsp = protocol.unmarshallingResponse(repData);
    // 6. Process the result
    if (rsp.getException() != null) {
        throw rsp.getException();
    }
    return rsp.getReturnValue();
}
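The cache described above can be sketched as follows. This is a variation rather than the article's code: the listing uses a plain HashMap with a get-then-put sequence, while this sketch swaps in ConcurrentHashMap.computeIfAbsent so that concurrent callers asking for the same interface share one proxy. CachingProxyFactory and Ping are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical service interface used only for the demo. */
interface Ping {
    String ping();
}

class CachingProxyFactory {
    // One proxy per interface; safe to use from several threads
    private final Map<Class<?>, Object> objectCache = new ConcurrentHashMap<>();
    private final InvocationHandler handler;

    CachingProxyFactory(InvocationHandler handler) {
        this.handler = handler;
    }

    /** Returns the cached proxy for the interface, creating it on first use. */
    @SuppressWarnings("unchecked")
    <T> T getProxy(Class<T> interf) {
        return (T) objectCache.computeIfAbsent(interf, key ->
                Proxy.newProxyInstance(key.getClassLoader(), new Class<?>[] { key }, handler));
    }

    public static void main(String[] args) {
        CachingProxyFactory factory = new CachingProxyFactory((proxy, method, a) -> "pong");
        Ping first = factory.getProxy(Ping.class);
        Ping second = factory.getProxy(Ping.class);
        System.out.println(first == second); // same cached instance
        System.out.println(first.ping());
    }
}
```

Caching is worthwhile here because a proxy holds no per-call state, so there is no reason to generate a new one for each lookup.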
First comes service discovery. When the getProxy() method mentioned in ① was executed, the interface to proxy was passed in directly, so we already hold the interface information interf and can call its getName() method to get the interface name. We then call the getServiceInfo() method provided by ServiceInfoDiscoverer to get the details of the service, create the Request, and assign values to its properties
Next we look up the protocol that corresponds to the service and obtain the matching MessageProtocol. We then marshal the request into binary and send it through netClient, together with the server-side information. The returned repData is unmarshalled (binary back to Response), and then the result is processed.
③ Implementing the service discoverer
As mentioned earlier, the service discoverer ServiceInfoDiscoverer is an interface that provides the getServiceInfo() method
There are two different implementations. With the local implementation, we can write our own configuration file, load it, and get the service information from it
The ZooKeeper service discovery implementation is as follows; it is similar to the ZooKeeper supplement we added in 2 – ① above
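The local implementation is not listed in the article, so the sketch below is one possible shape, not the author's code. It assumes a java.util.Properties configuration with hypothetical keys of the form `<serviceName>.protocol` and `<serviceName>.address` (comma-separated host:port pairs). ServiceInfo is reduced to the members the article's code actually calls: getName(), getProtocol(), getAddress(), and addAddress().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Minimal stand-in for the article's ServiceInfo. */
class ServiceInfo {
    private final String name;
    private final String protocol;
    private final List<String> address = new ArrayList<>();

    ServiceInfo(String name, String protocol) {
        this.name = name;
        this.protocol = protocol;
    }

    public String getName() { return name; }
    public String getProtocol() { return protocol; }
    public List<String> getAddress() { return address; }
    public void addAddress(String hostAndPort) { address.add(hostAndPort); }
}

interface ServiceInfoDiscoverer {
    ServiceInfo getServiceInfo(String name);
}

/** Looks services up in a local configuration instead of a registry. */
class LocalServiceInfoDiscoverer implements ServiceInfoDiscoverer {
    private final Properties config;

    LocalServiceInfoDiscoverer(Properties config) {
        this.config = config;
    }

    @Override
    public ServiceInfo getServiceInfo(String name) {
        // Key names are an assumption made for this sketch
        String protocol = config.getProperty(name + ".protocol");
        String addresses = config.getProperty(name + ".address");
        if (protocol == null || addresses == null) {
            return null; // unknown service; the client stub treats null as "does not exist"
        }
        ServiceInfo info = new ServiceInfo(name, protocol);
        for (String a : addresses.split(",")) {
            info.addAddress(a.trim());
        }
        return info;
    }
}
```

A configuration for this sketch could contain, for example, `demo.StudentService.protocol=json` and `demo.StudentService.address=127.0.0.1:9998,127.0.0.1:9999`, loaded with Properties.load().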
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.ZkClient;

public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
    ZkClient client = new ZkClient("localhost:2181");
    private String centerRootPath = "/rpc";

    public ZookeeperServiceInfoDiscoverer() {
        client.setZkSerializer(new MyZkSerializer());
    }

    public void regist(ServiceInfo serviceResource) {
        String serviceName = serviceResource.getName();
        String uri = JSON.toJSONString(serviceResource);
        try {
            uri = URLEncoder.encode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        String servicePath = centerRootPath + "/" + serviceName + "/service";
        if (!client.exists(servicePath)) {
            client.createPersistent(servicePath, true);
        }
        String uriPath = servicePath + "/" + uri;
        client.createEphemeral(uriPath);
    }

    /**
     * Loads the service information held in the configuration center
     * @param serviceName
     * @return
     */
    public List<ServiceInfo> loadServiceResouces(String serviceName) {
        String servicePath = centerRootPath + "/" + serviceName + "/service";
        List<String> children = client.getChildren(servicePath);
        List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
        for (String ch : children) {
            try {
                String deCh = URLDecoder.decode(ch, "UTF-8");
                ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
                resources.add(r);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return resources;
    }

    @Override
    public ServiceInfo getServiceInfo(String name) {
        List<ServiceInfo> list = loadServiceResouces(name);
        // No registered instance: return null so the caller can report a missing service
        if (list.isEmpty()) {
            return null;
        }
        // Merge the addresses of all instances into the first entry
        ServiceInfo info = list.get(0);
        list.forEach((e) -> {
            if (e != info) {
                info.addAddress(e.getAddress().get(0));
            }
        });
        return info;
    }
}
④ Protocol support
Only JSON is implemented here, via fastjson
import com.alibaba.fastjson.JSON;

public class JSONMessageProtocol implements MessageProtocol {
    @Override
    public byte[] marshallingRequest(Request req) {
        Request temp = new Request();
        temp.setServiceName(req.getServiceName());
        temp.setMethod(req.getMethod());
        temp.setHeaders(req.getHeaders());
        temp.setPrameterTypes(req.getPrameterTypes());
        if (req.getParameters() != null) {
            // Serialize each parameter to its own JSON string
            Object[] params = req.getParameters();
            Object[] serizeParmas = new Object[params.length];
            for (int i = 0; i < params.length; i++) {
                serizeParmas[i] = JSON.toJSONString(params[i]);
            }
            temp.setParameters(serizeParmas);
        }
        return JSON.toJSONBytes(temp);
    }

    @Override
    public Request unmarshallingRequest(byte[] data) {
        Request req = JSON.parseObject(data, Request.class);
        if (req.getParameters() != null) {
            // Parse each parameter back from its JSON string
            Object[] serizeParmas = req.getParameters();
            Object[] params = new Object[serizeParmas.length];
            for (int i = 0; i < serizeParmas.length; i++) {
                Object param = JSON.parseObject(serizeParmas[i].toString(), Object.class);
                params[i] = param;
            }
            req.setParameters(params);
        }
        return req;
    }

    @Override
    public byte[] marshallingResponse(Response rsp) {
        Response resp = new Response();
        resp.setHeaders(rsp.getHeaders());
        resp.setException(rsp.getException());
        resp.setReturnValue(rsp.getReturnValue());
        resp.setStatus(rsp.getStatus());
        return JSON.toJSONBytes(resp);
    }

    @Override
    public Response unmarshallingResponse(byte[] data) {
        return JSON.parseObject(data, Response.class);
    }
}
⑤ NetClient
BIO:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.Random;

public class BioNetClient implements NetClient {
    @Override
    public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
        // Pick one of the service addresses at random
        List<String> addressList = sinfo.getAddress();
        int randNum = new Random().nextInt(addressList.size());
        String address = addressList.get(randNum);
        String[] addInfoArray = address.split(":");
        try {
            return startSend(data, addInfoArray[0], Integer.valueOf(addInfoArray[1]));
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Through network IO, opens the remote service connection, writes the request data to the network, and obtains the response result.
     *
     * @param requestData the request data to be sent
     * @param host the remote service domain name or IP address
     * @param port the remote service port number
     * @return the server response result
     * @throws Throwable
     */
    private byte[] startSend(byte[] requestData, String host, int port) throws Throwable {
        // Open the remote service connection
        Socket serverSocket = new Socket(host, port);
        InputStream in = null;
        OutputStream out = null;
        try {
            // 1. Server output stream: write the request data and send it
            out = serverSocket.getOutputStream();
            out.write(requestData);
            out.flush();
            // 2. Server input stream: read the returned data (similar to the deserialization step)
            in = serverSocket.getInputStream();
            byte[] res = new byte[1024];
            int readLen = -1;
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            while ((readLen = in.read(res)) > 0) {
                baos.write(res, 0, readLen);
            }
            return baos.toByteArray();
        } finally {
            try {
                // Clean up the resources, close the streams
                if (in != null) in.close();
                if (out != null) out.close();
                if (serverSocket != null) serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Netty mode:
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyNetClient implements NetClient {
    private SendHandler sendHandler;
    private Map<String, SendHandler> sendHandlerMap = new ConcurrentHashMap<String, SendHandler>();

    @Override
    public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
        try {
            List<String> addressList = sinfo.getAddress();
            int randNum = new Random().nextInt(addressList.size());
            String address = addressList.get(randNum);
            String[] addInfoArray = address.split(":");
            SendHandler handler = sendHandlerMap.get(address);
            if (handler == null) {
                sendHandler = new SendHandler(data);
                // Connect on a separate thread, since connect() blocks until the channel closes
                new Thread(() -> {
                    try {
                        connect(addInfoArray[0], Integer.valueOf(addInfoArray[1]));
                    } catch (NumberFormatException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            byte[] respData = (byte[]) sendHandler.rspData();
            return respData;
        } catch (NumberFormatException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public void connect(String host, int port) throws Exception {
        // Configure the client
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //EchoClientHandler handler = new EchoClientHandler();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(sendHandler);
                    }
                });
            // Start the client connection
            ChannelFuture f = b.connect(host, port).sync();
            // Wait until the client channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Release the thread group resources
            group.shutdownGracefully();
        }
    }
}
⑥ Running result
You can simulate a consumer and a producer to test this; the results are not posted here
Finally
We'll continue with Dubbo next
High concurrency from scratch (9) – Dubbo's core functions and protocols