This article uses HelloService as an example to analyze Thrift's request processing flow.

Server startup

The server of HelloService is started in HelloServer, which is our custom class with only one main method:

public static void main(String[] args) {
    try {
        // Create the handler, which is the class that will ultimately process the request
        HelloService.Processor processor = new HelloService.Processor<>(new HelloServiceImpl());
        // Configure the transport type
        TServerTransport transport = new TServerSocket(SERVER_PORT);
        // Configure the server
        TServer server = new TSimpleServer(new TServer.Args(transport).processor(processor));
        System.out.println("Starting the simple server...");
        // Provide external services
        server.serve();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

The comments explain the key parts of this method. Let's walk through them one by one.

Create handler

The code to create the processor is

HelloService.Processor processor = new HelloService.Processor<>(new HelloServiceImpl())

Let's step into the HelloService.Processor#Processor(I) constructor:

public Processor(I iface) {
    super(iface, getProcessMap(new java.util.HashMap<java.lang.String, 
        org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}

The parameter of Processor(I) is iface, an instance of HelloServiceImpl, which we implemented ourselves:

public class HelloServiceImpl implements HelloService.Iface {
    @Override
    public String hello(String text) throws TException {
        return "hello, " + text + "!";
    }
}

The getProcessMap(...) method

The Processor(I) constructor calls super(...). Note that the result of getProcessMap(...) is passed as an argument to super(...). The HelloService.Processor#getProcessMap method is as follows:

private static <I extends Iface> java.util.Map<...> getProcessMap(java.util.Map<...> processMap) {
    processMap.put("hello", new hello());
    return processMap;
}

In processMap.put("hello", new hello()), "hello" is the name of the method in HelloService. If HelloService has multiple methods, processMap receives one entry per method.

If multiple methods had the same name, the second object would overwrite the first. (Thrift does not support method overloading!)
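The overwrite behaviour is plain java.util.Map semantics. Here is a minimal sketch of it, assuming string values in place of the generated ProcessFunction objects so that it runs without the Thrift library; the class name ProcessMapOverwriteDemo and the "bye" entry are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the generated getProcessMap(...): plain strings replace the
// ProcessFunction objects, which is an assumption made to keep this runnable.
final class ProcessMapOverwriteDemo {

    static Map<String, String> buildProcessMap() {
        Map<String, String> processMap = new HashMap<>();
        processMap.put("hello", "first hello handler");
        // Same key: put(...) replaces the earlier value instead of adding a second entry.
        processMap.put("hello", "second hello handler");
        processMap.put("bye", "bye handler");
        return processMap;
    }

    public static void main(String[] args) {
        Map<String, String> processMap = buildProcessMap();
        System.out.println(processMap.size() + " entries, hello -> " + processMap.get("hello"));
    }
}
```

Because the map is keyed by method name only, two methods that differ just in their parameter lists could not be told apart, which fits the remark about overloading above.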

So what is new hello()? Let's step in:

public static class hello<I extends Iface> 
        extends org.apache.thrift.ProcessFunction<I, hello_args> {
    public hello() {
        super("hello");
    }
    ...
}

hello extends ProcessFunction. Let's continue into the parent constructor, ProcessFunction#ProcessFunction:

public abstract class ProcessFunction<I, T extends TBase> {
  private final String methodName;

  public ProcessFunction(String methodName) {
    this.methodName = methodName;
  }
  ...
}

In other words, the name of the hello method is wrapped in a ProcessFunction object.

TBaseProcessor#TBaseProcessor

Let's go back to the HelloService.Processor#Processor(I) constructor:

public Processor(I iface) {
    super(iface, getProcessMap(new java.util.HashMap<java.lang.String, 
        org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
}

Step into super(...), that is, TBaseProcessor#TBaseProcessor:

protected TBaseProcessor(I iface,
        Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
    this.iface = iface;
    this.processMap = processFunctionMap;
}

Two things are stored in TBaseProcessor:

  • The implementation class of the service (provided by the developer), here HelloServiceImpl
  • The method names of the service and their method objects (generated by thrift)

In code terms, this step wraps our own HelloServiceImpl in a thrift Processor.

new TServerSocket(SERVER_PORT)

Moving on, let's look at how the transport is configured. TServerSocket#TServerSocket(int) eventually delegates to the constructor that takes a ServerSocketTransportArgs:

  public TServerSocket(ServerSocketTransportArgs args) throws TTransportException {
    clientTimeout_ = args.clientTimeout;
    if (args.serverSocket != null) {
      this.serverSocket_ = args.serverSocket;
      return;
    }
    try {
      // Create a ServerSocket
      serverSocket_ = new ServerSocket();
      // Address reuse, namely IP and port reuse
      serverSocket_.setReuseAddress(true);
      // Bind the IP address and port
      serverSocket_.bind(args.bindAddr, args.backlog);
    } catch (IOException ioe) {
      close();
      throw new TTransportException("Could not create ServerSocket on address " 
        + args.bindAddr.toString() + ".", ioe);
    }
  }

This constructor opens the server socket. It uses java.net.ServerSocket, that is, blocking I/O.
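The three steps in the constructor (create, enable address reuse, bind) can be tried with the JDK alone. The following is a minimal sketch rather than Thrift's own code; the class name, the helper openServerSocket, port 0 (let the operating system pick a free port) and the backlog of 50 are assumptions made for the example:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

final class BlockingServerSocketDemo {

    // Mirrors the three steps above: create, enable address reuse, bind.
    // Port 0 asks the OS for any free port; the backlog of 50 is an arbitrary choice.
    static ServerSocket openServerSocket(int port) {
        try {
            ServerSocket serverSocket = new ServerSocket();      // created unbound
            serverSocket.setReuseAddress(true);                  // set before bind to take effect
            serverSocket.bind(new InetSocketAddress(port), 50);
            return serverSocket;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket serverSocket = openServerSocket(0)) {
            System.out.println("bound to port " + serverSocket.getLocalPort());
        }
    }
}
```

Creating the socket unbound first is what makes it possible to set options such as SO_REUSEADDR before bind(...) is called.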

new TServer.Args(transport)

Step into new TServer.Args(transport):

public abstract class TServer {

  public static class Args extends AbstractServerArgs<Args> {
    public Args(TServerTransport transport) {
      // The AbstractServerArgs constructor is called
      super(transport);
    }
  }

  /**
   * The class that holds the parameters
   */
  public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
    final TServerTransport serverTransport;
    TProcessorFactory processorFactory;
    TTransportFactory inputTransportFactory = new TTransportFactory();
    TTransportFactory outputTransportFactory = new TTransportFactory();
    TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
    TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

    // The constructor that is finally called
    public AbstractServerArgs(TServerTransport transport) {
      serverTransport = transport;
    }
    ...
  }
}

This part only performs configuration: it saves the TServerSocket object created earlier into an AbstractServerArgs object.

new TServer.Args(transport).processor(processor)

The TServer.AbstractServerArgs#processor method is as follows:

public T processor(TProcessor processor) {
    this.processorFactory = new TProcessorFactory(processor);
    return (T) this;
}

The returned object is still of type TServer.Args.

This step simply puts the processor into the TProcessorFactory, which looks like this:

public class TProcessorFactory {

  private final TProcessor processor_;

  public TProcessorFactory(TProcessor processor) {
    processor_ = processor;
  }

  public TProcessor getProcessor(TTransport trans) {
    return processor_;
  }

  public boolean isAsyncProcessor() {
      return processor_ instanceof TAsyncProcessor;
  }
}

There is only one processor_, and it is returned as-is in the getProcessor() method.

new TSimpleServer()

Moving on, let’s get to the TSimpleServer constructor:

  public TSimpleServer(AbstractServerArgs args) {
    super(args);
  }

TSimpleServer extends TServer. The TServer constructor is as follows:

protected TServer(AbstractServerArgs args) {
    processorFactory_ = args.processorFactory;
    serverTransport_ = args.serverTransport;
    inputTransportFactory_ = args.inputTransportFactory;
    outputTransportFactory_ = args.outputTransportFactory;
    inputProtocolFactory_ = args.inputProtocolFactory;
    outputProtocolFactory_ = args.outputProtocolFactory;
}

This step sets the properties of TSimpleServer: the fields of AbstractServerArgs are copied to the fields of TServer. The values in args are the ones set up through new TServer.Args(transport), most of them defaults provided by thrift.

server.serve()

Now comes the main work on the server side: serving requests through TSimpleServer#serve:

public void serve() {
    try {
      // Start listening, so that connections on the port can be accepted
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOGGER.error("Error occurred during listening.", ttx);
      return;
    }

    if (eventHandler_ != null) {
      // Run the eventHandler_.preServe() method
      eventHandler_.preServe();
    }

    setServing(true);
    // Keep accepting connections in a loop
    while (!stopped_) {
      TTransport client = null;
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      ServerContext connectionContext = null;
      try {
        // Accept a connection; this call blocks
        client = serverTransport_.accept();
        if (client != null) {
          processor = processorFactory_.getProcessor(client);
          inputTransport = inputTransportFactory_.getTransport(client);
          outputTransport = outputTransportFactory_.getTransport(client);
          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
          if (eventHandler_ != null) {
            // Run the eventHandler_.createContext(...) method
            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
          }
          while (true) {
            if (eventHandler_ != null) {
              // Run the eventHandler_.processContext(...) method
              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
            }
            // This is where the HelloServiceImpl method is executed
            processor.process(inputProtocol, outputProtocol);
          }
        }
      } catch (...) {
        ...
      }

      if (eventHandler_ != null) {
        // Run the eventHandler_.deleteContext(...) method
        eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
      }

      if (inputTransport != null) {
        inputTransport.close();
      }

      if (outputTransport != null) {
        outputTransport.close();
      }

    }
    setServing(false);
}

This is the complete flow for handling a request on the server. Let's analyze it step by step.

Start service listening: serverTransport_.listen()

The TServerSocket#listen method reads as follows:

public void listen() throws TTransportException {
  // Make sure to block on accept
  if (serverSocket_ != null) {
    try {
      serverSocket_.setSoTimeout(0);
    } catch (SocketException sx) {
      LOGGER.error("Could not set socket timeout.", sx);
    }
  }
}

You can see that only one property is configured: soTimeout. Here is the Javadoc of ServerSocket#setSoTimeout:

Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

In short, this parameter sets the timeout of accept(), and the value 0 used here means an infinite timeout.
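To see the difference a non-zero value makes, here is a small JDK-only sketch. The class and method names are made up for the example, and nobody connects to the socket, so it should only be called with a non-zero timeout (with 0, accept() would wait forever, which is the behaviour listen() asks for):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;

final class AcceptTimeoutDemo {

    // Returns true if accept() gave up with SocketTimeoutException and the
    // server socket was still open afterwards. Use a non-zero timeout only.
    static boolean acceptTimesOut(int timeoutMillis) {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            serverSocket.setSoTimeout(timeoutMillis);
            try {
                serverSocket.accept().close();   // no client connects in this sketch
                return false;
            } catch (SocketTimeoutException expected) {
                // The timeout only aborts this accept() call; the socket stays usable.
                return !serverSocket.isClosed();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accept() timed out: " + acceptTimesOut(200));
    }
}
```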

Running the eventHandler_.xxx(...) methods

The eventHandler_ type is TServerEventHandler, which is defined as follows:

public interface TServerEventHandler {

  /** Called before the server begins */
  void preServe();

  /** Called when a new client has connected and is about to be processed */
  ServerContext createContext(TProtocol input, TProtocol output);

  /** Called when a client has finished request-handling to delete server context */
  void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output);

  /** Called when a client is about to call the processor */
  void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport);

}

As you can see, this is an interface that defines several callbacks invoked while a connection is being served. When we want to observe these stages of a connection, we can implement this interface and register it on the TServer, like this:

TServerTransport transport = new TServerSocket(port);
TServer server = new TSimpleServer(new TServer.Args(transport).processor(processor));
// Set the TServerEventHandler
server.setServerEventHandler(new MyTServerEventHandler());
server.serve();
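The order in which these callbacks fire follows from the serve() method shown earlier. The sketch below replays that order without the Thrift library: LifecycleHandler is a hypothetical, argument-free stand-in for TServerEventHandler, and simulate(...) imitates one connection that carries a given number of requests.

```java
import java.util.ArrayList;
import java.util.List;

final class EventHandlerOrderDemo {

    // Hypothetical stand-in for TServerEventHandler, reduced to simple callbacks.
    interface LifecycleHandler {
        void preServe();
        Object createContext();
        void processContext(Object context);
        void deleteContext(Object context);
    }

    // Replays the callback order of serve() for one connection with `requests` calls.
    static List<String> simulate(int requests) {
        List<String> calls = new ArrayList<>();
        LifecycleHandler handler = new LifecycleHandler() {
            @Override public void preServe() { calls.add("preServe"); }
            @Override public Object createContext() { calls.add("createContext"); return new Object(); }
            @Override public void processContext(Object context) { calls.add("processContext"); }
            @Override public void deleteContext(Object context) { calls.add("deleteContext"); }
        };

        handler.preServe();                        // once, before the accept loop starts
        Object context = handler.createContext();  // once per accepted connection
        for (int i = 0; i < requests; i++) {
            handler.processContext(context);       // before every processor.process(...) call
        }
        handler.deleteContext(context);            // after the connection has been handled
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(simulate(2));
    }
}
```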

Accepting and processing connections

The core of server.serve() is as follows:

while (!stopped_) {
  ...
  try {
    // Accept a connection; this call blocks
    client = serverTransport_.accept();
    if (client != null) {
      processor = processorFactory_.getProcessor(client);
      ...
      while (true) {
        ...
        // This is where the HelloServiceImpl method is executed
        processor.process(inputProtocol, outputProtocol);
      }
    }
  } catch (...) {
    ...
  }
  ...
}

  1. Use TServerSocket#accept to accept the connection request; underneath, this is provided by the JDK
  2. Use processorFactory_.getProcessor(client) to obtain the processor
  3. Use processor.process(...) to execute the specific method

For now this overview is enough. Once we have analyzed the execution flow, we will come back to the details with the help of debugging.
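The shape of this loop (block in accept, then keep answering requests on that one connection until the client leaves) can be reproduced with the JDK alone. The following is a rough sketch, not Thrift code: it assumes a made-up line-based protocol in which every request is one line of text, and all class and method names are invented for the example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class BlockingAcceptLoopDemo {

    // Handles `connections` clients one after another on the calling thread.
    static void serve(ServerSocket serverSocket, int connections) throws IOException {
        for (int i = 0; i < connections; i++) {
            // accept() blocks until a client connects, like serverTransport_.accept()
            try (Socket client = serverSocket.accept();
                 BufferedReader in = reader(client);
                 PrintWriter out = writer(client)) {
                String request;
                // Inner loop: answer requests until the client closes the connection
                while ((request = in.readLine()) != null) {
                    out.println("hello, " + request + "!");
                }
            }
        }
    }

    static BufferedReader reader(Socket socket) throws IOException {
        return new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
    }

    static PrintWriter writer(Socket socket) throws IOException {
        // autoFlush = true, so println(...) pushes each line onto the socket immediately
        return new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    // Runs the loop on a background thread, sends one request, and returns the reply.
    static String roundTrip(String text) {
        try (ServerSocket serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> {
                try {
                    serve(serverSocket, 1);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            server.start();
            String reply;
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverSocket.getLocalPort());
                 BufferedReader in = reader(socket);
                 PrintWriter out = writer(socket)) {
                out.println(text);
                reply = in.readLine();
            }
            server.join();
            return reply;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("thrift world"));
    }
}
```

As in the loop above, a single thread does everything, so a second client would have to wait until the first connection is closed.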

Client startup

The client startup class is HelloClient, which we also implemented ourselves. The code is as follows:

public static void main(String[] args) {
    TTransport transport = null;
    try {
        // Open the connection
        transport = new TSocket("localhost", SERVER_PORT);
        transport.open();

        // Specify the transport protocol
        TProtocol protocol = new TBinaryProtocol(transport);
        // Create a client
        HelloService.Client client = new HelloService.Client(protocol);
        // Call the HelloService#hello method
        String result = client.hello("thrift world");
        System.out.println("result=" + result);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (null != transport) {
            transport.close();
        }
    }
}

Open a connection

Open the connection as follows:

transport = new TSocket("localhost", SERVER_PORT);
transport.open();

Enter the TSocket constructor:

  public TSocket(TConfiguration config, String host, int port, int socketTimeout, 
        int connectTimeout) throws TTransportException {
    // Parameter assignment
    super(config);
    host_ = host;
    port_ = port;
    socketTimeout_ = socketTimeout;
    connectTimeout_ = connectTimeout;
    initSocket();
  }

  /** Initializes the socket object */
  private void initSocket() {
    socket_ = new Socket();
    try {
      socket_.setSoLinger(false, 0);
      socket_.setTcpNoDelay(true);
      socket_.setKeepAlive(true);
      socket_.setSoTimeout(socketTimeout_);
    } catch (SocketException sx) {
      LOGGER.error("Could not configure socket.", sx);
    }
  }

This step only creates the TSocket object. The constructor does some assignments and configures a java.net.Socket that is not connected yet.

Let’s look at the TSocket#open method:

public void open() throws TTransportException {
    // Checks omitted
    ...
    try {
      // Connect
      socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
      inputStream_ = new BufferedInputStream(socket_.getInputStream());
      outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
    } catch (IOException iox) {
      close();
      throw new TTransportException(TTransportException.NOT_OPEN, iox);
    }
}

The connection is opened with java.net.Socket#connect(java.net.SocketAddress, int), that is, blocking I/O (BIO).
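The two phases (configure an unconnected socket, then connect with a timeout) can be tried with the JDK alone. This is a hedged sketch, not Thrift's code: the class and method names are invented, a throwaway ServerSocket on the loopback address stands in for the Thrift server, and the read timeout of 0 is an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class ClientSocketDemo {

    // Configures an unconnected socket as initSocket() does, then connects it
    // the way open() does. Returns whether the socket ended up connected.
    static boolean connectWithTimeout(int connectTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             Socket socket = new Socket()) {            // created unconnected
            socket.setSoLinger(false, 0);
            socket.setTcpNoDelay(true);
            socket.setKeepAlive(true);
            socket.setSoTimeout(0);                     // 0: reads block without a time limit
            // Blocks until the connection is established or the timeout expires
            socket.connect(
                    new InetSocketAddress(server.getInetAddress(), server.getLocalPort()),
                    connectTimeoutMillis);
            return socket.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectWithTimeout(1000));
    }
}
```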

Get a client

TProtocol protocol = new TBinaryProtocol(transport);
HelloService.Client client = new HelloService.Client(protocol);

This code first creates a binary protocol object with new TBinaryProtocol(transport). Step into the TBinaryProtocol#TBinaryProtocol(...) constructor:

  public TBinaryProtocol(TTransport trans, long stringLengthLimit, 
        long containerLengthLimit, boolean strictRead, boolean strictWrite) {
    super(trans);
    stringLengthLimit_ = stringLengthLimit;
    containerLengthLimit_ = containerLengthLimit;
    strictRead_ = strictRead;
    strictWrite_ = strictWrite;
  }

This constructor first calls the parent constructor and then performs a series of assignments. Step into the parent constructor TProtocol#TProtocol(...):

protected TProtocol(TTransport trans) {
  trans_ = trans;
}

As you can see, the whole creation process is just a bunch of assignments.

Now let's look at how the client is obtained. Step into the HelloService.Client#Client(...) constructor:

public Client(org.apache.thrift.protocol.TProtocol prot)
{
    super(prot, prot);
}

Go ahead and enter TServiceClient:

public abstract class TServiceClient {
  public TServiceClient(TProtocol prot) {
    this(prot, prot);
  }

  public TServiceClient(TProtocol iprot, TProtocol oprot) {
    iprot_ = iprot;
    oprot_ = oprot;
  }
  ...
}

As you can see, client creation is still assignment.

Perform operations

With that in place, you can then execute the method, namely:

String result = client.hello("thrift world");

This line of code ends up calling the HelloService#hello method on the server side, which is HelloServiceImpl#hello:

public class HelloServiceImpl implements HelloService.Iface {
    @Override
    public String hello(String text) throws TException {
        return "hello, " + text + "!";
    }
}

How does a method called locally on the client end up being invoked on a remote service? Let's look at how this works.

Execution flow

The client's call to the server starts in HelloService.Client#hello. Let's step into this method:

public java.lang.String hello(java.lang.String text) 
    throws org.apache.thrift.TException
{
    send_hello(text);
    return recv_hello();
}

This method consists of two lines of code, and their names suggest what they do:

  • send_hello(...): sends the hello() method call
  • recv_hello(...): receives the result of the hello() method call

Client sends request: send_hello(text)

Go to the send_hello(text) method:

public void send_hello(java.lang.String text) throws org.apache.thrift.TException
{
    hello_args args = new hello_args();
    args.setText(text);
    sendBase("hello", args);
}

hello_args wraps the arguments of the method. After the argument is set, sendBase(...) is called, which in turn calls the three-argument overload with the message type TMessageType.CALL:

TServiceClient#sendBase(String, TBase<?,?>, byte)

  private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

Here oprot_.getTransport() returns the TSocket instance, so flush() ends up flushing the outputStream_ held by TSocket. Once the flush completes, the data, including the method name and the parameter values, has been sent to the server.
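Why the final flush matters can be shown with the JDK alone. The sketch below writes a message header (name, type, sequence id) and one argument through a BufferedOutputStream and records how many bytes have reached the underlying stream before and after flush(). The byte layout is deliberately simplified and is not the real TBinaryProtocol encoding, the ByteArrayOutputStream stands in for the socket, and the class and method names are invented for the example.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class SendFlushDemo {

    static final byte CALL = 1;   // same numeric value as TMessageType.CALL

    // Writes a simplified message and returns {bytes on the "wire" before flush,
    // bytes on the "wire" after flush}.
    static int[] writeThenFlush(String methodName, int seqId, String argument) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();   // stands in for the socket
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(wire));
            out.writeUTF(methodName);   // like writeMessageBegin: method name ...
            out.writeByte(CALL);        // ... message type ...
            out.writeInt(seqId);        // ... and sequence id
            out.writeUTF(argument);     // like args.write(oprot_)
            int beforeFlush = wire.size();
            out.flush();                // like oprot_.getTransport().flush()
            return new int[] {beforeFlush, wire.size()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = writeThenFlush("hello", 1, "thrift world");
        System.out.println("before flush: " + sizes[0] + " bytes, after flush: " + sizes[1] + " bytes");
    }
}
```

Until flush() is called, the small message sits in the buffer and nothing is handed to the underlying stream, which is why sendBase ends with a flush.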

Client receives response: recv_hello()

Let’s look at the process of receiving the data, which is the recv_hello() method:

HelloService.Client#recv_hello

public java.lang.String recv_hello() throws org.apache.thrift.TException
{
    hello_result result = new hello_result();
    // Continue processing
    receiveBase(result, "hello");
    if (result.isSetSuccess()) {
     return result.success;
    }
    throw new org.apache.thrift.TApplicationException(
        org.apache.thrift.TApplicationException.MISSING_RESULT, 
            "hello failed: unknown result");
}

The method above creates a hello_result object to hold the result of the call and then calls receiveBase(...), that is, TServiceClient#receiveBase:

  protected void receiveBase(TBase<?,?> result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    ...
    // Read operation
    result.read(iprot_);
    iprot_.readMessageEnd();
  }

This method mainly calls result.read(iprot_), continuing:

HelloService.hello_result.hello_resultStandardScheme#read

  public void read(org.apache.thrift.protocol.TProtocol iprot, hello_result struct) 
        throws org.apache.thrift.TException {
    org.apache.thrift.protocol.TField schemeField;
    iprot.readStructBegin();
    while (true)
    {
      schemeField = iprot.readFieldBegin();
      // STOP marks the end of the fields
      if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
        break;
      }
      switch (schemeField.id) {
        case 0: // SUCCESS
          if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
            // Return the result as String, read directly
            struct.success = iprot.readString();
            struct.setSuccessIsSet(true);
          } else { 
            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
          }
          break;
        default:
          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
      }
      iprot.readFieldEnd();
    }
    iprot.readStructEnd();
    struct.validate();
  }

This step is to read the content of the returned result.

At this point, the client is done reading and writing.
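The field loop in read(...) follows a common pattern: read a field header, stop at the STOP marker, otherwise dispatch on the field id. Here is a self-contained sketch of that pattern. The byte layout (type byte, 16-bit id, value written with writeUTF) is a simplification chosen for the example and not the exact TBinaryProtocol encoding; the constants reuse the numeric values of TType.STOP and TType.STRING, and the class and method names are invented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FieldLoopDemo {

    static final byte STOP = 0;     // same value as TType.STOP
    static final byte STRING = 11;  // same value as TType.STRING

    // Encodes a result struct with one string field (id 0) followed by STOP.
    static byte[] encodeResult(String success) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(STRING);   // field type
            out.writeShort(0);       // field id 0 = SUCCESS
            out.writeUTF(success);   // field value
            out.writeByte(STOP);     // end-of-struct marker
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads fields until STOP and returns the value of field 0.
    static String readSuccess(byte[] encoded) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
            String success = null;
            while (true) {
                byte type = in.readByte();
                if (type == STOP) {
                    break;
                }
                short id = in.readShort();
                if (id == 0 && type == STRING) {
                    success = in.readUTF();
                } else {
                    // The generated code skips unknown fields; this sketch simply rejects them.
                    throw new IOException("unexpected field id " + id + " of type " + type);
                }
            }
            return success;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readSuccess(encodeResult("hello, thrift world!")));
    }
}
```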

Server processing: TSimpleServer#serve

Let’s take a look at how the server handles the request, using the TSimpleServer#serve method:

public void serve() {
    ...
    while (!stopped_) {
      ...
      try {
        client = serverTransport_.accept();
        if (client != null) {
          ...
          while (true) {
            if (eventHandler_ != null) {
              eventHandler_.processContext(connectionContext, 
                    inputTransport, outputTransport);
            }
            // This is where requests are processed
            processor.process(inputProtocol, outputProtocol);
          }
        }
      } catch (...) {
        ...
      }
      ...
    }
    setServing(false);
  }

Continue into org.apache.thrift.TBaseProcessor#process:

  @Override
  public void process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    // Get the ProcessFunction
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      ...
    } else {
      // Continue processing
      fn.process(msg.seqid, in, out, iface);
    }
  }

Debugging shows that the fn obtained here is the hello object that getProcessMap(...) registered under the name "hello".

Continue into the org.apache.thrift.ProcessFunction#process method:

  public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) 
      throws TException {
    T args = getEmptyArgsInstance();
    try {
      // Read parameters
      args.read(iprot);
    } catch (TProtocolException e) {
      ...
      return;
    }
    iprot.readMessageEnd();
    TSerializable result = null;
    byte msgType = TMessageType.REPLY;

    try {
      // Process the result
      result = getResult(iface, args);
    } catch (...) {
      ...
    }

    if(!isOneway()) {
      ...
    }
  }

This method first reads the arguments of the call through args.read(iprot).

At this point the service class, the method, and the method's arguments are all known. What remains is executing the method, so let's continue into HelloService.Processor.hello#getResult:

  public hello_result getResult(I iface, hello_args args) 
      throws org.apache.thrift.TException {
    hello_result result = new hello_result();
    result.success = iface.hello(args.text);
    return result;
  }

Here iface is the HelloServiceImpl instance, so the HelloServiceImpl#hello method is what finally gets executed.
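Putting the server-side pieces together, dispatch boils down to a map lookup by method name followed by a call on the stored implementation. The sketch below models that idea without the Thrift library. MiniProcessor, Iface and HelloImpl are hypothetical stand-ins for TBaseProcessor, HelloService.Iface and HelloServiceImpl, and a plain String replaces the hello_args and hello_result structs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class DispatchDemo {

    // Hypothetical stand-ins for HelloService.Iface and HelloServiceImpl.
    interface Iface {
        String hello(String text);
    }

    static final class HelloImpl implements Iface {
        @Override
        public String hello(String text) {
            return "hello, " + text + "!";
        }
    }

    // Plays the role of TBaseProcessor: the implementation plus a name -> function map.
    static final class MiniProcessor {
        private final Iface iface;
        private final Map<String, BiFunction<Iface, String, String>> processMap = new HashMap<>();

        MiniProcessor(Iface iface) {
            this.iface = iface;
            // Like getProcessMap(...): one entry per service method
            processMap.put("hello", (target, arg) -> target.hello(arg));
        }

        // Like process(...): look up the function by name, then call the implementation.
        String process(String methodName, String arg) {
            BiFunction<Iface, String, String> fn = processMap.get(methodName);
            if (fn == null) {
                throw new IllegalArgumentException("Invalid method name: '" + methodName + "'");
            }
            return fn.apply(iface, arg);
        }
    }

    public static void main(String[] args) {
        MiniProcessor processor = new MiniProcessor(new HelloImpl());
        System.out.println(processor.process("hello", "thrift world"));
    }
}
```

The real ProcessFunction additionally reads the arguments from the input protocol and writes the result to the output protocol; this sketch keeps only the lookup and the call.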

Summary

This article analyzed how thrift processes a request. The flow is as follows:

  1. When the client calls a local method, that method sends the method name and the method arguments to the server over the socket connection;
  2. After receiving the data from the client, the server looks up the corresponding handler by method name and invokes it with the argument values passed by the client;
  3. After the server has invoked the method, the result is returned to the client over the socket;
  4. When the client receives the result over the socket, the local method returns it to the caller.

Given the limits of the author's knowledge, this article inevitably contains mistakes, and corrections are welcome. Original writing takes effort: for commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.

This article was first published on the WeChat official account "Java technology exploration". If you liked it, you are welcome to follow the account and explore the world of technology together!