Generic calls

Java handwritten RPC from scratch (01): a socket-based implementation

Java handwritten RPC from scratch (02): implementing the client and server with Netty4

Java handwritten RPC from scratch (03): how does the client call the server?

Java handwritten RPC from scratch (04): serialization

The previous examples used fixed input and output parameters and a fixed method implementation.

This section implements generic calls so that the framework can be applied more widely.

The basic idea

All method calls are implemented based on reflection.
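As a rough illustration of the idea, the sketch below resolves a method by its name and parameter types and then invokes it through reflection. The class `ReflectiveCallSketch` and its nested `Calculator` are hypothetical names made up for this example; they are not part of the rpc project.

```java
import java.lang.reflect.Method;

// Hypothetical demo class; none of these names come from the rpc project.
class ReflectiveCallSketch {

    // A stand-in service implementation used only for this example.
    public static class Calculator {
        public int add(int a, int b) {
            return a + b;
        }
    }

    // Resolves a public method by name and parameter types, then invokes it.
    static Object call(Object target, String methodName,
                       Class<?>[] paramTypes, Object[] args) {
        try {
            Method method = target.getClass().getMethod(methodName, paramTypes);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Reflective call failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Object sum = call(new Calculator(), "add",
                new Class<?>[]{int.class, int.class}, new Object[]{1, 2});
        System.out.println(sum);
    }
}
```

The caller only needs strings and values, which is exactly the kind of information that can travel in a request object.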

The server side

Core classes

  • RpcServer

The adjustment is as follows:

// group(parentGroup, childGroup): the boss group accepts connections, the worker group handles I/O
serverBootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // Print logs
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    // Decode: bytes => request
                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                    // Encode: response => bytes
                    .addLast(new ObjectEncoder())
                    .addLast(new RpcServerHandler());
        }
    })
    // Maximum queue length for connections that have not yet been accepted
    .option(ChannelOption.SO_BACKLOG, 128)
    // Enable TCP keep-alive: after a period with no traffic, a probe is sent to check whether the client is still alive
    .childOption(ChannelOption.SO_KEEPALIVE, true);

ObjectDecoder and ObjectEncoder are Netty's built-in implementations.
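These two codecs are built on Java object serialization, which is why the request and response types must be Serializable. The sketch below shows the same round trip using only plain JDK streams. It is an analogy, not Netty's actual wire format (Netty adds its own framing on top), and `SerializationRoundTripSketch` and `Ping` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo class; plain JDK serialization, not Netty's codec.
class SerializationRoundTripSketch {

    // A stand-in message type; the real RpcRequest carries more fields.
    static class Ping implements Serializable {
        private static final long serialVersionUID = 1L;
        final String seqId;

        Ping(String seqId) {
            this.seqId = seqId;
        }
    }

    // Object => bytes (what an encoder does before writing to the channel).
    static byte[] encode(Serializable message) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(message);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes => object (what a decoder does after reading from the channel).
    static Object decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown message class", e);
        }
    }

    public static void main(String[] args) {
        Ping decoded = (Ping) decode(encode(new Ping("seq-1")));
        System.out.println(decoded.seqId);
    }
}
```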

RpcServerHandler

package com.github.houbb.rpc.server.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class RpcServerHandler extends SimpleChannelInboundHandler<Object> {

    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected", id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel read start: {}", id);

        // Accept client requests
        RpcRequest rpcRequest = (RpcRequest)msg;
        log.info("[Server] receive channel {} request: {}", id, rpcRequest);

        // Write back to the client
        DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);
        ctx.writeAndFlush(rpcResponse);
        log.info("[Server] channel {} response {}", id, rpcResponse);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Handle the request.
     * @param rpcRequest request information
     * @return result information
     * @since 0.0.6
     */
    private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {
        DefaultRpcResponse rpcResponse = new DefaultRpcResponse();
        rpcResponse.seqId(rpcRequest.seqId());

        try {
            // Look up the service implementation for this request
            // and invoke the target method via reflection.
            Object result = DefaultServiceFactory.getInstance()
                    .invoke(rpcRequest.serviceId(),
                            rpcRequest.methodName(),
                            rpcRequest.paramTypeNames(),
                            rpcRequest.paramValues());
            rpcResponse.result(result);
        } catch (Exception e) {
            rpcResponse.error(e);
            log.error("[Server] execute meet ex for request: {}", rpcRequest, e);
        }

        // Return the response
        return rpcResponse;
    }

}

This is the same as before, except that handleRpcRequest is a little more complicated.

It has to call the corresponding method via reflection.
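The shape of that step can be sketched as follows: invoke reflectively, then store either the result or the failure in the response. `HandleRequestSketch`, `Response`, and `Echo` are hypothetical stand-ins, and unlike the article's code this sketch unwraps InvocationTargetException so the response carries the exception the service method actually threw.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical demo class; a simplified stand-in for the handler logic.
class HandleRequestSketch {

    // Simplified stand-in for DefaultRpcResponse.
    static class Response {
        String seqId;
        Object result;
        Throwable error;
    }

    // A stand-in service with one method that works and one that fails.
    public static class Echo {
        public String echo(String text) {
            return text;
        }

        public String fail(String text) {
            throw new IllegalArgumentException("bad input: " + text);
        }
    }

    // Invokes a single-String-argument method and records the outcome.
    static Response handle(String seqId, Object service, String methodName, String arg) {
        Response response = new Response();
        response.seqId = seqId;
        try {
            Method method = service.getClass().getMethod(methodName, String.class);
            response.result = method.invoke(service, arg);
        } catch (InvocationTargetException e) {
            // The service method itself threw; report its exception.
            response.error = e.getCause();
        } catch (ReflectiveOperationException e) {
            // Lookup or access problem, for example an unknown method name.
            response.error = e;
        }
        return response;
    }

    public static void main(String[] args) {
        System.out.println(handle("1", new Echo(), "echo", "hi").result);
        System.out.println(handle("2", new Echo(), "fail", "x").error);
    }
}
```

Whatever happens during the call, the response keeps the request's seqId, so the client can still match it to the pending call.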

POJOs

The request and response objects used are as follows:

RpcRequest

package com.github.houbb.rpc.common.rpc.domain;

import java.util.List;

/**
 * Serialization-related processing.
 * (1) call creation time - createTime
 * (2) call type - callType
 * (3) timeout - timeOut
 *
 * Additional information:
 * (1) context information
 *
 * @author binbin.hou
 * @since 0.0.6
 */
public interface RpcRequest extends BaseRpc {

    /**
     * Creation time
     * @return creation time
     * @since 0.0.6
     */
    long createTime();

    /**
     * Unique service identifier
     * @return unique service identifier
     * @since 0.0.6
     */
    String serviceId();

    /**
     * Method name
     * @return method name
     * @since 0.0.6
     */
    String methodName();

    /**
     * Parameter type name list
     * @return list of type names
     * @since 0.0.6
     */
    List<String> paramTypeNames();

    // Call parameter information list

    /**
     * Call parameter values
     * @return parameter value array
     * @since 0.0.6
     */
    Object[] paramValues();

}

RpcResponse

package com.github.houbb.rpc.common.rpc.domain;

/**
 * Serialization-related processing
 * @author binbin.hou
 * @since 0.0.6
 */
public interface RpcResponse extends BaseRpc {

    /**
     * Exception information
     * @return exception information
     * @since 0.0.6
     */
    Throwable error();

    /**
     * Request result
     * @return request result
     * @since 0.0.6
     */
    Object result();

}

BaseRpc

package com.github.houbb.rpc.common.rpc.domain;

import java.io.Serializable;

/**
 * Serialization-related processing
 * @author binbin.hou
 * @since 0.0.6
 */
public interface BaseRpc extends Serializable {

    /**
     * Get the unique identifier.
     * (1) Uniquely identifies a call, so that the response belonging to that call can be found.
     * @return unique identifier
     */
    String seqId();

    /**
     * Set the unique identifier
     * @param traceId unique identifier
     * @return this
     */
    BaseRpc seqId(final String traceId);

}
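To see why every request and response carries a seqId, here is a minimal sketch of how a caller could match responses to pending calls. This is an assumption about one possible use, not the project's client code, and `PendingCallsSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; shows seqId-based correlation only.
class PendingCallsSketch {

    // seqId => future that completes when the matching response arrives.
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Called before a request is sent.
    CompletableFuture<Object> register(String seqId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(seqId, future);
        return future;
    }

    // Called when a response arrives; returns false for unknown or already handled ids.
    boolean complete(String seqId, Object result) {
        CompletableFuture<Object> future = pending.remove(seqId);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) {
        PendingCallsSketch calls = new PendingCallsSketch();
        String seqId = UUID.randomUUID().toString();
        CompletableFuture<Object> future = calls.register(seqId);
        calls.complete(seqId, "pong");
        System.out.println(future.join());
    }
}
```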

ServiceFactory: the service factory

To manage all service classes in one place, a service factory class is defined here.

ServiceFactory

package com.github.houbb.rpc.server.service;

import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.registry.ServiceRegistry;

import java.util.List;

/**
 * (1) Expose as few methods as possible.
 * (2) For external callers, such as telnet-based governance: which services are available?
 * What are the method names of a single service?
 * Queries for this kind of basic information are all hidden for now.
 * (3) Expose as little as possible in the early stage.
 *
 * @author binbin.hou
 * @since 0.0.6
 * @see ServiceRegistry service information is placed in this class for unified management.
 * @see ServiceMethod method information
 */
public interface ServiceFactory {

    /**
     * Register the service list
     * @param serviceConfigList service configuration list
     * @return this
     * @since 0.0.6
     */
    ServiceFactory registerServices(final List<ServiceConfig> serviceConfigList);

    /**
     * Invoke a method via reflection.
     * (1) To speed up method lookup, the full class names of all parameter types
     * are concatenated and used as part of the key.
     *
     * @param serviceId service name
     * @param methodName method name
     * @param paramTypeNames parameter type name list
     * @param paramValues parameter values
     * @return the return value of the method call
     * @since 0.0.6
     */
    Object invoke(final String serviceId, final String methodName,
                  List<String> paramTypeNames, final Object[] paramValues);

}

DefaultServiceFactory

The default implementation is as follows:

package com.github.houbb.rpc.server.service.impl;

import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.service.ServiceFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Default service repository implementation
 * @author binbin.hou
 * @since 0.0.6
 */
public class DefaultServiceFactory implements ServiceFactory {

    /**
     * Service map
     * @since 0.0.6
     */
    private Map<String, Object> serviceMap;

    /**
     * Direct access to the corresponding method information.
     * (1) key: serviceId:methodName:param1@param2@param3
     * (2) value: the corresponding method information
     */
    private Map<String, Method> methodMap;

    private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();

    private DefaultServiceFactory(){}

    public static DefaultServiceFactory getInstance() {
        return INSTANCE;
    }

    /**
     * Service registration normally happens once at project startup.
     * It is a heavy operation, and a service should be initialized only once.
     * The method is synchronized to ensure thread safety.
     * @param serviceConfigList service configuration list
     * @return this
     */
    @Override
    public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) {
        ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");

        // Initialize the maps
        serviceMap = new HashMap<>(serviceConfigList.size());
        // Only a rough estimate: about 2 methods per service.
        methodMap = new HashMap<>(serviceConfigList.size()*2);

        for(ServiceConfig serviceConfig : serviceConfigList) {
            serviceMap.put(serviceConfig.id(), serviceConfig.reference());
        }

        // Store method name
        for(Map.Entry<String, Object> entry : serviceMap.entrySet()) {
            String serviceId = entry.getKey();
            Object reference = entry.getValue();

            // Get a list of all methods
            Method[] methods = reference.getClass().getMethods();
            for(Method method : methods) {
                String methodName = method.getName();
                if(ReflectMethodUtil.isIgnoreMethod(methodName)) {
                    continue;
                }

                List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);
                String key = buildMethodKey(serviceId, methodName, paramTypeNames);
                methodMap.put(key, method);
            }
        }

        return this;
    }


    @Override
    public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) {
        // Check parameters
        ArgUtil.notEmpty(serviceId, "serviceId");
        ArgUtil.notEmpty(methodName, "methodName");

        // Provide a cache to quickly locate the corresponding method based on the first three values
        // Reflect according to method.
        // String concatenation for paramTypes.
        final Object reference = serviceMap.get(serviceId);
        final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);
        final Method method = methodMap.get(methodKey);

        try {
            return method.invoke(reference, paramValues);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RpcRuntimeException(e);
        }
    }

    /**
     * Build the method key.
     * (1) The sections are separated by ':'
     * (2) The parameter types are separated by '@'
     * @param serviceId service ID
     * @param methodName method name
     * @param paramTypeNames parameter type names
     * @return the complete key
     * @since 0.0.6
     */
    private String buildMethodKey(String serviceId, String methodName, List<String> paramTypeNames) {
        String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);
        return serviceId + PunctuationConst.COLON + methodName + PunctuationConst.COLON + param;
    }

}
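The caching idea above can be tried in isolation with a small JDK-only sketch that indexes an object's public methods under keys of the form serviceId:methodName:param1@param2. Two things here are assumptions: that parameter type names come from Class#getName, and that the ignored methods are the ones inherited from Object. The helpers in ReflectMethodUtil may behave differently. `MethodKeySketch` and `Greeter` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; mirrors the key format described above.
class MethodKeySketch {

    // A stand-in service with two overloads, which is why types are part of the key.
    public static class Greeter {
        public String greet(String name) {
            return "hello " + name;
        }

        public String greet(String name, int times) {
            StringBuilder text = new StringBuilder();
            for (int i = 0; i < times; i++) {
                text.append("hello ").append(name).append(' ');
            }
            return text.toString().trim();
        }
    }

    // serviceId:methodName:type1@type2
    static String buildKey(String serviceId, String methodName, List<String> paramTypeNames) {
        return serviceId + ":" + methodName + ":" + String.join("@", paramTypeNames);
    }

    // Builds the key => Method cache once, so later calls are a single map lookup.
    static Map<String, Method> index(String serviceId, Object service) {
        Map<String, Method> methods = new HashMap<>();
        for (Method method : service.getClass().getMethods()) {
            if (method.getDeclaringClass() == Object.class) {
                continue; // skip toString, hashCode, wait, ...
            }
            List<String> typeNames = new ArrayList<>();
            for (Class<?> type : method.getParameterTypes()) {
                typeNames.add(type.getName());
            }
            methods.put(buildKey(serviceId, method.getName(), typeNames), method);
        }
        return methods;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Method> cache = index("greeter", new Greeter());
        Method method = cache.get("greeter:greet:java.lang.String@int");
        System.out.println(method.invoke(new Greeter(), "rpc", 2));
    }
}
```

Because overloads share a name, the parameter types must be part of the key; without them the two greet methods would collide.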

ServiceRegistry: the service registry class

Interface

package com.github.houbb.rpc.server.registry;

/**
 * Service registration class
 * (1) Unique for each application
 * (2) The exposure protocol of every service should be consistent.
 * No special handling for individual services is provided for now; it can be added later.
 *
 * @author binbin.hou
 * @since 0.0.6
 */
public interface ServiceRegistry {

    /**
     * Port on which the RPC service is exposed
     * @param port port number
     * @return this
     * @since 0.0.6
     */
    ServiceRegistry port(final int port);

    /**
     * Register a service implementation
     * @param serviceId service ID
     * @param serviceImpl service implementation
     * @return this
     * @since 0.0.6
     */
    ServiceRegistry register(final String serviceId, final Object serviceImpl);

    /**
     * Expose all services
     * (1) Start the server
     * @return this
     * @since 0.0.6
     */
    ServiceRegistry expose();

}

Implementation

package com.github.houbb.rpc.server.registry.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;
import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.core.RpcServer;
import com.github.houbb.rpc.server.registry.ServiceRegistry;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Default service registry implementation
 * @author binbin.hou
 * @since 0.0.6
 */
public class DefaultServiceRegistry implements ServiceRegistry {

    /**
     * Singleton instance
     * @since 0.0.6
     */
    private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();

    /**
     * RPC server port number
     * @since 0.0.6
     */
    private int rpcPort;

    /**
     * Protocol configuration
     * (1) Only TCP by default
     * (2) Can later be extended to web-service/HTTP/HTTPS, etc.
     * @since 0.0.6
     */
    private ProtocolConfig protocolConfig;

    /**
     * Service configuration list
     * @since 0.0.6
     */
    private List<ServiceConfig> serviceConfigList;

    private DefaultServiceRegistry(){
        // Initialize the default parameters
        this.serviceConfigList = new ArrayList<>();
        this.rpcPort = 9527;
    }

    public static DefaultServiceRegistry getInstance() {
        return INSTANCE;
    }

    @Override
    public ServiceRegistry port(int port) {
        ArgUtil.positive(port, "port");

        this.rpcPort = port;
        return this;
    }

    /**
     * Register a service implementation.
     * (1) Mainly used for later service invocation.
     * (2) How is the implementation obtained by id? Simple: the id is unique.
     * If it exists, it is used; if not, an exception is thrown and returned directly.
     * (3) How the corresponding method is obtained from {@link com.github.houbb.rpc.common.rpc.domain.RpcRequest}:
     *
     * 3.1 Obtain the unique implementation by serviceId
     * 3.2 Obtain the unique method via {@link Class#getMethod(String, Class[])}
     * 3.3 Execute the method via {@link java.lang.reflect.Method#invoke(Object, Object...)}
     *
     * @param serviceId service ID
     * @param serviceImpl service implementation
     * @return this
     * @since 0.0.6
     */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {
        ArgUtil.notEmpty(serviceId, "serviceId");
        ArgUtil.notNull(serviceImpl, "serviceImpl");

        // Build the corresponding other information
        ServiceConfig serviceConfig = new DefaultServiceConfig();
        serviceConfig.id(serviceId).reference(serviceImpl);
        serviceConfigList.add(serviceConfig);

        return this;
    }

    @Override
    public ServiceRegistry expose() {
        // Register all service information
        DefaultServiceFactory.getInstance()
                .registerServices(serviceConfigList);

        // Start the Netty server
        new RpcServer(rpcPort).start();
        return this;
    }

}

ServiceConfig holds the configuration of a single service. The interface is defined as follows:

package com.github.houbb.rpc.server.config.service;

/**
 * Configuration class for a single service.
 *
 * To simplify usage:
 * this class should be invisible to users;
 * the corresponding service registration class is provided directly instead.
 *
 * Later extensions:
 * (1) Version information
 * (2) Server timeout
 *
 * @author binbin.hou
 * @since 0.0.6
 * @param <T> generic type of the implementation class
 */
public interface ServiceConfig<T> {

    /**
     * Get the unique identifier
     * @return the unique identifier
     * @since 0.0.6
     */
    String id();

    /**
     * Set the unique identifier
     * @param id the identifier
     * @return this
     * @since 0.0.6
     */
    ServiceConfig<T> id(String id);

    /**
     * Get the referenced implementation
     * @return the implementation
     * @since 0.0.6
     */
    T reference();

    /**
     * Set the referenced implementation
     * @param reference the implementation
     * @return this
     * @since 0.0.6
     */
    ServiceConfig<T> reference(T reference);

}
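DefaultServiceConfig itself is not shown in this article. As a rough idea of what a fluent implementation of such an interface could look like, here is a self-contained sketch; `FluentConfigSketch` and `SimpleServiceConfig` are hypothetical names and not the project's actual class.

```java
// Hypothetical demo class; not the project's DefaultServiceConfig.
class FluentConfigSketch {

    // Fluent holder: getters and setters share a name, and setters return this.
    static class SimpleServiceConfig<T> {
        private String id;
        private T reference;

        String id() {
            return id;
        }

        SimpleServiceConfig<T> id(String id) {
            this.id = id;
            return this;
        }

        T reference() {
            return reference;
        }

        SimpleServiceConfig<T> reference(T reference) {
            this.reference = reference;
            return this;
        }
    }

    public static void main(String[] args) {
        SimpleServiceConfig<Runnable> config = new SimpleServiceConfig<Runnable>()
                .id("hello")
                .reference(() -> System.out.println("hello from service"));
        System.out.println(config.id());
        config.reference().run();
    }
}
```

Returning this from each setter is what allows chained calls such as serviceConfig.id(serviceId).reference(serviceImpl) in the register method.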

Test

Maven dependency

Add the server's Maven dependency:

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>rpc-server</artifactId>
    <version>0.0.6</version>
</dependency>

Server startup

// Start the service
DefaultServiceRegistry.getInstance()
        .register(ServiceIdConst.CALC, new CalculatorServiceImpl())
        .expose();

A calculator service is registered here together with its implementation.

The implementation is similar to the previous one, so it is not repeated here.

Startup log:

[DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC service starting the server
Oct 05, 2021 1:39:43 PM io.netty.handler.logging.LoggingHandler channelRegistered
INFO: [id: 0xec4dc74f] REGISTERED
Oct 05, 2021 1:39:43 PM io.netty.handler.logging.LoggingHandler bind
INFO: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527
Oct 05, 2021 1:39:43 PM io.netty.handler.logging.LoggingHandler channelActive
INFO: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC server startup complete, listening on port [9527]

PS: I forgot to log the service registration here; a registerListener extension could be added later for that.

Summary

To make it easier to follow along, the source code above has been open-sourced:

github.com/houbb/rpc

I hope this article is helpful to you. If you like it, please like, bookmark, and share it.

I'm Old Ma, and I look forward to seeing you next time.