Preface

Thrift provides several network service models: single-threaded, multi-threaded, and event-driven. From another perspective, they can be divided into blocking and non-blocking service models.

  • Blocking service models: TSimpleServer and TThreadPoolServer.

  • Non-blocking service models: TNonblockingServer, THsHaServer, and TThreadedSelectorServer.

TServer class hierarchy:


TServer

TServer defines the static inner class Args, which inherits from the abstract class AbstractServerArgs. AbstractServerArgs adopts the Builder pattern and provides TServer with various factories:

| Factory attribute | Factory type | Role |
| --- | --- | --- |
| processorFactory | TProcessorFactory | Processing-layer factory, used to create concrete TProcessor objects |
| inputTransportFactory | TTransportFactory | Transport-layer input factory, used to create concrete TTransport objects |
| outputTransportFactory | TTransportFactory | Transport-layer output factory, used to create concrete TTransport objects |
| inputProtocolFactory | TProtocolFactory | Protocol-layer input factory, used to create concrete TProtocol objects |
| outputProtocolFactory | TProtocolFactory | Protocol-layer output factory, used to create concrete TProtocol objects |

Here is some of the core code of TServer:

public abstract class TServer {

    public static class Args extends AbstractServerArgs<Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }

    public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            serverTransport = transport;
        }
    }

    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;

    protected TServer(AbstractServerArgs args) {
        processorFactory_ = args.processorFactory;
        serverTransport_ = args.serverTransport;
        inputTransportFactory_ = args.inputTransportFactory;
        outputTransportFactory_ = args.outputTransportFactory;
        inputProtocolFactory_ = args.inputProtocolFactory;
        outputProtocolFactory_ = args.outputProtocolFactory;
    }

    public abstract void serve();

    public void stop() {}

    public boolean isServing() {
        return isServing;
    }

    protected void setServing(boolean serving) {
        isServing = serving;
    }
}

TServer defines three methods: serve(), stop(), and isServing(). serve() starts the service, stop() shuts it down, and isServing() reports whether the service is currently running.

Different implementation classes of TServer start differently, so serve() is defined as an abstract method. Not all services need to exit gracefully, so the stop() method is not defined as abstract.


TSimpleServer

TSimpleServer uses the simplest blocking I/O model. Its implementation is simple and easy to understand, but it can only accept and process one socket connection at a time, which is inefficient. It is mainly used to demonstrate how Thrift works and is rarely used in real development.

(1) Workflow

(2) Usage

Server:

    ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor processor =
            new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();

    TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
    tArgs.processor(processor);
    tArgs.protocolFactory(protocolFactory);
    // A simple single-threaded service model is commonly used for testing
    TServer tServer = new TSimpleServer(tArgs);
    System.out.println("Running Simple Server");
    tServer.serve();

Client:

    TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("Leo");
    System.out.println("Result =: " + result);
    transport.close();

(3) Source code analysis

Let's look at the source code behind the above process: the serve() method in TSimpleServer.java.
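
In outline, it looks roughly like this. This is a simplified sketch of the general shape of the Apache Thrift Java implementation, not the verbatim source; error handling and event-handler hooks are abridged, and the fields (serverTransport_, processorFactory_, stopped_, etc.) follow the TServer code shown above:

    public void serve() {
        try {
            // 1. Start listening on the TServerSocket
            serverTransport_.listen();
        } catch (TTransportException ttx) {
            LOGGER.error("Error occurred during listening.", ttx);
            return;
        }
        setServing(true);

        while (!stopped_) {
            TTransport client = null;
            TTransport inputTransport = null;
            TTransport outputTransport = null;
            try {
                // 2. Block until a client connects; the accepted socket is wrapped in a TTransport
                client = serverTransport_.accept();
                if (client != null) {
                    // 3. Create the processor, transports, and protocols for this client
                    TProcessor processor = processorFactory_.getProcessor(client);
                    inputTransport = inputTransportFactory_.getTransport(client);
                    outputTransport = outputTransportFactory_.getTransport(client);
                    TProtocol inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
                    TProtocol outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
                    // 4. Process requests on this connection one after another until the client disconnects
                    while (processor.process(inputProtocol, outputProtocol)) { }
                }
            } catch (TTransportException ttx) {
                // Client disconnected; move on to the next connection
            } catch (TException tx) {
                LOGGER.error("Thrift error occurred during processing of message.", tx);
            } finally {
                if (inputTransport != null) { inputTransport.close(); }
                if (outputTransport != null) { outputTransport.close(); }
            }
        }
        setServing(false);
    }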

The serve() method does the following:

  1. Call the listen() method of TServerSocket to start listening for connections.
  2. Accept client connection requests in a blocking manner; for each incoming connection, a TTransport channel object is created.
  3. Create the processor object, input transport object, output transport object, input protocol object, and output protocol object for the client.
  4. Process the specific business request through the TServerEventHandler object.

TThreadPoolServer

TThreadPoolServer works in blocking socket mode. The main thread is responsible for blocking on accept for incoming sockets, while the concrete business processing is handed off to a thread pool.

(1) Workflow

(2) Usage

Server:

    ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor<HelloWorldService.Iface> processor =
            new HelloWorldService.Processor<>(new HelloWorldServiceImpl());

    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
    TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
    ttpsArgs.processor(processor);
    ttpsArgs.protocolFactory(protocolFactory);

    // The thread pool service model uses standard blocking IO to pre-create a set of threads to process requests
    TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
    System.out.println("Running ThreadPool Server");
    ttpsServer.serve();

Client:

    TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();
    String result = client.say("ThreadPoolClient");
    System.out.println("Result =: " + result);
    transport.close();

(3) Source code analysis

TThreadPoolServer addresses TSimpleServer's inability to support concurrency and multiple connections by introducing a thread pool. The model it implements is one thread per connection. To see the source code behind the above process, start with the thread pool creation snippet:
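
A sketch of how TThreadPoolServer builds its default ExecutorService is shown below. This is a simplified illustration, not the verbatim Thrift source; the parameter names minWorkerThreads and maxWorkerThreads follow older Thrift versions and should be treated as assumptions here:

    private static ExecutorService createDefaultExecutorService(Args args) {
        // A SynchronousQueue hands each accepted connection directly to a worker thread;
        // if no worker is free, the pool grows up to maxWorkerThreads
        SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<Runnable>();
        return new ThreadPoolExecutor(args.minWorkerThreads,
                                      args.maxWorkerThreads,
                                      60,
                                      TimeUnit.SECONDS,
                                      executorQueue);
    }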

The serve() method in TThreadPoolServer.java looks like this:
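
In sketch form (a simplified version of the general shape of the Thrift implementation; shutdown handling abridged, not the verbatim source):

    public void serve() {
        try {
            // 1. Start listening on the TServerSocket
            serverTransport_.listen();
        } catch (TTransportException ttx) {
            LOGGER.error("Error occurred during listening.", ttx);
            return;
        }
        setServing(true);

        while (!stopped_) {
            try {
                // 2. Block until a client connects, wrap the connection in a WorkerProcess,
                //    and hand it to the thread pool
                TTransport client = serverTransport_.accept();
                WorkerProcess wp = new WorkerProcess(client);
                executorService_.execute(wp);
            } catch (TTransportException ttx) {
                if (!stopped_) {
                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
                }
            }
        }

        executorService_.shutdown();
        // ... await termination of the worker threads (omitted)
        setServing(false);
    }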

The serve() method does the following:

  1. Call the listen() method of TServerSocket to start listening for connections.
  2. Accept client connection requests in a blocking manner. Each incoming connection channel is wrapped into a WorkerProcess object (WorkerProcess implements the Runnable interface) and submitted to the thread pool.
  3. The run() method of WorkerProcess is responsible for the business processing: it creates the processor object, input transport object, output transport object, input protocol object, and output protocol object for the client.
  4. Process the specific business request through the TServerEventHandler object.

WorkerProcess’s run() method:
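
In sketch form (a simplified version; error handling abridged, not the verbatim source):

    private class WorkerProcess implements Runnable {

        private final TTransport client_;

        private WorkerProcess(TTransport client) {
            client_ = client;
        }

        public void run() {
            TTransport inputTransport = null;
            TTransport outputTransport = null;
            try {
                // Create the processor, transports, and protocols for this client
                TProcessor processor = processorFactory_.getProcessor(client_);
                inputTransport = inputTransportFactory_.getTransport(client_);
                outputTransport = outputTransportFactory_.getTransport(client_);
                TProtocol inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
                TProtocol outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
                // Keep processing requests on this connection until the client closes it
                while (processor.process(inputProtocol, outputProtocol)) { }
            } catch (TTransportException ttx) {
                // Client disconnected; nothing to do
            } catch (Exception x) {
                LOGGER.error("Error occurred during processing of message.", x);
            } finally {
                if (inputTransport != null) { inputTransport.close(); }
                if (outputTransport != null) { outputTransport.close(); }
            }
        }
    }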

(4) Advantages and disadvantages

Advantages of the TThreadPoolServer mode

The accept thread and the worker threads that handle client connections are separated, and data reading and business processing are done by the thread pool. Therefore, new connections can still be accepted promptly under high concurrency.

The thread pool mode is best suited to scenarios where the maximum number of concurrent clients can be predicted in advance, so that each request can be handled promptly by the business thread pool, giving very high performance.

Disadvantages of the TThreadPoolServer pattern

The processing power of the thread pool mode is limited by the working capacity of the thread pool. When the number of concurrent requests exceeds the number of threads in the thread pool, new requests can only be queued.


TNonblockingServer

TNonblockingServer is also single-threaded, but it uses NIO: with the Channel/Selector mechanism, requests are handled based on the I/O event model.

All sockets are registered with the selector, and a single thread monitors them all through a selector loop.

At the end of each selector loop, all ready sockets are processed: data is read from sockets that have data available, data is sent on sockets with pending writes, and for the listening socket, a new business socket is accepted and registered with the selector.

Note: TNonblockingServer requires that the underlying transport channel use TFramedTransport.

(1) Workflow

(2) Usage

Server:

    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);

    TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
    tnbArgs.processor(tprocessor);
    tnbArgs.transportFactory(new TFramedTransport.Factory());
    tnbArgs.protocolFactory(new TCompactProtocol.Factory());

    // To use non-blocking IO servers and clients, you need to specify the TFramedTransport data transfer mode
    TServer server = new TNonblockingServer(tnbArgs);
    System.out.println("Running Non-blocking Server");
    server.serve();

Client:

    TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // The protocol must be consistent with that of the server
    TProtocol protocol = new TCompactProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("NonBlockingClient");
    System.out.println("Result =: " + result);
    transport.close();

(3) Source code analysis

TNonblockingServer inherits from AbstractNonblockingServer; here we are mainly concerned with the key code of the NIO selector part.
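
In sketch form, the selector loop inside TNonblockingServer's internal select-accept thread looks roughly like this (a simplified sketch, not the verbatim Thrift source; error handling abridged):

    private void select() {
        try {
            // Block until at least one registered channel is ready
            selector.select();

            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped_ && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    cleanupSelectionKey(key);
                    continue;
                }

                if (key.isAcceptable()) {
                    handleAccept();          // new connection on the listening socket
                } else if (key.isReadable()) {
                    handleRead(key);         // read request data into the connection's FrameBuffer
                } else if (key.isWritable()) {
                    handleWrite(key);        // flush response data back to the client
                }
            }
        } catch (IOException e) {
            LOGGER.warn("Got an IOException while selecting!", e);
        }
    }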

(4) Advantages and disadvantages

Advantages of TNonblockingServer mode

Compared with TSimpleServer, TNonblockingServer uses non-blocking I/O to monitor and process I/O events such as accept, read, and write, keeping track of the state changes of multiple sockets at the same time.

Disadvantages of the TNonblockingServer pattern

The TNonblockingServer pattern still processes business logic sequentially in a single thread. When the processing is complex and time-consuming, for example when an interface needs to read from a database and takes a long time to execute, the whole service is blocked. In this case the efficiency of this mode is not high, because multiple call requests are still executed one after another.

THsHaServer

THsHaServer inherits from TNonblockingServer and introduces a thread pool to improve the concurrency of task processing. THsHaServer uses a half-sync/half-async processing mode: the asynchronous half handles I/O events (accept/read/write), and the synchronous half is the service handler's synchronous processing of the RPC request.

Note: like TNonblockingServer, THsHaServer requires the underlying transport channel to use TFramedTransport.

(1) Workflow

(2) Usage

Server:

    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // Half synchronous and half asynchronous
    THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
    thhsArgs.processor(tprocessor);
    thhsArgs.transportFactory(new TFramedTransport.Factory());
    thhsArgs.protocolFactory(new TBinaryProtocol.Factory());

    TServer server = new THsHaServer(thhsArgs);
    System.out.println("Running HsHa Server");
    server.serve();

Client:

    TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // The protocol must be consistent with that of the server
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("HsHaClient");
    System.out.println("Result =: " + result);
    transport.close();

(3) Source code analysis

THsHaServer inherits TNonblockingServer and adds the ability for thread pools to concurrently process work tasks.

Task thread pool creation process:
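
A sketch of how THsHaServer typically builds this invoker pool is shown below. This is a simplified illustration, not the verbatim source; the parameter names (minWorkerThreads, maxWorkerThreads) vary between Thrift versions and are assumptions here:

    protected static ExecutorService createInvokerPool(Args options) {
        int minWorkerThreads = options.minWorkerThreads;
        int maxWorkerThreads = options.maxWorkerThreads;

        // An unbounded queue: requests whose data has been fully read wait here
        // until a worker thread is free to invoke the handler
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        return new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
                                      60, TimeUnit.SECONDS, queue);
    }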

TThreadedSelectorServer, covered below, includes most of the features of THsHaServer; for the source code analysis, refer to the TThreadedSelectorServer section.

(4) Advantages and disadvantages

The advantages of THsHaServer

Compared with TNonblockingServer, THsHaServer hands business processing to a thread pool after reading the data, and the main thread immediately returns to the next selector loop, greatly improving efficiency.

The disadvantage of THsHaServer

The main thread still has to perform all socket listening, reading, and writing itself. When there are many concurrent requests and large amounts of data to send, new connection requests on the listening socket may not be accepted in time.


TThreadedSelectorServer

TThreadedSelectorServer is an extension of THsHaServer that moves the selector's read/write handling out of the main thread. It also introduces a worker thread pool, and it too is a half-sync/half-async service model.

The TThreadedSelectorServer model is currently the most advanced threading service model provided by Thrift. It consists of several components:

  1. An AcceptThread thread object, which handles new connections on the listening socket.
  2. A number of SelectorThread objects, dedicated to the network I/O read/write operations on the business sockets; all network data reads and writes are performed by these threads.
  3. A load balancer, the SelectorThreadLoadBalancer object, which decides which SelectorThread a new connection is handed to when the AcceptThread receives a new socket connection request.
  4. A worker thread pool of type ExecutorService. When a SelectorThread detects a call request on a business socket and finishes reading the request data, it hands the request to a thread in the ExecutorService pool to actually execute the call. The pool is mainly responsible for the handler callback processing of each RPC request (this part is synchronous).

(1) Workflow

(2) Usage

Server:

    TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // Multithreading is half synchronous and half asynchronous
    TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
    ttssArgs.processor(processor);
    ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
    // When using non-blocking IO, both the server and client need to specify TFramedTransport
    ttssArgs.transportFactory(new TFramedTransport.Factory());

    // Multi-threaded semi-synchronous and semi-asynchronous service model
    TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
    System.out.println("Running ThreadedSelector Server");
    server.serve();

Client:

for (int i = 0; i < 10; i++) {
    new Thread("Thread " + i) {
        @Override
        public void run() {
            // For non-blocking servers, use TFramedTransport (data is sent in frames)
            for (int j = 0; j < 10; j++) {
                TTransport transport = null;
                try {
                    transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
                    TProtocol protocol = new TBinaryProtocol(transport);
                    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
                    transport.open();
                    String result = client.say("ThreadedSelector Client");
                    System.out.println("Result =: " + result);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Close the channel
                    if (transport != null) {
                        transport.close();
                    }
                }
            }
        }
    }.start();
}

(3) Core code

The AcceptThread, SelectorThread, and ExecutorService components of the workflow are defined in the source code as follows:
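
In outline, the components look roughly like this inside TThreadedSelectorServer (a simplified sketch with comments added; declarations abridged, not the verbatim source):

    public class TThreadedSelectorServer extends AbstractNonblockingServer {

        // The single thread that accepts new connections on the listening socket
        private AcceptThread acceptThread;

        // The selector threads that perform all network I/O on the accepted connections
        private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();

        // The worker thread pool that executes the actual RPC handler invocations
        private final ExecutorService invoker;

        // ...
    }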

The TThreadedSelectorServer pattern has a dedicated AcceptThread for new connection requests, so it can respond promptly to a large number of concurrent connection requests. In addition, it spreads the network I/O operations across multiple SelectorThreads, so network data can be read and written quickly, which copes well with heavy network I/O.

The TThreadedSelectorServer default parameters are defined as follows:

  • The default number of selector threads (selectorThreads) for network I/O reads and writes is 2.
  • The default number of worker threads (workerThreads) for business processing is 5.
  • The default size of each selector thread's accepted-connection queue (acceptQueueSizePerThread) is 4 (see the sketch after this list).
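
These defaults correspond roughly to the following fields in TThreadedSelectorServer.Args (a sketch; exact modifiers and the surrounding code are omitted, so treat this as illustrative rather than the verbatim source):

    public static class Args extends AbstractNonblockingServerArgs<Args> {
        // Number of selector threads performing network I/O reads and writes
        public int selectorThreads = 2;
        // Number of worker threads executing the RPC handlers
        private int workerThreads = 5;
        // Size of each selector thread's queue of accepted connections
        private int acceptQueueSizePerThread = 4;
        // ...
    }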

The server creates, initializes, and starts the AcceptThread and the SelectorThreads, along with a load balancer (SelectorThreadLoadBalancer) for the selector threads.
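
In sketch form, this start-up step (the startThreads() method) looks roughly like this (abridged, not the verbatim source):

    @Override
    protected boolean startThreads() {
        try {
            // Create one SelectorThread per configured selector thread,
            // each with its own bounded queue of accepted connections
            for (int i = 0; i < args.selectorThreads; ++i) {
                selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
            }
            // The AcceptThread gets a load balancer over the selector threads
            acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
                    createSelectorThreadLoadBalancer(selectorThreads));
            for (SelectorThread thread : selectorThreads) {
                thread.start();
            }
            acceptThread.start();
            return true;
        } catch (IOException e) {
            LOGGER.error("Failed to start threads!", e);
            return false;
        }
    }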

AcceptThread source

AcceptThread inherits from Thread and contains three important attributes: the non-blocking server transport channel (TNonblockingServerTransport), the NIO selector (acceptSelector), and the selector thread load balancer (threadChooser).

Looking at the run() method of AcceptThread, we can see that once started, the accept thread calls the select() method in a loop:
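
In sketch form (abridged, not the verbatim source):

    public void run() {
        try {
            if (eventHandler_ != null) {
                eventHandler_.preServe();
            }
            // Keep selecting until the server is asked to stop
            while (!stopped_) {
                select();
            }
        } catch (Throwable t) {
            LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
        } finally {
            // Shut the server down so the other threads can exit too
            TThreadedSelectorServer.this.stop();
        }
    }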

Look at the select() method. The acceptSelector waits for an I/O event to arrive, takes each SelectionKey, and checks whether it is an accept event. If it is, the new connection is received through the handleAccept() method; the I/O read and write events on that connection are not handled here but are handed over to a SelectorThread.
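
A simplified sketch of select() (abridged, not the verbatim source):

    private void select() {
        try {
            // Wait for a new connection to arrive on the listening socket
            acceptSelector.select();

            Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
            while (!stopped_ && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    // A new connection: accept it and hand it to a SelectorThread
                    handleAccept();
                } else {
                    LOGGER.warn("Unexpected state in select! " + key.interestOps());
                }
            }
        } catch (IOException e) {
            LOGGER.warn("Got an IOException while selecting!", e);
        }
    }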

In the handleAccept() method, the connection channel is first obtained through doAccept(), and then the selector thread load balancer chooses a SelectorThread to handle the subsequent I/O read and write events.
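
In sketch form (the real method also applies the configured accept policy, which is omitted here; not the verbatim source):

    private void handleAccept() {
        final TNonblockingTransport client = doAccept();   // accept the new connection channel
        if (client != null) {
            // Ask the load balancer which selector thread should own this connection
            final SelectorThread targetThread = threadChooser.nextThread();
            doAddAccept(targetThread, client);
        }
    }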

Moving on to the implementation of the doAddAccept() method, which, unsurprisingly, further calls the addAcceptedConnection() method of SelectorThread to pass the non-blocking transport channel object to the SelectorThread for further I/O reads and writes.
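
In sketch form:

    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
        // Hand the accepted channel to the chosen SelectorThread;
        // if it cannot be queued, drop the connection
        if (!thread.addAcceptedConnection(client)) {
            client.close();
        }
    }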

SelectorThreadLoadBalancer source

How is the SelectorThreadLoadBalancer created?

SelectorThreadLoadBalancer uses a round-robin algorithm: through an iterator over the selector threads, newly accepted connections are assigned to the SelectorThreads in order.
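
A simplified sketch of the round-robin load balancer (abridged, not the verbatim source):

    protected static class SelectorThreadLoadBalancer {
        private final Collection<? extends SelectorThread> threads;
        private Iterator<? extends SelectorThread> nextThreadIterator;

        public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
            if (threads.isEmpty()) {
                throw new IllegalArgumentException("At least one selector thread is required");
            }
            this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
            nextThreadIterator = this.threads.iterator();
        }

        public SelectorThread nextThread() {
            // Round-robin: restart the iterator once all threads have been handed out
            if (!nextThreadIterator.hasNext()) {
                nextThreadIterator = threads.iterator();
            }
            return nextThreadIterator.next();
        }
    }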

SelectorThread source

A SelectorThread, like an AcceptThread, is an internal member class of TThreadedSelectorServer. Each SelectorThread object has a blocking queue that holds the connection channels received by the thread.

The size of the blocking queue can be specified by the constructor:
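
In sketch form (abridged, not the verbatim source):

    // Connections accepted by the AcceptThread wait here until this SelectorThread registers them
    private final BlockingQueue<TNonblockingTransport> acceptedQueue;

    public SelectorThread(int maxPendingAccepts) throws IOException {
        this(createDefaultAcceptQueue(maxPendingAccepts));
    }

    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
        this.acceptedQueue = acceptedQueue;
    }

    private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
        if (queueSize == 0) {
            // Unbounded queue
            return new LinkedBlockingQueue<TNonblockingTransport>();
        }
        return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
    }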

As you can see above, the addAcceptedConnection() method of SelectorThread is called in the doAddAccept() method of AcceptThread.

This method does two things (sketched after the following list):

  1. Put the connection channel received by the SelectorThread into the blocking queue.
  2. Wake up the NIO selector in the SelectorThread via wakeup().
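
A sketch of addAcceptedConnection() (abridged, not the verbatim source):

    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
        try {
            // 1. Queue the accepted connection channel for this selector thread
            acceptedQueue.put(accepted);
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while adding accepted connection!", e);
            return false;
        }
        // 2. Wake the selector so the new connection gets registered promptly
        selector.wakeup();
        return true;
    }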

Since SelectorThread also descends from Thread, look at the implementation of its run() method:
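
In sketch form (shutdown cleanup abridged, not the verbatim source):

    public void run() {
        try {
            while (!stopped_) {
                select();                      // handle read/write events on registered connections
                processAcceptedConnections();  // register newly accepted connections
                processInterestChanges();      // apply pending read/write interest changes
            }
            // On shutdown, clean up all remaining selection keys (omitted)
        } catch (Throwable t) {
            LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
        }
    }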

The select() method of SelectorThread listens for I/O events and only handles data reads and writes. If a connection has data to read, it is read and cached frame by frame; if data needs to be written to a connection, the cached response data is sent to the client. Once a connection's read/write processing is finished, its SelectionKey needs to be cleaned up and deregistered from the NIO selector.

  • When the data write operation is done, the whole RPC call is over. The handleWrite() method is sketched after this list.

  • When the data read operation is done, Thrift uses the data that was read to invoke the target method. The handleRead() method is sketched after this list.
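
Both methods are defined on the abstract selector thread in AbstractNonblockingServer; in sketch form (abridged, not the verbatim source):

    protected void handleWrite(SelectionKey key) {
        FrameBuffer buffer = (FrameBuffer) key.attachment();
        // Flush buffered response bytes; if the write fails, tear the connection down
        if (!buffer.write()) {
            cleanupSelectionKey(key);
        }
    }

    protected void handleRead(SelectionKey key) {
        FrameBuffer buffer = (FrameBuffer) key.attachment();
        // Read available bytes into the frame buffer; on failure, tear the connection down
        if (!buffer.read()) {
            cleanupSelectionKey(key);
            return;
        }
        // Once a complete frame has been read, hand it off for invocation
        if (buffer.isFrameFullyRead()) {
            if (!requestInvoke(buffer)) {
                cleanupSelectionKey(key);
            }
        }
    }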

After the handleRead() method executes read() and completes the data reading, the requestInvoke() method invokes the target method to carry out the actual business processing. The requestInvoke() method wraps the request data in a Runnable object and submits it to the ExecutorService worker thread pool for processing.
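
A sketch of requestInvoke() in TThreadedSelectorServer (abridged, not the verbatim source):

    @Override
    protected boolean requestInvoke(FrameBuffer frameBuffer) {
        // Wrap the fully-read request in a Runnable invocation
        Runnable invocation = getRunnable(frameBuffer);
        if (invoker != null) {
            try {
                // Hand the invocation to the worker thread pool
                invoker.execute(invocation);
                return true;
            } catch (RejectedExecutionException rx) {
                LOGGER.warn("ExecutorService rejected execution!", rx);
                return false;
            }
        }
        // No worker pool configured: invoke directly on the selector thread
        invocation.run();
        return true;
    }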

After the select() method completes, the selector thread continues by running the processAcceptedConnections() method to handle the I/O events of newly accepted connections.
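
In sketch form:

    private void processAcceptedConnections() {
        // Register every connection currently waiting in the accepted queue
        while (!stopped_) {
            TNonblockingTransport accepted = acceptedQueue.poll();
            if (accepted == null) {
                break;   // queue drained; go back to selecting
            }
            registerAccepted(accepted);
        }
    }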

Here are a few core operations:

  1. Try to take a connected transport channel from the SelectorThread's blocking queue acceptedQueue. If one is obtained, call the registerAccepted() method; otherwise, continue with the next loop iteration.
  2. The registerAccepted() method registers the underlying connection of the transport channel with the NIO selector, obtaining a SelectionKey.
  3. A FrameBuffer object is created and attached to the obtained SelectionKey, to be used as the read/write cache during data transfer (a sketch of registerAccepted() follows this list).
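
A sketch of registerAccepted() (abridged; the exact FrameBuffer construction differs between Thrift versions, so treat the constructor call as an assumption):

    private void registerAccepted(TNonblockingTransport accepted) {
        SelectionKey clientKey = null;
        try {
            // Register the underlying connection with this thread's NIO selector for reads
            clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

            // Create a FrameBuffer as the read/write cache and attach it to the key
            FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey);
            clientKey.attach(frameBuffer);
        } catch (IOException e) {
            LOGGER.warn("Failed to register accepted connection to selector!", e);
            if (clientKey != null) {
                cleanupSelectionKey(clientKey);
            }
            accepted.close();
        }
    }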

Conclusion

This article introduced Thrift's various threaded service models, including two blocking service models (TSimpleServer and TThreadPoolServer) and three non-blocking service models (TNonblockingServer, THsHaServer, and TThreadedSelectorServer), and analyzed the usage, workflow, principles, and source code implementation of each to some extent.

This is a long article, so take your time reading it!

Related links

  1. Apache Thrift Series Details (I) – Overview and Introduction

  2. Apache Thrift Series Details (II) – Network Service Model

  3. Apache Thrift – Serialization mechanism


Welcome to follow the technical WeChat public account: Zero One Technology Stack

This account will continue to share learning materials and articles on back-end technologies, including virtual machine basics, multithreaded programming, high-performance frameworks, asynchronous programming, caching and messaging middleware, distributed systems and microservices, and architecture learning and growth.