1. Introduction to basic RPC framework

In distributed computing, a Remote Procedure Call (RPC) allows a program running on one computer to call a procedure in another address space, typically on another computer, as if it were a local call. The programmer does not have to write extra code for the proxy objects, network protocols, and other details involved in the interaction.

A general RPC architecture has at least three components: the registry, the service provider, and the service consumer. As shown in Figure 1.1, the registry provides registration services and notification of changes in registration information, the service provider runs on the server to provide the service, and the service consumer uses the service provider's services.

The service provider (RPC Server) runs on the server side, provides the service interface definitions and implementation classes, and exposes the service interfaces externally. The registry also runs on the server side; it records the service objects of the service providers and offers remote service lookup and change notification. The service consumer (RPC Client) runs on the client side and invokes remote services through a remote proxy object.

1.1 RPC Call Process

The following figure describes the RPC call process. The Interface Description Language (IDL) describes service interfaces in a language-neutral way, so that programs running on different platforms and written in different languages can communicate with each other.

1) The client invokes the client stub. This is a local procedure call, with parameters pushed onto the stack in the normal way.

2) The client stub packs the parameters into a message and makes a system call to send the message. Packing the parameters is called marshalling.

3) The client's local operating system sends the message from the client computer to the server computer.

4) The local operating system on the server computer passes the incoming packets to the server stub.

5) The server stub unpacks the parameters from the message. Unpacking the parameters is called unmarshalling.

6) Finally, the server stub calls the server procedure. The reply follows the same steps in the opposite direction.
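The marshalling and unmarshalling steps above can be sketched in plain Java. This is only an illustration: the class StubSketch, its methods, and the byte layout are hypothetical and are not the Tars wire protocol, and the network transfer of steps 3 and 4 is replaced by handing over a byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stub pair for illustration only; not the Tars wire format.
class StubSketch {
    // Client stub: marshal the method name and parameters into one message.
    static byte[] marshal(String method, int number, String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(method);
            out.writeInt(number);
            out.writeUTF(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Server stub: unmarshal the message and call the local implementation.
    static String unmarshalAndInvoke(byte[] message) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(message))) {
            String method = in.readUTF();
            int number = in.readInt();
            String text = in.readUTF();
            if (!"hello".equals(method)) {
                throw new IllegalArgumentException("unknown method: " + method);
            }
            return hello(number, text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The server procedure that the server stub finally executes.
    static String hello(int number, String text) {
        return "hello " + number + ": " + text;
    }

    public static void main(String[] args) {
        byte[] message = marshal("hello", 3000, "Hello World"); // steps 1 and 2
        System.out.println(unmarshalAndInvoke(message));        // steps 5 and 6
    }
}
```

The caller only ever sees marshal and the returned string; everything in between is hidden by the stubs, which is the point of the RPC abstraction.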

2. Tars Java client design introduction

The overall design of the Tars Java client is basically consistent with mainstream RPC frameworks. Let's start with the Tars Java client initialization process.

2.1 Tars Java Client Initialization Process

Figure 2.1 illustrates the Tars Java initialization process.

1) Create a CommunicatorConfig object, named communicatorConfig, and set parameters such as locator, moduleName, and connections as required.

2) Pass this configuration object as config to CommunicatorFactory.getInstance().getCommunicator(config) to create a Communicator object, named communicator.

3) Suppose objectName = "MESSAGE.ControlCenter.Dispatcher" and the proxy interface to generate is Dispatcher.class. Call communicator.stringToProxy(Dispatcher.class, objectName) to generate the implementation class of the proxy object.

4) In the stringToProxy() method, first initialize the QueryHelper proxy object, call its getServerNodes() method to get the list of remote service objects, and set the return value to the objectName field in communicatorConfig. For code analysis of the proxy objects, see "2.3 Proxy Generation" below.

5) Check whether a LoadBalance parameter was set in the earlier stringToProxy call. If not, generate the default DefaultLoadBalance object, which uses the round-robin (RR) algorithm.

6) Create a TarsProtocolInvoker object. It obtains a list of URLs by parsing objectName and simpleObjectName in communicatorConfig; each URL corresponds to one remote service object. TarsProtocolInvoker initializes the ServantClient objects for each URL, and the connections configuration item of communicatorConfig determines how many ServantClient objects are created per URL. It then initializes TarsInvoker objects with the ServantClient objects and stores the collection of TarsInvoker objects in its allInvokers member variable, one TarsInvoker per URL. In short, one remote service node corresponds to one TarsInvoker object, and one TarsInvoker object holds as many ServantClient objects as the connections setting specifies. For TCP, one ServantClient object corresponds to one TCP connection.

7) Using the parameters api, objName, servantProxyConfig, loadBalance, protocolInvoker, and this.communicator, generate an ObjectProxy object that implements the JDK proxy interface InvocationHandler.

8) When the ObjectProxy object is created, its initialization executes loadBalancer.refresh() to refresh the remote service nodes in the load balancer, so that subsequent Tars remote calls can be routed.

9) Register the statistics reporter, which uses the JDK ScheduledThreadPoolExecutor to report at regular intervals.

10) Register the service list refresher, which uses the same technique as the statistics reporter above.

2.2 Usage Examples

The following code is a simplified example. CommunicatorConfig uses its default values. When the communicator generates the remote service proxy object, the name, IP address, and port of the remote service object are specified directly.

    // Initialize the basic Tars configuration.
    CommunicatorConfig cfg = new CommunicatorConfig();
    // Generate a Communicator object from the CommunicatorConfig configuration above.
    Communicator communicator = CommunicatorFactory.getInstance().getCommunicator(cfg);
    // Generate a remote service proxy object by specifying the Tars remote service object name, IP address, and port.
    HelloPrx proxy = communicator.stringToProxy(HelloPrx.class, "TestApp.HelloServer.HelloObj@tcp -h 127.0.0.1 -p 18601 -t 60000");
    // Call synchronously, blocking until the remote service object's method returns a result.
    String ret = proxy.hello(3000, "Hello World");
    System.out.println(ret);
    // Call asynchronously without caring about the final result of the call.
    proxy.async_hello(null, 3000, "Hello World");
    // Call asynchronously and register a callback object that implements the TarsAbstractCallback interface.
    proxy.async_hello(new HelloPrxCallback() {
        @Override
        public void callback_expired() { // Timeout event handling
        }
        @Override
        public void callback_exception(Throwable ex) { // Exception event handling
        }
        @Override
        public void callback_hello(String ret) { // Success event handling
            Main.logger.info("invoke async method successfully {}", ret);
        }
    }, 1000, "Hello World");

In the above example, two common invocation methods are demonstrated, namely synchronous invocation and asynchronous invocation. For asynchronous calls, if the caller wants to capture the end result of an asynchronous call, it can register an implementation class that implements the TarsAbstractCallback interface to handle exceptions, timeouts, and success events of tars calls.

2.3 Proxy Generation

Tars Java's client stub module uses the JDK's native Proxy mechanism to create remote proxy objects. As shown in the source code below, ObjectProxy implements the invoke method of java.lang.reflect.InvocationHandler, the JDK's own proxy interface.

The proxy implementation

public final class ObjectProxy<T> implements ServantProxy, InvocationHandler {
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        InvokeContext context = this.protocolInvoker.createContext(proxy, method, args);
        try {
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return this.toString();
            } else if
                //***** omit code *****
            } else {
                // Select a remote invoker from the load balancer, encapsulate the application layer protocol, and finally send through the TCP transport layer.
                Invoker invoker = this.loadBalancer.select(context);
                return invoker.invoke(context);
            }
        } catch (Throwable var8) {
            // ***** omit code *****
        }
    }
}

Generating the remote service proxy class above also involves helper classes. Tars Java uses ServantProxyFactory to generate the proxy around the ObjectProxy above and stores the result in a Map, so that callers can directly reuse an existing remote service proxy object.

The related logic is shown in the source code below. ObjectProxyFactory is the auxiliary factory class that generates ObjectProxy; unlike ServantProxyFactory, it does not cache the objects it generates.

class ServantProxyFactory {
    private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap();
    // ***** omit code *****
    public <T> Object getServantProxy(Class<T> clazz, String objName, ServantProxyConfig servantProxyConfig, LoadBalance loadBalance, ProtocolInvoker<T> protocolInvoker) {
        Object proxy = this.cache.get(objName);
        if (proxy == null) {
            this.lock.lock(); // Lock to ensure that only one remote service proxy object is generated.
            try {
                proxy = this.cache.get(objName);
                if (proxy == null) {
                    // Create the object that implements the JDK java.lang.reflect.InvocationHandler interface.
                    ObjectProxy<T> objectProxy = this.communicator.getObjectProxyFactory().getObjectProxy(clazz, objName, servantProxyConfig, loadBalance, protocolInvoker);
                    // Use the JDK java.lang.reflect.Proxy to generate the actual proxy object.
                    this.cache.putIfAbsent(objName, this.createProxy(clazz, objectProxy));
                    proxy = this.cache.get(objName);
                }
            } finally {
                this.lock.unlock();
            }
        }
        return proxy;
    }
    /** Use the JDK Proxy.newProxyInstance to generate proxy objects. */
    private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clazz, ServantProxy.class}, objectProxy);
    }
    // ***** omit code *****
}

createProxy uses the JDK Proxy.newProxyInstance method to generate the remote service proxy object.
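To make the proxy-plus-cache idea concrete, here is a minimal, self-contained sketch using only the JDK. The names GreeterPrx, EchoHandler, and ProxyCacheSketch are hypothetical, the handler only echoes the call instead of sending it over the network, and the cache uses computeIfAbsent rather than the explicit lock with a double check shown in the Tars source.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interface standing in for a generated *Prx interface.
interface GreeterPrx {
    String hello(String name);
}

// Plays the role of ObjectProxy: every call on the proxy lands in invoke().
class EchoHandler implements InvocationHandler {
    private final String objName;

    EchoHandler(String objName) {
        this.objName = objName;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // toString, hashCode and equals are also routed through the handler.
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString": return "proxy:" + objName;
                case "hashCode": return System.identityHashCode(proxy);
                default: return proxy == args[0]; // equals
            }
        }
        // A real client would pick an invoker and send the request here.
        return objName + "." + method.getName() + "(" + args[0] + ")";
    }
}

class ProxyCacheSketch {
    private static final ConcurrentHashMap<String, Object> CACHE = new ConcurrentHashMap<>();

    // Create at most one proxy per object name and reuse it afterwards.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> clazz, String objName) {
        return (T) CACHE.computeIfAbsent(objName, name -> Proxy.newProxyInstance(
                clazz.getClassLoader(), new Class<?>[] {clazz}, new EchoHandler(name)));
    }

    public static void main(String[] args) {
        GreeterPrx prx = getProxy(GreeterPrx.class, "TestApp.HelloServer.HelloObj");
        System.out.println(prx.hello("Hello World"));
    }
}
```

Because the cache is keyed by object name alone, asking twice for the same name returns the same proxy instance, which is the reuse behavior described for ServantProxyFactory.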

2.4 Remote service addressing methods

As an RPC framework for distributed systems, Tars Java has to solve the routing problem when calling remote services: how to choose one service node out of several remote service nodes. Tars Java also supports calling a remote service by connecting directly to a specific node, as shown in 2.2 Usage Examples above.

As shown in the figure below, at some point a call from ClientA uses the Service3 node for a remote service invocation, while a call from ClientB uses the Service2 node. Tars Java provides several load balancing implementation classes, including the round-robin algorithm RoundRobinLoadBalance, the consistent hash algorithm ConsistentHashLoadBalance, and the ordinary hash algorithm HashLoadBalance.

As shown in the following source code, to customize the load balancer and define your own routing rules for remote calls, implement the com.qq.tars.rpc.common.LoadBalance interface. Its LoadBalance.select() method routes according to the rules and selects the corresponding Invoker object, which then makes the remote call; see the proxy implementation source code above for the specific logic. Because remote service nodes may change, for example when a node goes online or offline, the routing information of the local load balancer needs to be refreshed. This update logic is implemented in the LoadBalance.refresh() method.

Load balancing interface

public interface LoadBalance<T> {
    /** Select an invoker based on the load balancing policy */
    Invoker<T> select(InvokeContext invokeContext) throws NoInvokerException;
    /** Notifies invoker list of updates */
    void refresh(Collection<Invoker<T>> invokers);
}
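As an illustration of the select/refresh contract, the sketch below implements a round-robin policy over plain strings. RoundRobinSketch is a hypothetical class that deliberately does not implement the Tars LoadBalance interface, and it is not the code of RoundRobinLoadBalance; it only shows the general technique under those assumptions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a round-robin load balancer; nodes are plain strings.
class RoundRobinSketch {
    private final AtomicInteger sequence = new AtomicInteger();
    private volatile List<String> nodes = new ArrayList<>();

    // Counterpart of refresh(): swap in a snapshot of the latest node list.
    void refresh(Collection<String> latest) {
        nodes = new ArrayList<>(latest);
    }

    // Counterpart of select(): walk the snapshot in order, wrapping around.
    String select() {
        List<String> snapshot = nodes;
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("no node available");
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(sequence.getAndIncrement(), snapshot.size());
        return snapshot.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch();
        lb.refresh(List.of("Service1", "Service2", "Service3"));
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.select());
        }
    }
}
```

Replacing the whole list in refresh() means a concurrent select() always sees either the old or the new node set, never a half-updated one.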

2.5 Network Model

Tars Java's IO model uses the JDK's NIO Selector. Network processing is described here in terms of TCP. As shown in the following source code, Reactor is a thread whose run() method calls selector.select(), which blocks the thread until a network event occurs.

When a network event occurs, the thread wakes up and executes the subsequent code, including dispatchEvent(key), which dispatches the event.

Depending on the event, dispatchEvent calls acceptor.handleConnectEvent(key) to handle a successful client connection, acceptor.handleAcceptEvent(key) to handle a connection accepted by the server, acceptor.handleReadEvent(key) to read data from the socket, or acceptor.handleWriteEvent(key) to write data to the socket.

Reactor Event Processing

public final class Reactor extends Thread {
    protected volatile Selector selector = null;
    private Acceptor acceptor = null;
    //***** omit code *****
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Block until a network event occurs.
                selector.select();
                //***** omit code *****
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (!key.isValid()) continue;
                    try {
                        //***** omit code *****
                        // Dispatch the TCP or UDP transport-layer network event.
                        dispatchEvent(key);
                    //***** omit code *****
                    }
                }
                //***** omit code *****
    }
        //***** omit code *****
    private void dispatchEvent(final SelectionKey key) throws IOException {
        if (key.isConnectable()) {
            acceptor.handleConnectEvent(key);
        } else if (key.isAcceptable()) {
            acceptor.handleAcceptEvent(key);
        } else if (key.isReadable()) {
            acceptor.handleReadEvent(key);
        } else if (key.isValid() && key.isWritable()) {
            acceptor.handleWriteEvent(key);
        }
    }
}
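The else-if chain in dispatchEvent handles at most one kind of event per pass, in a fixed priority order. The following sketch reproduces that ordering on a bare ready-ops bit mask so it can be run without sockets. DispatchSketch and the returned handler names are illustrative only; the real method works on a SelectionKey and calls the acceptor directly.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper that mirrors the priority order of the else-if chain.
class DispatchSketch {
    // Returns the name of the handler that would run for the given ready set.
    static String dispatch(int readyOps) {
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            return "handleConnectEvent";
        } else if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
            return "handleAcceptEvent";
        } else if ((readyOps & SelectionKey.OP_READ) != 0) {
            return "handleReadEvent";
        } else if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            return "handleWriteEvent";
        }
        return "ignored";
    }

    public static void main(String[] args) {
        // Readable and writable at once: only the read handler runs in this pass.
        System.out.println(dispatch(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```

A key that is both readable and writable is therefore read first; the pending write is picked up on a later pass of the select loop, as long as the write interest is still registered.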

Network processing follows the event-driven Reactor pattern. In Tars, one Reactor object corresponds to one Selector object, and by default two Reactors are created for each remote service (the whole service cluster, not a single node). The number of Reactors created per remote service can be changed by setting the JVM startup parameter com.qq.tars.net.client.selectorPoolSize.

The thread pools in the figure above that handle Read Events and Write Events are configured when the Communicator is initialized. The specific logic is shown in the source code below; the thread pool parameters are determined by corePoolSize, maxPoolSize, keepAliveTime, and other parameters of CommunicatorConfig.

Initialization of the read-write event thread pool

private void initCommunicator(CommunicatorConfig config) throws CommunicatorConfigException {
    //***** omit code *****
    this.threadPoolExecutor = ClientPoolManager.getClientThreadPoolExecutor(config);
    //***** omit code *****
}

public class ClientPoolManager {
    public static ThreadPoolExecutor getClientThreadPoolExecutor(CommunicatorConfig communicatorConfig) {
        //***** omit code *****
        clientThreadPoolMap.put(communicatorConfig, createThreadPool(communicatorConfig));
        //***** omit code *****
        return clientPoolExecutor;
    }    
     
    private static ThreadPoolExecutor createThreadPool(CommunicatorConfig communicatorConfig) {
        int corePoolSize = communicatorConfig.getCorePoolSize();
        int maxPoolSize = communicatorConfig.getMaxPoolSize();
        int keepAliveTime = communicatorConfig.getKeepAliveTime();
        int queueSize = communicatorConfig.getQueueSize();
        TaskQueue taskqueue = new TaskQueue(queueSize);

        String namePrefix = "tars-client-executor-";
        TaskThreadPoolExecutor executor = new TaskThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, taskqueue, new TaskThreadFactory(namePrefix));
        taskqueue.setParent(executor);
        return executor;
    }
}
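For readers unfamiliar with these parameters, the sketch below builds a comparable pool with the plain JDK ThreadPoolExecutor. ClientPoolSketch, the thread name prefix, and the use of ArrayBlockingQueue are assumptions made for illustration; Tars uses its own TaskQueue and TaskThreadPoolExecutor classes, whose queueing behavior may differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that maps the four configuration values onto a JDK pool.
class ClientPoolSketch {
    static ThreadPoolExecutor create(int corePoolSize, int maxPoolSize,
                                     int keepAliveSeconds, int queueSize) {
        AtomicInteger counter = new AtomicInteger();
        // Name the worker threads so they are easy to spot in thread dumps.
        ThreadFactory factory = task -> {
            Thread thread = new Thread(task, "sketch-client-executor-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        // With a bounded queue, threads beyond corePoolSize start only when the queue is full.
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), factory);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create(2, 4, 60, 100);
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```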

2.6 Remote Invocation interaction model

Calling a method on the proxy class enters the invoke method of ObjectProxy, which implements the InvocationHandler interface.

The following diagram depicts the flow of a remote service invocation. Two points deserve attention here: how data is written to network IO, and how Tars Java performs synchronous and asynchronous calls and which underlying technologies it uses.

2.6.1 I/O Writing Process

As shown in the figure (the underlying code IO write process), the ServantClient invokes the underlying network write operation. In the invokeWithSync method, the ServantClient takes its own member variable TCPSession and calls TCPSession.write(). As the source code below shows, write() obtains the Encoder to encode the request into an IoBuffer object, and then puts the underlying java.nio.ByteBuffer into the queue member variable of TCPSession. It then calls key.selector().wakeup() to wake up the selector.select() call in the Reactor's run() method so that the subsequent write can be performed.

When the Reactor finds that the channel is writable, that is, key.isWritable() is true, it takes ByteBuffer objects from TCPSession.queue in a loop and calls SocketChannel.write(byteBuffer) to perform the actual write to the network socket. The code logic is shown in the doWrite() method in the source code.

TCPSession write process

public class TCPSession extends Session {
    public void write(Request request) throws IOException {
        try {
            IoBuffer buffer = selectorManager.getProtocolFactory().getEncoder().encodeRequest(request, this);
            write(buffer);
        //***** omit code *****
    }
    protected void write(IoBuffer buffer) throws IOException {
        //***** omit code *****
        if (!this.queue.offer(buffer.buf())) {
            throw new IOException("The session queue is full. [ queue size:" + queue.size() + "]");
        }
        if (key != null) {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            key.selector().wakeup();
        }
    }
    protected synchronized int doWrite() throws IOException {
        int writeBytes = 0;
        while (true) {
            ByteBuffer wBuf = queue.peek();
            //***** omit code *****
            int bytesWritten = ((SocketChannel) channel).write(wBuf);
            //***** omit code *****
        return writeBytes;
    }
}
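The enqueue, wake up, then drain pattern can be tried out with the JDK alone. In the sketch below, a java.nio.channels.Pipe stands in for the TCP connection and both sides run on one thread, so this is a simplification rather than the Tars implementation. WriteQueueSketch and roundTrip are hypothetical names, and the payload is assumed to be small enough to fit into the pipe buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical single-threaded demo; assumes a payload smaller than the pipe buffer.
class WriteQueueSketch {
    static String roundTrip(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                SelectionKey key = sink.register(selector, 0);
                Queue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();

                // Caller side, as in write(): enqueue, request OP_WRITE, wake the selector.
                queue.offer(ByteBuffer.wrap(bytes));
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                key.selector().wakeup();

                // Reactor side, as in run() and doWrite(): drain the queue while writable.
                while (!queue.isEmpty()) {
                    selector.select();
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey ready = iter.next();
                        iter.remove();
                        if (ready.isValid() && ready.isWritable()) {
                            ByteBuffer head = queue.peek();
                            sink.write(head);
                            if (!head.hasRemaining()) {
                                queue.poll(); // fully written, drop it from the queue
                            }
                        }
                    }
                }

                // Read back what reached the other end of the pipe.
                ByteBuffer received = ByteBuffer.allocate(bytes.length);
                while (received.hasRemaining() && source.read(received) >= 0) {
                    // keep reading until the whole payload has arrived
                }
                return new String(received.array(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hello World"));
    }
}
```

Keeping the buffer at the head of the queue until hasRemaining() is false is what makes partial writes safe: a buffer that was only partly sent is retried on the next writable event.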

2.6.2 Low-level technical implementation of synchronous and asynchronous invocation

For a synchronous method invocation, as shown in the figure (remote invocation flow) and the source code (synchronous invocation of ServantClient), the ServantClient invokes the underlying network write operation and creates a Ticket object in the invokeWithSync method. As its name implies, the Ticket uniquely identifies this network call.

Synchronous invocation of ServantClient

public class ServantClient {
    public <T extends ServantResponse> T invokeWithSync(ServantRequest request) throws IOException {
            //***** omit code *****
            ticket = TicketManager.createTicket(request, session, this.syncTimeout);
            Session current = session;
            current.write(request);
            if (!ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)) {
            //***** omit code *****
            response = ticket.response();
            //***** omit code *****
            return response;
            //***** omit code *****
        return response;
    }
}

As shown in the code, the session.write() operation is followed by ticket.await(). The calling thread waits there until the remote service replies and the result is returned to the client, and the invokeWithSync method finally returns a response object. Internally, the Ticket's wait and wake-up function is implemented with java.util.concurrent.CountDownLatch.
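The wait and wake-up mechanism can be sketched with a CountDownLatch as follows. TicketSketch is a hypothetical, stripped-down class, not the Tars Ticket; the reply is produced by a helper thread instead of a network response, and callSync is an assumed helper for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical minimal ticket: one latch, one response slot.
class TicketSketch<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T response;

    // Called by the IO side when the reply arrives; wakes up the waiting caller.
    void notifyResponse(T value) {
        response = value;
        latch.countDown();
    }

    // Called by the invoking thread; returns false if the timeout elapsed first.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    T response() {
        return response;
    }

    // Synchronous call: "send" the request, then block on the ticket.
    static String callSync(String request, long timeoutMs) {
        TicketSketch<String> ticket = new TicketSketch<>();
        // Stand-in for the remote service answering on another thread.
        new Thread(() -> ticket.notifyResponse("reply to " + request)).start();
        if (!ticket.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("call timed out");
        }
        return ticket.response();
    }

    public static void main(String[] args) {
        System.out.println(callSync("hello", 2000));
    }
}
```

Because the response field is written before countDown() and read after await() returns, the latch also guarantees that the caller sees the stored response.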

For an asynchronous method call, ServantClient.invokeWithAsync is executed. It also creates a Ticket and executes Session.write(), but it does not call ticket.await(). When the Reactor receives the remote reply, it first parses the Tars protocol header to obtain the Response object, and then hands the Response object to the IO read-write thread pool shown in the figure (network event processing model of Tars Java) for further processing, as in the following source code (asynchronous callback event handling). WorkThread.run() is called and executes ticket.notifyResponse(resp), which invokes the callback methods of an object implementing the TarsAbstractCallback interface, like the one in the example of 2.2 above.

Asynchronous callback event handling

public final class WorkThread implements Runnable {
    public void run() {
        try {
            //***** omit code *****
                Ticket<Response> ticket = TicketManager.getTicket(resp.getTicketNumber());
            //***** omit code *****
                ticket.notifyResponse(resp);
                ticket.countDown();
                TicketManager.removeTicket(ticket.getTicketNumber());
            }
            //***** omit code *****
    }
}
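The hand-off from the IO thread to a worker thread can be sketched as below. ReplyCallback and AsyncDispatchSketch are hypothetical names, only the success path is modeled (no exception or timeout callbacks), and the reply is injected by calling onResponse directly instead of being read from a socket.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical callback with the success path only.
interface ReplyCallback {
    void onReply(String ret);
}

class AsyncDispatchSketch {
    private final Map<Integer, ReplyCallback> pending = new ConcurrentHashMap<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    // Asynchronous call: remember the callback and return without waiting.
    void invokeAsync(int ticketNumber, ReplyCallback callback) {
        pending.put(ticketNumber, callback);
        // A real client would write the request to the session here.
    }

    // IO side: look up the ticket on a worker thread and run the callback there.
    void onResponse(int ticketNumber, String body) {
        workers.execute(() -> {
            ReplyCallback callback = pending.remove(ticketNumber);
            if (callback != null) {
                callback.onReply(body);
            }
        });
    }

    void shutdown() {
        workers.shutdown();
    }

    // Wires the pieces together and returns what the callback received.
    static String demo() {
        AsyncDispatchSketch client = new AsyncDispatchSketch();
        CompletableFuture<String> seen = new CompletableFuture<>();
        client.invokeAsync(7, seen::complete);
        client.onResponse(7, "Hello World");
        try {
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            client.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running the callback on a worker thread keeps user code off the selector thread, so a slow callback cannot stall network IO.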

As shown in the source code below, TicketManager runs a periodic task that polls all calls for timeouts. If (currentTime - t.startTime) > t.timeout is true, t.expired() is called to tell the callback object that the call has timed out.

Invoke timeout event handler

public class TicketManager {
            //***** omit code *****
    static {
        executor.scheduleAtFixedRate(new Runnable() {
            long currentTime = -1;
            public void run() {
                Collection<Ticket<?>> values = tickets.values();
                currentTime = System.currentTimeMillis();
                for (Ticket<?> t : values) {
                    if ((currentTime - t.startTime) > t.timeout) {
                        removeTicket(t.getTicketNumber());
                        t.expired();
                    }
                }
            }
        }, 500, 500, TimeUnit.MILLISECONDS);
    }
}
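The timeout check itself is easy to isolate. In the sketch below the scan takes the current time as a parameter, so its behavior can be verified without waiting for a timer. TimeoutScanSketch and its members are hypothetical names that mirror only the comparison and removal logic above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of pending calls with a manual timeout scan.
class TimeoutScanSketch {
    private static final class Pending {
        final long startTime;
        final long timeout;
        final Runnable onExpired;

        Pending(long startTime, long timeout, Runnable onExpired) {
            this.startTime = startTime;
            this.timeout = timeout;
            this.onExpired = onExpired;
        }
    }

    private final Map<Integer, Pending> tickets = new ConcurrentHashMap<>();

    void register(int ticketNumber, long startTime, long timeout, Runnable onExpired) {
        tickets.put(ticketNumber, new Pending(startTime, timeout, onExpired));
    }

    // Removes every call older than its timeout, fires its callback, and returns the count.
    int scan(long currentTime) {
        int expired = 0;
        for (Map.Entry<Integer, Pending> entry : tickets.entrySet()) {
            Pending p = entry.getValue();
            // remove(key, value) makes sure each ticket is expired at most once.
            if ((currentTime - p.startTime) > p.timeout && tickets.remove(entry.getKey(), p)) {
                p.onExpired.run();
                expired++;
            }
        }
        return expired;
    }

    int pendingCount() {
        return tickets.size();
    }

    public static void main(String[] args) {
        TimeoutScanSketch manager = new TimeoutScanSketch();
        manager.register(1, 0L, 1000L, () -> System.out.println("ticket 1 expired"));
        System.out.println(manager.scan(1001L));
    }
}
```

To run the scan periodically, as TicketManager does, it could be scheduled with ScheduledExecutorService.scheduleAtFixedRate, passing System.currentTimeMillis() on each run. With a fixed scan period, a call can be reported as expired up to one period after its deadline has passed.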

3. Summary

Code calls are usually nested layer by layer, and the call graph is both deep and wide. Stepping through the code with a debugger makes it easier to understand the meaning and design concept of the source code.

There is no essential difference between Tars and other RPC frameworks. Comparing it with the design of other frameworks gives a better understanding of the design concept of Tars Java.

Author: Ke Shengkai, Vivo Internet Server Team