Preface
Thrift provides several network service models: single-threaded, multi-threaded, and event-driven. Viewed from another angle, they fall into blocking and non-blocking service models.
- Blocking service models: TSimpleServer, TThreadPoolServer.
- Non-blocking service models: TNonblockingServer, THsHaServer, and TThreadedSelectorServer.
[Figure: TServer class hierarchy]
TServer
TServer defines the static inner class Args, which inherits from the abstract class AbstractServerArgs. AbstractServerArgs uses the Builder pattern and supplies TServer with the following factories:
Factory attribute | Factory type | Role |
---|---|---|
ProcessorFactory | TProcessorFactory | Processing-layer factory, used to create concrete TProcessor objects |
InputTransportFactory | TTransportFactory | Transport-layer input factory, used to create concrete TTransport objects |
OutputTransportFactory | TTransportFactory | Transport-layer output factory, used to create concrete TTransport objects |
InputProtocolFactory | TProtocolFactory | Protocol-layer input factory, used to create concrete TProtocol objects |
OutputProtocolFactory | TProtocolFactory | Protocol-layer output factory, used to create concrete TProtocol objects |
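The declaration `Args extends AbstractServerArgs<Args>` is a self-typed builder: setters declared in the base class return the concrete subclass type, so calls can be chained across both levels. The sketch below illustrates the idea with plain JDK code. `BaseArgs`, `PoolArgs`, and the string-valued options are hypothetical stand-ins, not Thrift classes.

```java
public class SelfTypedArgsSketch {
    /** Hypothetical stand-in for AbstractServerArgs: T is the concrete subclass. */
    public abstract static class BaseArgs<T extends BaseArgs<T>> {
        String protocol = "binary";   // defaults, analogous to the default factories
        String transport = "plain";

        @SuppressWarnings("unchecked")
        public T protocol(String protocol) {
            this.protocol = protocol;
            return (T) this;          // safe as long as each subclass passes itself as T
        }

        @SuppressWarnings("unchecked")
        public T transport(String transport) {
            this.transport = transport;
            return (T) this;
        }
    }

    /** Hypothetical concrete args class with one extra option. */
    public static class PoolArgs extends BaseArgs<PoolArgs> {
        int workers = 5;

        public PoolArgs workers(int workers) {
            this.workers = workers;
            return this;
        }
    }

    /** Renders the configured options as "protocol/transport/workers". */
    public static String describe(PoolArgs a) {
        return a.protocol + "/" + a.transport + "/" + a.workers;
    }

    public static void main(String[] args) {
        // The base-class setters return PoolArgs, so workers() can follow them in the chain.
        PoolArgs a = new PoolArgs().protocol("compact").transport("framed").workers(8);
        System.out.println(describe(a));
    }
}
```

Without the type parameter, `protocol(...)` would return the base type and the chained call to `workers(...)` would not compile.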
Here is some of the core code of TServer:
public abstract class TServer {
    public static class Args extends AbstractServerArgs<Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }

    public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            serverTransport = transport;
        }
    }

    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;

    // Copy the factories configured on the args object into the server
    protected TServer(AbstractServerArgs args) {
        processorFactory_ = args.processorFactory;
        serverTransport_ = args.serverTransport;
        inputTransportFactory_ = args.inputTransportFactory;
        outputTransportFactory_ = args.outputTransportFactory;
        inputProtocolFactory_ = args.inputProtocolFactory;
        outputProtocolFactory_ = args.outputProtocolFactory;
    }

    public abstract void serve();

    public void stop() {}

    public boolean isServing() {
        return isServing;
    }

    protected void setServing(boolean serving) {
        isServing = serving;
    }
}
TServer has three methods: serve(), stop(), and isServing(). serve() starts the service, stop() shuts it down, and isServing() reports whether the service is currently running.
Each TServer implementation starts up differently, so serve() is declared abstract. Not every server needs a graceful shutdown, so stop() is not abstract and defaults to an empty implementation.
TSimpleServer
TSimpleServer uses the simplest blocking I/O model. The implementation is straightforward and easy to understand, but it can accept and process only one socket connection at a time, which is inefficient. It is mainly used to demonstrate how Thrift works and is rarely used in real development.
(1) Workflow
(2) Usage
Server:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
    new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// A simple single-threaded service model is commonly used for testing
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
Client:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
(3) Source code analysis
The process above corresponds to the serve() method in TSimpleServer.java.
The serve() method performs the following steps:
- Call the listen() method of TServerSocket to start listening for connections.
- Accept client connection requests in a blocking manner. For each incoming connection, create a TTransport channel object for it.
- Create the processor object, input transport channel object, output transport channel object, input protocol object, and output protocol object for the client.
- Process the business requests through the processor, invoking the TServerEventHandler callbacks if an event handler is set.
TThreadPoolServer
TThreadPoolServer also works with blocking sockets. The main thread blocks waiting for incoming connections, and the actual business processing is handed to a thread pool.
(1) Workflow
(2) Usage
Server:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// The thread pool service model uses standard blocking IO to pre-create a set of threads to process requests
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
Client:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
(3) Source code analysis
TThreadPoolServer addresses TSimpleServer's inability to support concurrency and multiple connections by introducing a thread pool. The model it implements is One Thread Per Connection. In the source, the relevant parts are the creation of the thread pool and the serve() method in TThreadPoolServer.java.
The serve() method performs the following steps:
- Call the listen() method of TServerSocket to start listening for connections.
- Accept client connection requests in a blocking manner. For each incoming connection, wrap the channel object in a WorkerProcess object (WorkerProcess implements the Runnable interface) and submit it to the thread pool.
- The run() method of WorkerProcess is responsible for business processing. It creates the processor object, input transport channel object, output transport channel object, input protocol object, and output protocol object for the client.
- Process the business requests through the processor, invoking the TServerEventHandler callbacks if an event handler is set.
WorkerProcess’s run() method:
(4) Advantages and disadvantages
Advantages of the TThreadPoolServer mode
The accept thread is separated from the worker threads that serve client connections, and both data reading and business processing are done by the thread pool. New connections can therefore be accepted promptly even under heavy concurrency.
The thread pool mode suits scenarios where the server can predict the maximum number of concurrent clients. In that case every request is picked up promptly by the worker pool and performance is very high.
Disadvantages of the TThreadPoolServer mode
The processing capacity of this mode is bounded by the capacity of the thread pool. When the number of concurrent requests exceeds the number of threads in the pool, new requests have to wait in a queue.
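The limit described above can be observed with a small experiment that does not involve Thrift at all. This is a minimal sketch under stated assumptions: `Executors.newFixedThreadPool` stands in for the server's worker pool (the executor Thrift actually configures may differ), each task plays the role of one WorkerProcess, and the names `PoolSaturationSketch` and `maxConcurrent` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSaturationSketch {
    /**
     * Runs `connections` simulated connections on a pool of `poolSize` threads and
     * returns the highest number that were being handled at the same moment.
     */
    public static int maxConcurrent(int poolSize, int connections) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(connections);
        try {
            for (int i = 0; i < connections; i++) {
                // Each task stands in for a WorkerProcess bound to one connection.
                pool.execute(() -> {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(50);   // simulated blocking read plus business logic
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                        done.countDown();
                    }
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Six connections, two workers: the remaining tasks wait in the executor's queue.
        System.out.println("peak concurrency: " + maxConcurrent(2, 6));
    }
}
```

The peak never exceeds the pool size, however many connections are submitted. The extra tasks are not lost, they simply wait their turn.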
TNonblockingServer
TNonblockingServer is also single-threaded, but it uses NIO: with the Channel/Selector mechanism, I/O is handled through an event model.
All sockets are registered with the selector, and a single thread monitors all of them through a selector loop.
At the end of each selector loop iteration, all ready sockets are processed: data is read from sockets that have incoming data, and data is sent on sockets that are ready for writing. For the listening socket, a new business socket is created and registered with the selector.
Note: TNonblockingServer requires the underlying transport channel to be TFramedTransport.
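Framing is what lets a selector thread tell whether a complete message has arrived without blocking on a read. TFramedTransport prefixes each message with its size as a 4-byte big-endian integer. The sketch below shows that idea with plain `ByteBuffer` code; `FrameSketch`, `encode`, and `tryDecode` are hypothetical names, and this is not the buffer handling Thrift itself uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameSketch {
    /** Prefixes the payload with its length as a 4-byte big-endian integer. */
    public static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Returns the payload if the first `available` bytes of `received` hold a
     * complete frame, or null if more bytes must arrive first.
     */
    public static byte[] tryDecode(byte[] received, int available) {
        if (available < 4) {
            return null;                                   // length prefix incomplete
        }
        int size = ByteBuffer.wrap(received, 0, 4).getInt();
        if (size < 0) {
            throw new IllegalArgumentException("negative frame size: " + size);
        }
        if (available < 4 + size) {
            return null;                                   // body incomplete
        }
        byte[] payload = new byte[size];
        System.arraycopy(received, 4, payload, 0, size);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        // Simulate a partial read followed by the rest of the bytes.
        System.out.println("after 6 bytes: " + (tryDecode(frame, 6) == null ? "incomplete" : "complete"));
        System.out.println("after all bytes: " + new String(tryDecode(frame, frame.length), StandardCharsets.UTF_8));
    }
}
```

A server loop built this way can return to the selector whenever `tryDecode` yields null and resume when more bytes are readable.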
(1) Workflow
(2) Usage
Server:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// To use non-blocking IO servers and clients, you need to specify the TFramedTransport data transfer mode
TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
Client:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// The protocol must be consistent with that of the server
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
(3) Source code analysis
TNonblockingServer inherits from AbstractNonblockingServer. The part of most interest here is the key code built around the NIO selector.
(4) Advantages and disadvantages
Advantages of the TNonblockingServer mode
Compared with TSimpleServer, TNonblockingServer uses non-blocking I/O to monitor and handle accept, read, and write events, and it can watch the state changes of many sockets at the same time.
Disadvantages of the TNonblockingServer mode
Business processing is still done sequentially on a single thread. When the processing is complex and slow, for example when an interface method has to query a database and takes a long time to run, the whole service is blocked. In that case this mode is inefficient, because the invocation requests are still executed one after another.
THsHaServer
THsHaServer inherits from TNonblockingServer and introduces a thread pool to improve the concurrency of task processing. THsHaServer follows the half-sync/half-async pattern: the half-async part handles I/O events (accept/read/write), and the half-sync part is the synchronous handling of RPC calls by the service handlers.
Note: THsHaServer, like TNonblockingServer, requires the underlying transport channel to be TFramedTransport.
(1) Workflow
(2) Usage
Server:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// Half synchronous and half asynchronous
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
Client:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// The protocol must be consistent with that of the server
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
(3) Source code analysis
THsHaServer inherits from TNonblockingServer and adds a thread pool that processes work tasks concurrently.
Task thread pool creation process:
TThreadedSelectorServer, covered below, includes most of the features of THsHaServer, so refer to the TThreadedSelectorServer section for the source code analysis.
(4) Advantages and disadvantages
Advantages of THsHaServer
Compared with TNonblockingServer, THsHaServer hands business processing to a thread pool once the data has been read, and the main thread goes straight back to the next loop iteration, which greatly improves efficiency.
Disadvantages of THsHaServer
The main thread still has to perform all socket listening, reading, and writing. When there are many concurrent requests and a large amount of data to send, new connection requests on the listening socket cannot be accepted in time.
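The hand-off that separates THsHaServer from TNonblockingServer can be sketched with a plain `ExecutorService`. The example is illustrative only: `HandOffSketch` and `callerReturnsFirst` are hypothetical names, and the latch-controlled task stands in for a slow service handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {
    /**
     * Submits a slow handler to a worker pool and reports whether the submitting
     * thread regained control before the handler finished.
     */
    public static boolean callerReturnsFirst() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            // Stand-in for the step that runs once a full frame has been read.
            workers.execute(() -> {
                try {
                    release.await();      // the handler stays "slow" until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.countDown();
            });

            // Back on the I/O thread: the handler is still running, yet we are free to continue.
            boolean returnedFirst = finished.getCount() == 1;

            release.countDown();
            boolean completed = finished.await(5, TimeUnit.SECONDS);
            return returnedFirst && completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("I/O thread returned before the handler finished: " + callerReturnsFirst());
    }
}
```

With a single-threaded model the submitting thread would run the handler itself and could not get back to its selector loop until the handler returned.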
TThreadedSelectorServer
TThreadedSelectorServer extends the THsHaServer design by moving the selector's read/write events off the main thread. It also uses a worker thread pool, so it too is a half-sync/half-async service model.
TThreadedSelectorServer is currently the most advanced threading service model provided by Thrift. It consists of the following components:
- One AcceptThread object, dedicated to handling new connections on the listening socket.
- Several SelectorThread objects, dedicated to the network I/O reads and writes of the business sockets. All network data reads and writes are done by these threads.
- One load balancer, a SelectorThreadLoadBalancer object. When the AcceptThread receives a new socket connection request, the load balancer decides which SelectorThread the new connection is handed to.
- One worker thread pool of type ExecutorService. When a SelectorThread detects an incoming call request on a business socket, it reads the request data and then hands it to a thread in the ExecutorService pool, which performs the actual call. The pool mainly runs the handler callback for each RPC request (this part is synchronous).
(1) Workflow
(2) Usage
Server:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// Multithreading is half synchronous and half asynchronous
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// When using non-blocking IO, both the server and client need to specify TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// Multi-threaded semi-synchronous and semi-asynchronous service model
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
Client:
for (int i = 0; i < 10; i++) {
    new Thread("Thread " + i) {
        @Override
        public void run() {
            // Non-blocking servers require TFramedTransport (data is sent frame by frame)
            for (int j = 0; j < 10; j++) {
                TTransport transport = null;
                try {
                    transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
                    TProtocol protocol = new TBinaryProtocol(transport);
                    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
                    transport.open();
                    String result = client.say("ThreadedSelector Client");
                    System.out.println("Result =: " + result);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Close the channel exactly once, and only if it was created
                    if (transport != null) {
                        transport.close();
                    }
                }
            }
        }
    }.start();
}
(3) Core code
The AcceptThread, SelectorThread, and ExecutorService components of the workflow are defined in the source code as follows:
TThreadedSelectorServer has a dedicated AcceptThread for new connection requests, so it can respond promptly to a large number of concurrent connection requests. It also spreads network I/O across several SelectorThreads, so reads and writes complete quickly and the server copes well with heavy network I/O.
The default parameters of TThreadedSelectorServer are defined as follows:
- The default number of selector threads (selectorThreads) for network I/O reads and writes is 2
- The default number of worker threads (workerThreads) for business processing is 5
- The size of each selector thread's queue of accepted connections (acceptQueueSizePerThread) is 4
On startup, the server creates, initializes, and starts the AcceptThread and the SelectorThreads, and creates the SelectorThreadLoadBalancer that distributes connections among the selector threads.
AcceptThread source
AcceptThread extends Thread and has three important attributes: the non-blocking server transport (TNonblockingServerTransport), the NIO selector (acceptSelector), and the selector thread load balancer (threadChooser).
Looking at the run() method of AcceptThread, we can see that once the accept thread is started it calls the select() method in a loop:
Now look at the select() method. The acceptSelector waits for an I/O event to arrive, takes the SelectionKey, and checks whether it is an accept event. If so, the new connection is received through the handleAccept() method. Otherwise, if the AcceptThread sees an I/O read or write event, it hands the request over to a SelectorThread.
In the handleAccept() method, the connection channel is first obtained through doAccept(), and then the selector thread load balancer chooses a SelectorThread to handle the subsequent I/O read and write events.
Next comes the implementation of doAddAccept(). As expected, it calls the addAcceptedConnection() method of SelectorThread, passing the non-blocking transport channel object to the selector thread for further I/O reads and writes.
SelectorThreadLoadBalancer source
How is SelectorThreadLoadBalancer created?
SelectorThreadLoadBalancer is a round-robin load balancer over the selector threads. It walks the thread collection with an iterator and assigns each new connection to the next SelectorThread in order.
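A round-robin chooser driven by an iterator fits in a few lines. The sketch below is a generic illustration, not Thrift's class: `RoundRobinSketch`, `RoundRobin`, and `nextWorker` are hypothetical names, and strings stand in for selector threads.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;

public class RoundRobinSketch {
    /** Hypothetical stand-in for a selector thread load balancer, generic over the worker type. */
    public static class RoundRobin<T> {
        private final Collection<T> workers;
        private Iterator<T> next;

        public RoundRobin(Collection<T> workers) {
            if (workers.isEmpty()) {
                throw new IllegalArgumentException("need at least one worker");
            }
            this.workers = workers;
            this.next = workers.iterator();
        }

        /** Returns the next worker, starting over when the end of the collection is reached. */
        public T nextWorker() {
            if (!next.hasNext()) {
                next = workers.iterator();
            }
            return next.next();
        }
    }

    public static void main(String[] args) {
        RoundRobin<String> balancer = new RoundRobin<>(Arrays.asList("selector-0", "selector-1"));
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i + " -> " + balancer.nextWorker());
        }
    }
}
```

The sketch is not thread-safe. That is acceptable when a single accept thread is the only caller, which is the situation the text describes.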
SelectorThread source
SelectorThread, like AcceptThread, is an inner member class of TThreadedSelectorServer. Each SelectorThread object has a blocking queue that holds the connection channels handed to that thread.
The size of the blocking queue can be specified through the constructor:
As shown above, the doAddAccept() method of AcceptThread calls the addAcceptedConnection() method of SelectorThread.
This method does two things:
- It puts the connection channel received by the SelectorThread into the blocking queue.
- It wakes up the NIO selector in the SelectorThread through the wakeup() method.
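The two steps above, enqueue then wake up, can be reproduced with a `java.nio` selector and a bounded queue. This is a minimal sketch, assuming strings in place of transport objects. The class `AcceptedQueueSketch` and the method `selectOnce` are hypothetical; only the method name `addAcceptedConnection` is borrowed from the text, and its signature here is not Thrift's.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AcceptedQueueSketch {
    private final Selector selector;
    private final BlockingQueue<String> acceptedQueue;   // strings stand in for transports

    public AcceptedQueueSketch(int queueSize) {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.acceptedQueue = new ArrayBlockingQueue<>(queueSize);
    }

    /** Called from the accept thread: enqueue the connection, then wake the selector. */
    public void addAcceptedConnection(String connection) {
        try {
            acceptedQueue.put(connection);   // blocks while the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        selector.wakeup();                   // a blocked or upcoming select() returns at once
    }

    /** Called from the selector thread: one loop iteration; returns a queued connection or null. */
    public String selectOnce() {
        try {
            selector.select();               // returns immediately after a wakeup()
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return acceptedQueue.poll();
    }

    public void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        AcceptedQueueSketch thread = new AcceptedQueueSketch(4);
        thread.addAcceptedConnection("conn-1");
        System.out.println("picked up: " + thread.selectOnce());
        thread.close();
    }
}
```

Without the `wakeup()` call, a selector thread parked in `select()` with no ready channels would not notice the new queue entry until some unrelated I/O event occurred.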
Since SelectorThread also extends Thread, look at the implementation of its run() method:
The select() method of SelectorThread listens for I/O events and handles only data reads and writes. If a connection has data to read, the data is read and buffered frame by frame. If data needs to be written to a connection, the buffered data is sent to the client. Once read/write processing is finished, the SelectionKey must be cleaned up and deregistered from the NIO selector.
- When the data write operation completes, the whole RPC call is over. The handleWrite() method is as follows:
- When the data read operation completes, Thrift uses the data it has read to execute the target method. The handleRead() method is as follows:
After handleRead() has executed read() and finished reading the data, it calls requestInvoke() to invoke the target method and carry out the business processing. requestInvoke() wraps the request data in a Runnable object and submits it to the ExecutorService thread pool.
After select() returns, the thread goes on to run processAcceptedConnections(), which prepares newly accepted connections for I/O event handling.
Here are the core operations:
- Try to take a connected transport channel from the SelectorThread's blocking queue, acceptedQueue. If one is available, call registerAccepted(); otherwise, go on to the next loop iteration.
- The registerAccepted() method registers the underlying connection of the transport channel with the NIO selector and obtains a SelectionKey.
- Create a FrameBuffer object and bind it to the SelectionKey just obtained. It serves as the read/write buffer during data transmission.
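Registering a channel and binding a per-connection buffer to its SelectionKey can be tried out with `java.nio` alone. In the sketch below a `Pipe` stands in for the client socket and a plain `ByteBuffer` stands in for FrameBuffer. The names `AttachBufferSketch` and `roundTrip` are hypothetical, and the real FrameBuffer does considerably more than this.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

public class AttachBufferSketch {
    /** Registers a channel with a buffer attached to its key, then reads into that buffer on OP_READ. */
    public static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();                        // stands in for a client connection
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);

                // Registration step: subscribe to reads and bind the buffer to the key.
                ByteBuffer frameBuffer = ByteBuffer.allocate(256);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                key.attach(frameBuffer);

                // The "client" sends a short message (assumed to fit in the buffer).
                sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                selector.select();                          // returns once the source is readable
                for (SelectionKey ready : selector.selectedKeys()) {
                    if (ready.isReadable()) {
                        // The buffer travels with the key, so no lookup table is needed.
                        ByteBuffer buf = (ByteBuffer) ready.attachment();
                        ((Pipe.SourceChannel) ready.channel()).read(buf);
                    }
                }
                selector.selectedKeys().clear();
                key.cancel();                               // deregister once the exchange is done

                frameBuffer.flip();
                return StandardCharsets.UTF_8.decode(frameBuffer).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read back: " + roundTrip("hello"));
    }
}
```

Attaching the buffer to the key keeps each connection's read/write state next to the event that concerns it, which is the role the text gives to FrameBuffer.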
Conclusion
This article introduced the threaded service models of Thrift: two blocking models (TSimpleServer and TThreadPoolServer) and three non-blocking models (TNonblockingServer, THsHaServer, and TThreadedSelectorServer). For each model it covered usage, workflow, principles, and parts of the source code implementation.
The article is long, so take your time reading it.
Related links
- Apache Thrift Series Details (I) – Overview and Introduction
- Apache Thrift series details (II) – Network services Model
- Apache Thrift – Serialization mechanism