overview

The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml, Delphi and other languages.

Thrift is a lightweight, cross-language framework for remote service invocation. It was originally developed by Facebook and later became an Apache open source project. Through its own IDL (Interface Definition Language) and a code generation engine, it generates RPC server/client template code for the major languages.

Thrift supports many programming languages, including C++, Java, Python, PHP, and Ruby. This series focuses on how to configure and use Thrift with Java.

design

layered

The Thrift software stack consists of the Transport Layer, Protocol Layer, Processor Layer, and Server Layer from bottom to top.

  • Transport Layer: responsible for reading data from and writing data to the network directly. It defines the concrete network transport, such as transmission over TCP/IP.
  • Protocol Layer: defines the data transmission format and is responsible for serializing and deserializing the data transmitted over the network, for example as JSON, XML, or binary data.
  • Processor Layer: generated from the specific IDL (Interface Definition Language). It encapsulates the underlying network transport and serialization and delegates the actual processing to a user-implemented Handler.
  • Server Layer: integrates the components above into a concrete network/threading I/O model to form the final service.
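To make the layering concrete, here is a minimal sketch of how the four layers are assembled into a server in Java. It is only a sketch: it assumes a Thrift-generated service `HelloWorldService` (like the one generated in the Usage section below) and a hypothetical handler class `HelloWorldServiceImpl`.

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;

public class SimpleServerDemo {
    public static void main(String[] args) throws Exception {
        // Transport layer: blocking server socket on port 9090
        TServerSocket serverTransport = new TServerSocket(9090);
        // Processor layer: generated code delegating to our Iface implementation
        HelloWorldService.Processor<HelloWorldService.Iface> processor =
                new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
        // Protocol + server layer: binary protocol, single-threaded blocking server
        TServer server = new TSimpleServer(new TServer.Args(serverTransport)
                .processor(processor)
                .protocolFactory(new TBinaryProtocol.Factory()));
        server.serve(); // Blocks and serves requests
    }
}
```

Each layer is swappable: a different transport factory, protocol factory, or TServer implementation can be plugged into the same Args builder without touching the business logic.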

transport

Encapsulates the Socket communication API

On the client side the abstraction is TTransport; on the server side it is TServerTransport.

Implementation types:

TSocket/TServerSocket: use blocking I/O for transmission; this is the most common mode

TFramedTransport/TNonblockingServerSocket: use non-blocking I/O and transmit data in frames (size-prefixed blocks), similar to Java NIO

protocol

Specify the message delivery format

On the client side, TProtocol holds the Transport object and wraps the socket stream in the specified message format. On the server side, TProtocolFactory specifies the protocol type.

Thrift lets the user choose the transmission protocol between the client and server. Protocols fall broadly into text and binary types. Binary protocols are generally used to save bandwidth and improve transmission efficiency, while text-based protocols are sometimes chosen based on the actual requirements of the project or product. Common protocols are as follows:

TBinaryProtocol: Transmits data in binary encoding format

TCompactProtocol: an efficient, dense binary encoding format for data transfer

TJSONProtocol: Uses the JSON text data encoding protocol for data transmission

TSimpleJSONProtocol: provides a write-only JSON protocol, suitable for parsing by scripting languages
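On the client side, choosing a protocol is just a matter of wrapping the transport in the matching TProtocol implementation. A sketch, assuming the `HelloWorldService` generated in the Usage section below and a server configured with `TCompactProtocol.Factory` (host, port, and timeout are illustrative):

```java
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class CompactProtocolClientDemo {
    public static void main(String[] args) throws Exception {
        TTransport transport = new TSocket("localhost", 9090, 3000); // host, port, timeout (ms)
        transport.open();
        try {
            // The client's protocol must match the server's TProtocolFactory;
            // here, the dense binary TCompactProtocol
            TProtocol protocol = new TCompactProtocol(transport);
            HelloWorldService.Client client = new HelloWorldService.Client(protocol);
            System.out.println(client.say("compact"));
        } finally {
            transport.close();
        }
    }
}
```

If the client and server protocols do not match, deserialization fails at runtime, so the protocol choice is effectively part of the service contract.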

Iface/Processor/Client

Define RPC interfaces and interface implementations

The client implements the Iface interface and holds a Protocol object, through which it sends requests for the defined interface methods and receives responses from the server. Because the client code is generated automatically by Thrift, it can be used directly. On the server side, the generated Processor encapsulates the general message-handling logic: it reads messages according to the protocol specified by the user, calls back the Iface methods implemented by the server, and returns the response in the specified message format.
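On the server side, the business logic is supplied by implementing the generated Iface and handing that implementation to a Processor. A sketch, using the `HelloWorldService` defined in the Usage section below (the class name `HelloWorldServiceImpl` is our own):

```java
import org.apache.thrift.TException;

// Hypothetical handler: the generated Processor reads each request, calls
// back into this Iface implementation, and writes the result back out in
// the protocol the user configured
public class HelloWorldServiceImpl implements HelloWorldService.Iface {
    @Override
    public String say(String username) throws TException {
        return "Hello, " + username;
    }
}
```

Wiring is then `new HelloWorldService.Processor<>(new HelloWorldServiceImpl())`, and the same handler works unchanged with any of the server models described below.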

server

The Thrift service model is unified: a server holds a Transport and a Processor object and specifies a ProtocolFactory.

Server side only. Implementation types:

TSimpleServer: single-threaded server side, using standard blocking I/O

TThreadPoolServer: Multi-threaded server side, using standard blocking I/O

TNonblockingServer: Single-threaded server side, using non-blocking I/O

THsHaServer: semi-synchronous and semi-asynchronous server side, based on non-blocking I/O read and write and multi-threaded work task processing

TThreadedSelectorServer: multi-threaded selector server, which enhances THsHaServer's asynchronous I/O model

features

Multilingual/cross-language support

Simple interface maintenance

Usage

IDL

Write the IDL file hello.thrift:

service HelloWorldService {
  string say(1: string username)
}

Generate the code with the code generation tool by executing the following command:

thrift -gen java hello.thrift

interface

Since no target directory for code generation was specified, the generated class file HelloWorldService.java is placed in the gen-java directory by default.

When using the Thrift framework, you only need to focus on four core inner interfaces/classes: Iface, AsyncIface, Client, and AsyncClient.

  • Iface: The server provides the specific synchronization business logic to the client by implementing the HelloWorldService.Iface interface.

  • AsyncIface: The server provides the specific asynchronous business logic to the client by implementing the HelloWorldService.AsyncIface interface.

  • Client: The Client synchronously accesses the service method provided by the server through the instance object of HelloWorldService.Client.

  • AsyncClient: The client asynchronously accesses the service methods provided by the server through an instance object of HelloWorldService.AsyncClient.

public class HelloWorldService {
    public interface Iface {
        public String say(String username) throws org.apache.thrift.TException;
    }

    public interface AsyncIface {
        public void say(String username, org.apache.thrift.async.AsyncMethodCallback<String> resultHandler) throws org.apache.thrift.TException;
    }

    public static class Client extends org.apache.thrift.TServiceClient implements Iface {
        public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
            public Factory() {}

            public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
                return new Client(prot);
            }

            public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
                return new Client(iprot, oprot);
            }
        }

        public Client(org.apache.thrift.protocol.TProtocol prot) {
            super(prot, prot);
        }

        public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
            super(iprot, oprot);
        }

        public String say(String username) throws org.apache.thrift.TException {
            send_say(username);
            return recv_say();
        }
        // ... omitted
    }

    public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
        public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
            private org.apache.thrift.async.TAsyncClientManager clientManager;
            private org.apache.thrift.protocol.TProtocolFactory protocolFactory;

            public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
                this.clientManager = clientManager;
                this.protocolFactory = protocolFactory;
            }

            public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
                return new AsyncClient(protocolFactory, clientManager, transport);
            }
        }

        public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
            super(protocolFactory, clientManager, transport);
        }

        public void say(String username, org.apache.thrift.async.AsyncMethodCallback<String> resultHandler) throws org.apache.thrift.TException {
            checkReady();
            say_call method_call = new say_call(username, resultHandler, this, ___protocolFactory, ___transport);
            this.___currentMethod = method_call;
            ___manager.call(method_call);
        }
        // ... omitted
    }
    // ... omitted
}

implementation

TServer

TServer defines the static inner class Args, which inherits from the abstract class AbstractServerArgs. AbstractServerArgs uses the Builder pattern and provides TServer with various factories:

Factory field            Factory type       Role
processorFactory         TProcessorFactory  Creates the concrete TProcessor (processor layer)
inputTransportFactory    TTransportFactory  Creates the concrete input TTransport (transport layer)
outputTransportFactory   TTransportFactory  Creates the concrete output TTransport (transport layer)
inputProtocolFactory     TProtocolFactory   Creates the concrete input TProtocol (protocol layer)
outputProtocolFactory    TProtocolFactory   Creates the concrete output TProtocol (protocol layer)

Here is some of the core code of TServer:

TServer has three methods: serve(), stop(), and isServing(). serve() starts the service, stop() shuts it down, and isServing() reports whether the service is running.

Different implementation classes of TServer start differently, so serve() is defined as an abstract method. Not all services need to exit gracefully, so the stop() method is not defined as abstract.

public abstract class TServer {
    public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }

    public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            serverTransport = transport;
        }
    }

    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;
    protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
        processorFactory_ = args.processorFactory;
        serverTransport_ = args.serverTransport;
        inputTransportFactory_ = args.inputTransportFactory;
        outputTransportFactory_ = args.outputTransportFactory;
        inputProtocolFactory_ = args.inputProtocolFactory;
        outputProtocolFactory_ = args.outputProtocolFactory;
    }
    public abstract void serve();

    public void stop() {}

    public boolean isServing() {
        return isServing;
    }

    protected void setServing(boolean serving) {
        isServing = serving;
    }
}

TSimpleServer

TSimpleServer works with the simplest blocking I/O. Its implementation is simple and easy to understand, but it can only receive and process one socket connection at a time, so it is inefficient. It is mainly used to demonstrate how Thrift works and is rarely used in actual development.

public void serve() {
    try {
      serverTransport_.listen(); // Start listening
    } catch (TTransportException ttx) {
      LOGGER.error("Error occurred during listening.", ttx);
      return;
    }

    // Run the preServe event
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }
    setServing(true);

    while (!stopped_) { // Single-threaded polling
      TTransport client = null;
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      ServerContext connectionContext = null;
      try {
        client = serverTransport_.accept(); // Accept a client connection
        if (client != null) {
          processor = processorFactory_.getProcessor(client);
          inputTransport = inputTransportFactory_.getTransport(client);
          outputTransport = outputTransportFactory_.getTransport(client);
          inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
          outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
          if (eventHandler_ != null) {
            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
          }
          while (true) {
            if (eventHandler_ != null) {
              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
            }
            // The Thrift-generated processor reads the request, invokes the
            // server-side implementation, and writes back the result
            processor.process(inputProtocol, outputProtocol);
          }
        }
      }
      // ... exception handling and cleanup omitted
    }
}
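A matching client for such a blocking server might look like the following sketch (the host, port, and the `HelloWorldService` from the Usage section are assumptions):

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class SimpleClientDemo {
    public static void main(String[] args) throws Exception {
        TTransport transport = new TSocket("localhost", 9090); // Blocking transport
        transport.open();
        try {
            TProtocol protocol = new TBinaryProtocol(transport); // Must match the server
            HelloWorldService.Client client = new HelloWorldService.Client(protocol);
            String reply = client.say("thrift"); // Synchronous RPC call
            System.out.println(reply);
        } finally {
            transport.close();
        }
    }
}
```

Because TSimpleServer handles one connection at a time, a second client started while the first holds its connection will simply wait.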

TThreadPoolServer

TThreadPoolServer works in blocking socket mode. The main thread blocks on accepting incoming sockets, while the specific business processing is handled by a thread pool.

  public void serve() {
    if (!preServe()) {
      return;
    }
    execute();
    waitForShutdown();
    setServing(false);
  }

  protected void execute() {
    int failureCount = 0;
    while (!stopped_) {
      try {
        TTransport client = serverTransport_.accept(); // Accept a client connection
        WorkerProcess wp = new WorkerProcess(client);

        int retryCount = 0;
        long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
        while (true) {
          try {
            // Each accepted connection is wrapped in a WorkerProcess (which
            // implements Runnable) and submitted to the thread pool
            executorService_.execute(wp);
            break;
          }
          // ... retry/rejection handling omitted
        }
      }
      // ... exception handling omitted
    }
  }
  • Advantages of the TThreadPoolServer mode

The thread that accepts client connections and the worker threads that process them are separated, and data reading and business processing are handed to the thread pool, so new connections can be accepted promptly under heavy concurrency.

The thread pool mode is best suited for scenarios where the server can predict the maximum number of concurrent clients, where each request can be processed in a timely manner by the business thread pool and performance is very high.

  • Disadvantages of the TThreadPoolServer pattern

The processing power of the thread pool mode is limited by the working capacity of the thread pool. When the number of concurrent requests exceeds the number of threads in the thread pool, new requests can only be queued.
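Sizing the pool to the expected concurrency is therefore the key tuning decision. A configuration sketch (the thread counts are illustrative, and `HelloWorldServiceImpl` is a hypothetical Iface implementation):

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;

public class ThreadPoolServerDemo {
    public static void main(String[] args) throws Exception {
        TServerSocket serverTransport = new TServerSocket(9090);
        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
                .processor(new HelloWorldService.Processor<>(new HelloWorldServiceImpl()))
                .protocolFactory(new TBinaryProtocol.Factory());
        serverArgs.minWorkerThreads(5);   // Bound the pool to the expected concurrency
        serverArgs.maxWorkerThreads(200); // Requests beyond this will queue
        TServer server = new TThreadPoolServer(serverArgs);
        server.serve();
    }
}
```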

TNonblockingServer

TNonblockingServer is also single-threaded, but it uses NIO: with the Channel/Selector mechanism, it handles requests through an I/O event model.

All sockets are registered with the Selector, and a single thread monitors all of them through a selector loop.

At the end of each selector loop, all ready sockets are processed: data is read from readable sockets and sent on writable sockets. For the listening socket, a new business socket is created and registered with the selector.

Note: TNonblockingServer requires that the underlying transport channel use TFramedTransport.
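A setup sketch for TNonblockingServer, with the required framed transport (the handler class `HelloWorldServiceImpl` is hypothetical):

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

public class NonblockingServerDemo {
    public static void main(String[] args) throws Exception {
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
        TServer server = new TNonblockingServer(new TNonblockingServer.Args(serverTransport)
                .processor(new HelloWorldService.Processor<>(new HelloWorldServiceImpl()))
                .transportFactory(new TFramedTransport.Factory()) // Framing is required
                .protocolFactory(new TBinaryProtocol.Factory()));
        server.serve();
    }
}
```

Clients of this server must likewise wrap their TSocket in a TFramedTransport, or the server will fail to decode the frames.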

  public void serve() {
    // start any IO threads
    if (!startThreads()) { // Start the I/O-processing threads
      return;
    }
    // start listening, or exit
    if (!startListening()) {
      return;
    }
    setServing(true);
    // this will block while we serve
    waitForShutdown();
    setServing(false);
    // do a little cleanup
    stopListening();
  }
public void run() {
  try {
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }

    while (!stopped_) {
      select(); // NIO select model
      processInterestChanges();
    }
    for (SelectionKey selectionKey : selector.keys()) {
      cleanupSelectionKey(selectionKey);
    }
  }
  // ... omitted
}

private void select() {
  try {
    // wait for io events.
    selector.select(); // Wait for I/O events
    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }
      // if the key is marked Accept, then it has to be the server transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  }
  // ... omitted
}
  • Advantages of TNonblockingServer mode

Compared with TSimpleServer, TNonblockingServer uses non-blocking I/O to monitor and handle accept, read, and write events, and can track state changes on multiple sockets at once.

  • Disadvantages of the TNonblockingServer pattern

TNonblockingServer still processes business logic sequentially in a single thread. When the processing is complex and time-consuming, for example when an interface method has to read a database and takes a long time to execute, the whole service is blocked. In that case this mode is not very efficient, because multiple invocation requests are still executed one after another.

THsHaServer

THsHaServer inherits from TNonblockingServer and introduces a thread pool to improve the concurrency of task processing. THsHaServer uses a half-sync/half-async processing mode: the half-async part handles I/O events (accept/read/write), and the half-sync part handles the synchronous RPC calls into the business handler.

Note: THsHaServer, like TNonblockingServer, requires the underlying transport channel to use TFramedTransport.

In other words, THsHaServer adds to TNonblockingServer the ability to process work tasks concurrently in a thread pool.

protected void handleRead(SelectionKey key) {
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  if (!buffer.read()) {
    cleanupSelectionKey(key);
    return;
  }

  // if the buffer's frame read is complete, invoke the method.
  if (buffer.isFrameFullyRead()) {
    // When AbstractNonblockingServer.java finishes a read, it calls the
    // requestInvoke method that THsHaServer overrides below
    if (!requestInvoke(buffer)) {
      cleanupSelectionKey(key);
    }
  }
}

protected boolean requestInvoke(FrameBuffer frameBuffer) {
  try {
    Runnable invocation = getRunnable(frameBuffer);
    invoker.execute(invocation); // Hand the invocation to the worker thread pool
    return true;
  } catch (RejectedExecutionException rx) {
    LOGGER.warn("ExecutorService rejected execution!", rx);
    return false;
  }
}
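A THsHaServer setup sketch (the worker-thread setter names vary slightly between Thrift versions, and the handler class `HelloWorldServiceImpl` is hypothetical):

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

public class HsHaServerDemo {
    public static void main(String[] args) throws Exception {
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport)
                .processor(new HelloWorldService.Processor<>(new HelloWorldServiceImpl()))
                .transportFactory(new TFramedTransport.Factory()) // Framing is required
                .protocolFactory(new TBinaryProtocol.Factory());
        serverArgs.minWorkerThreads(5);  // Worker pool that runs the handler calls
        serverArgs.maxWorkerThreads(20);
        TServer server = new THsHaServer(serverArgs);
        server.serve();
    }
}
```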
  • The advantages of THsHaServer

Compared with TNonblockingServer, THsHaServer hands business processing to a thread pool after the data is read, and the main thread returns directly to the next select loop, which greatly improves efficiency.

  • The disadvantage of THsHaServer

The main thread still has to perform all socket listening, reading, and writing itself. When the number of concurrent requests is large and the volume of data to send is large, new connection requests on the listening socket cannot be accepted promptly.

TThreadedSelectorServer

TThreadedSelectorServer is an extension of THsHaServer that moves the selector's read/write handling out of the main thread. It also keeps the worker thread pool, so it is likewise a half-sync/half-async service model.

The TThreadedSelectorServer model is currently the most advanced threading service model provided by Thrift. It consists of several components:

  • An AcceptThread object that listens for new connections on the socket.
  • Several SelectorThread objects that handle the network I/O reads and writes of the business sockets. All network data is read and written by these threads.
  • A SelectorThreadLoadBalancer object that, when AcceptThread receives a new socket connection request, decides which SelectorThread the new connection is assigned to.
  • A worker thread pool of type ExecutorService. The SelectorThread threads listen for calls arriving on the business sockets and read the request data, while the actual invocation is executed on the ExecutorService pool's threads. It is mainly used for the Handler callback processing of each RPC request (this part is synchronous).
  protected boolean startThreads() {
    try {
      for (int i = 0; i < args.selectorThreads; ++i) {
        // Network I/O is spread across several SelectorThreads, so reads and
        // writes stay fast even under heavy network I/O
        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
      }
      // A dedicated AcceptThread handles new connection requests, so large
      // numbers of concurrent connection attempts get a timely response
      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
        createSelectorThreadLoadBalancer(selectorThreads));
      for (SelectorThread thread : selectorThreads) {
        thread.start();
      }
      acceptThread.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start threads!", e);
      return false;
    }
  }
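A TThreadedSelectorServer setup sketch (the thread counts are illustrative, and the handler class `HelloWorldServiceImpl` is hypothetical):

```java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;

public class ThreadedSelectorServerDemo {
    public static void main(String[] args) throws Exception {
        TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
        TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport)
                .processor(new HelloWorldService.Processor<>(new HelloWorldServiceImpl()))
                .transportFactory(new TFramedTransport.Factory()) // Framing is required
                .protocolFactory(new TBinaryProtocol.Factory());
        serverArgs.selectorThreads(2);  // Threads doing network reads/writes
        serverArgs.workerThreads(10);   // Pool that executes the handler calls
        TServer server = new TThreadedSelectorServer(serverArgs);
        server.serve();
    }
}
```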

reference

Official Apache Thrift documentation

Apache Thrift series of tutorial blogs