Author: Cony
Guide language: The RPC call of TARS micro-service open source framework includes client and server. The series of articles “RPC source code analysis of TARS micro-service open source framework RPC source code analysis” will be divided into four parts: first understanding the client, synchronous and asynchronous call of the client, first understanding the server and workflow of the server, using C++ language as the carrier. To understand how TARS RPC calls work.
What is the TARS
TARS is a micro-service development framework used by Tencent for ten years. Currently, it supports C++, Java, PHP, node.js and Go languages. This open source project provides users with a complete set of micro-service platform PaaS solutions involving development, operation and maintenance, as well as testing, helping a product or service to develop, deploy, test, and launch quickly. At present, this framework is applied in Tencent’s major core businesses, and the scale of service nodes deployed and operated based on this framework reaches hundreds of thousands.
The TARS communication model includes both client and server. Client and server communicate with each other mainly through RPC. This series of articles is divided into two parts, the RPC call part of the source code analysis. This is the next part, we will take C++ language as the carrier, take you to understand the TARS server.
Getting to know the server
When using TARS to build the RPC server, TARS will help you generate an XXXServer class, which inherits from the Application class, declare the variable XXXServer g_app, and call the function:
g_app.main(argc, argv);
g_app.waitForShutdown();
Copy the code
Then you can start the TARS RPC service. Before we dive into the server code for TARS, let’s introduce a few important classes to give you an overview.
Application
As mentioned earlier, a server is an Application that helps users read configuration files, initialize proxies (if the server needs to invoke other services) and services from the configuration file, and create and start network threads and business threads.
TC_EpollServer
TC_EpollServer is the real server, and if Application is the fan, TC_EpollServer is the motor. TC_EpollServer is responsible for two large modules, the network module and the business module, which are the two classes that will be introduced below.
NetThread
Represents the network module, which contains TC_Epoller as IO reuse, TC_Socket to establish socket connections, and ConnectionList to record many socket connections to clients. Any data sent or received related to the network is related to NetThread. In the configuration file, use netthreads under /tars/application/server to configure the number of Netthreads
HandleGroup and Handle…
The Handle is a thread that executes PRC service, and the HandleGroup composed of many Handles is a group of business threads of the same RPC service. The business thread is responsible for calling the user-defined service code and storing the processing results in the send cache for the network module to send. How the business thread calls the user-defined code is explained in detail below, using simple C++ reflection, which is not mentioned in many sources. In the configuration file, use/tars/application/server/threads under xxxAdapter configuration in a HandleGroup Handle the number of threads (business).
BindAdapter
The xxxAdapter under /tars/ Application /server in the configuration file is the configuration of BindAdapter. A BindAdapter represents a service entity. BindAdapter represents an external listening socket for an RPC service. It also declares the maximum number of connections, the size of the receive queue, the number of business threads, the name of the RPC service, and the protocol used.
The BindAdapter itself can be thought of as an instance of a service, which can establish a real listening socket and serve external services. It is associated with both the network module NetThread and the service module HandleGroup. For example, The first thread of multiple Netthreads listens to the BindAdapter listen socket. If a client is connected to the BindAdapter, one of the listen sockets is randomly selected from the multiple Netthreads. Puts the connection into the ConnectionList of the selected NetThread. A BindAdapter is usually associated with a set of handlegroups in which business threads execute services corresponding to the BindAdapter. As you can see, the BindAdapter is associated with both the network module and the service module.
Ok, now that we have introduced these classes, let’s look at the relationship between them through the class diagram:
In the TC_EpollServer management class diagram, the network module on the left and the service module on the right are responsible for establishing and managing the network relationship of the server, while the service module on the right is responsible for executing the service code of the server. The two modules are integrated through BindAdapter to perform RPC services externally.
Initialize the
Like the client, the server also needs initialization to build the whole as described above. According to the above introduction, initialization can be divided into two modules — initialization of the network module and initialization of the business module. Void main() and void waitForQuit() are used to initialize the pipe signal, read the configuration file, etc. These will be ignored. We mainly see how to build the network part through epoll and build listen socket, and how to set up the business thread group to build the business part.
TC_EpollServer initialization
Before initializing the network module and service module, TC_EpollServer needs to be initialized. The main code is as follows:
void Application::main(int argc, char *argv[]) { ...... // Initialize the Server part initializeServer(); . }Copy the code
InitializeServer () populates the various static member variables in ServerConfig to be retrieved as needed. TC_EpollServer(iNetThreadNum) = new TC_EpollServer(iNetThreadNum);
TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum)
{
if(_netThreadNum < 1) { _netThreadNum = 1; } // The number of network threads cannot be 15if(_netThreadNum > 15)
{
_netThreadNum = 15;
}
for(size_t i = 0; i < _netThreadNum; ++i) { TC_EpollServer::NetThread* netThreads = new TC_EpollServer::NetThread(this); _netThreads.push_back(netThreads); }}Copy the code
After that, an AdminAdapter was actually created, but unlike the normal RPC service BindAdapter, it will not be described here.
Now that TC_EpollServer has been built, how to arrange the left (network module) and right (business module) protection for it?
The network module is initialized. Procedure
Before explaining the network module, take a closer look at the class diagram for the network module:
Let’s take a look at some of the code in Application related to the initialization of network modules:
void Application::main(int argc, char *argv[]) { ...... vector<TC_EpollServer::BindAdapterPtr> adapters; // Bind objects and portsbindAdapter(adapters); . _epollServer->createEpoll(); } void Application::waitForShutdown()
{
waitForQuit(); . }Copy the code
The initialization of the network part is inseparable from the establishment of the monitoring ports (socket, bind, listen) of each RPC service, the connection (accept) of the receiving client, and the establishment of epoll. So when and where do you call these functions? The general process is shown in the figure below:
1. Create a Listen socket for the service entity
First in Application::main(), call:
vector<TC_EpollServer::BindAdapterPtr> adapters; // Bind objects and portsbindAdapter(adapters);
Copy the code
Create a service entity bindAdapter in Application::bindAdapter(), /tars/application/server; /tars/application/server; /tars/application/server;
BindAdapterPtr bindAdapter = new BindAdapter(_epollServer.get());
_epollServer->bind(bindAdapter);
Copy the code
To determine the listen socket of the service entity. As you can see, in TC_EpollServer::bind() :
int TC_EpollServer::bind(TC_EpollServer::BindAdapterPtr &lsPtr)
{
int iRet = 0;
for(size_t i = 0; i < _netThreads.size(); ++i)
{
if(i == 0)
{
iRet = _netThreads[i]->bind(lsPtr);
}
else{// If the listeners are not listening on the socket, the list uses the maximum number of connections set to the adapter as the initialization _netThreads[I]->setListSize(lsPtr->getMaxConns()); }}return iRet;
}
Copy the code
The first network thread in the network thread group created at the initialization of TC_EpollServer above is responsible for creating and listening to the listen socket of the service entity, thus avoiding the scare effect of multiple threads listening to the same FD.
As you can see, we continue to call NetThread:: Bind (BindAdapterPtr &lsPtr), which does some preparatory work, NetThread::bind(const TC_Endpoint &ep, TC_Socket &s) :
void TC_EpollServer::NetThread::bind(const TC_Endpoint &ep, TC_Socket &s)
{
int type= ep.isUnixLocal()? AF_LOCAL:AF_INET;if(ep.isTcp())
{
s.createSocket(SOCK_STREAM, type);
}
else
{
s.createSocket(SOCK_DGRAM, type);
}
if(ep.isUnixLocal())
{
s.bind(ep.getHost().c_str());
}
else
{
s.bind(ep.getHost(), ep.getPort());
}
if(ep.isTcp() && ! ep.isUnixLocal()) { s.listen(1024); s.setKeepAlive(); s.setTcpNoDelay(); // Do not set closewaitS.setnoclosewait (); } s.setblock(false);
}
Copy the code
BindAdapterPtr (BindAdapterPtr &lsPtr); You can also see that the NetThread records the BindAdapter that the FD is listening to:
_listeners[s.getfd()] = lsPtr;
Copy the code
The following figure summarizes the process for creating a Listen socket for a service entity
2. To create an epoll
The code goes back to Application::main() by executing:
_epollServer->createEpoll();
Copy the code
To get TC_EpollServer to create epoll in the network thread it manages:
void TC_EpollServer::createEpoll()
{
for(size_t i = 0; i < _netThreads.size(); ++i) { _netThreads[i]->createEpoll(i+1); } initUdp() cannot be called until createEpoll() is initialized by all network threads.for(size_t i = 0; i < _netThreads.size(); ++i) { _netThreads[i]->initUdp(); }}Copy the code
The uint32_createepoll (uint32_t iIndex) function is used as a function to initialize netthreads. The listen socket created above is added to epoll (of course, only the first network thread has the Listen socket), and the ConnectionList _list is initialized. See the following figure for a summary of the process:
3. Start the network thread
Since a NetThread is a thread, you need to execute its start() function to start it. And this work is not in the Application: : the main (), but in the Application: : waitForShutdown () in the Application: : waitForQuit (finish), follow the flow chart below to see the code, is clear:
Initialization of a business module
Also, as with network modules, before explaining business modules, take a careful look at the related class diagram of business modules:
During business module initialization, we need to figure out two questions: How does the business module establish a relationship with the user-populated implementation of XXXServantImp so that Handle can call the user-defined RPC methods when a request comes in? When and where are business threads started, and how do they wait for requests to arrive?
Take a look at what code in the Application relates to initialization of a business module:
void Application::main(int argc, char *argv[])
{
......
vector<TC_EpollServer::BindAdapterPtr> adapters;
bindAdapter(adapters); // Initialize the business application (); // Set the HandleGroup to start the threadfor (size_t i = 0; i < adapters.size(); ++i)
{
string name = adapters[i]->getName();
string groupName = adapters[i]->getHandleGroupName();
if(name ! = groupName) { TC_EpollServer::BindAdapterPtr ptr = _epollServer->getBindAdapter(groupName);if(! ptr) { throw runtime_error("[TARS][adater `" + name + "` setHandle to group `" + groupName + "` fail!"); }}setHandle(adapters[i]); } // Start the business processing thread _epollServer->startHandle(); . }Copy the code
The first problem is solved in bindAdapter(Adapters) and Initialize (). The rest of the code creates and starts the Handle business thread group.
1. Associate a BindAdapter with a user-defined method
How to correlate? Take a look at the following code flow diagram:
How do I enable business threads to invoke user-defined code? With the ServantHelperManager as a bridge, business threads can be indexed to service ids by the BindAdapter ID. This is then indexed by the service ID to a generator of the user-defined XXXServantImp class. With the generator, the business thread can generate the XXXServantImp class and invoke the methods within it. Let’s do it step by step.
In the Application::bindAdapter() call to Application::main(), you can see the following code:
for (size_t i = 0; i < adapterName.size(); i++)
{
……
string servant = _conf.get("/tars/application/server/" + adapterName[i] + "<servant>");
checkServantNameValid(servant, sPrefix);
ServantHelperManager::getInstance()->setAdapterServant(adapterName[i], servant); ... }Copy the code
For example, adapterNamei for MyDemo. StringServer. StringServantAdapter,. And the servant is MyDemo StringServer. StringServantObj, these are all in the configuration file is read, The former is the ID of the BindAdapter, while the latter is the service ID. In ServantHelperManager:: setAdapterServant(), just execute:
void ServantHelperManager::setAdapterServant(const string &sAdapter, const string &sServant)
{
_adapter_servant[sAdapter] = sServant;
_servant_adapter[sServant] = sAdapter;
}
Copy the code
The two member variables are simply:
/** * map<string, string> _adapter_servant; / / map<string, string> _servant_adapter;Copy the code
This is just a mapping record, which can then be indexed to the service ID by the BindAdapter ID. From the service ID, simple C++ reflection can be used to derive the user implemented XXXServantImp class to obtain the user implemented method.
How to implement reflection from service ID to class? Help is also required through the ServantHelperManager. In Application::main(), after Application::bindAdapter() executes initialize(), which is a pure virtual function that actually executes functions of the derived XXXServer class, like this:
void
StringServer::initialize()
{
//initialize application here:
//...
addServant<StringServantImp>(ServerConfig::Application + "." + ServerConfig::ServerName + ".StringServantObj");
}
Copy the code
The code ends up executing ServantHelperManager:: addServant() :
template<typename T>
void addServant(const string &id,bool check = false)
{
if(check && _servant_adapter.end() == _servant_adapter.find(id))
{
cerr<<"[TARS]ServantHelperManager::addServant "<< id <<" not find adapter.(maybe not conf in the web)"<<endl;
throw runtime_error("[TARS]ServantHelperManager::addServant " + id + " not find adapter.(maybe not conf in the web)");
}
_servant_creator[id] = new ServantCreation<T>();
}
Copy the code
The parameter const string & id is a service id, for example above MyDemo. StringServer. StringServantObj, T is the user fill the implementation XXXServantImp classes.
_servant_creatorID = new ServantCreation() is the key to the function, _servant_creator is map<string, ServantHelperCreationPtr>, You can index to the ServantHelperCreationPtr by service ID. What is the ServantHelperCreationPtr? The class generator that helped us generate an instance of XXXServantImp is simple C++ reflection:
/** * Servant */ class ServantHelperCreation : public TC_HandleBase { public: virtual ServantPtr create(const string &s) = 0; }; typedef TC_AutoPtr<ServantHelperCreation> ServantHelperCreationPtr; ////////////////////////////////////////////////////////////////////////////// /** * Servant */ template<class T> struct ServantCreation : public ServantHelperCreation { ServantPtr create(const string &s) { T *p = new T; p->setName(s); returnp; }};Copy the code
This is a simple reflection technique to generate the corresponding XXXServantImp class from the service ID. A business thread in a business thread group can obtain the service ID through the ServantHelperManager by obtaining the ID of the BindAdapter of the business it wants to execute. With the service ID you can get the generator of the XXXServantImp class to generate the XXXServantImp class to execute the user-defined RPC methods inside. Now look back at the figure (2-8) to get a general idea of the process.
2. Start the Handle service thread
All that remains is to create the HandleGroup, bind it to the BindAdapter, bind it to the TC_EpollServer, and then create/start the Handle thread under the HandleGroup. The process of starting Handle involves obtaining the service class generator mentioned above in “associating a BindAdapter with a user-defined method.” Take a look at the code flow diagram:
There are two parts here. The first part is to execute the following code in Application::main() :
// Set the HandleGroup to start the threadfor (size_t i = 0; i < adapters.size(); ++i)
{
string name = adapters[i]->getName();
string groupName = adapters[i]->getHandleGroupName();
if(name ! = groupName) { TC_EpollServer::BindAdapterPtr ptr = _epollServer->getBindAdapter(groupName);if(! ptr) { throw runtime_error("[TARS][adater `" + name + "` setHandle to group `" + groupName + "` fail!"); }}setHandle(adapters[i]);
}
Copy the code
Traverse the every BindAdapter defined in the configuration file (for example MyDemo. StringServer. StringServantAdapter), and set up the business for its thread group HandleGroup, Make the RPC method corresponding to the BindAdapter available to all threads in the thread group. The trace code is as follows:
void Application::setHandle(TC_EpollServer::BindAdapterPtr& adapter)
{
adapter->setHandle<ServantHandle>();
}
Copy the code
Note that ServantHandle is a derived class of Handle, which is the business processing thread class, and then comes:
template<typename T> void setHandle()
{
_pEpollServer->setHandleGroup<T>(_handleGroupName, _iHandleNum, this);
}
Copy the code
In TC_EpollServer:: setHandleGroup(), the code that actually creates the business thread group HandleGroup and its threads and associates the thread group with BindAdapter and TC_EpollServer:: setHandleGroup() :
/** * Create a handle group. If it already exists, return @param name * @return HandlePtr
*/
template<class T> void setHandleGroup(const string& groupName, int32_t handleNum, BindAdapterPtr adapter)
{
map<string, HandleGroupPtr>::iterator it = _handleGroups.find(groupName);
if (it == _handleGroups.end())
{
HandleGroupPtr hg = new HandleGroup();
hg->name = groupName;
adapter->_handleGroup = hg;
for (int32_t i = 0; i < handleNum; ++i)
{
HandlePtr handle = new T();
handle->setEpollServer(this);
handle->setHandleGroup(hg);
hg->handles.push_back(handle);
}
_handleGroups[groupName] = hg;
it = _handleGroups.find(groupName);
}
it->second->adapters[adapter->getName()] = adapter;
adapter->_handleGroup = it->second;
}
Copy the code
Here, you can see the creation of the business thread group: HandleGroupPtr Hg = new HandleGroup(); Business thread creation: HandlePtr Handle = new T() (T is ServantHandle); Establish relationships, such as BindAdapter and HandleGroup correlation: it->second-> adaptersAdapter ->getName() = adapter and adapter->_handleGroup = it->second. After executing the above code, you should have the following class diagram:
TC_EpollServer:: setHandleGroup() :
As the layers of functions exit, the code comes back to Application::main() and then executes:
// Start the business process thread _epollServer->startHandle();Copy the code
In TC_EpollServer::startHandle(), all business thread groups in the business module HandleGroup controlled by TC_EpollServer are traversed, and each Handle in the group is traversed, and its start() method is executed to start the thread:
void TC_EpollServer::startHandle()
{
if(! _handleStarted) { _handleStarted =true;
for (auto& kv : _handleGroups)
{
auto& hds = kv.second->handles;
for (auto& handle : hds)
{
if(! handle->isAlive()) handle->start(); }}}}Copy the code
Because the Handle is inherited from TC_Thread, in carrying out the Handle: : start (), will perform virtual function Handle: : run (), in the Handle: : run () in the main is to perform two functions, one is ServantHandle: : initialize (), The other is Handle::handleImp() :
void TC_EpollServer::Handle::run()
{
initialize();
handleImp();
}
Copy the code
ServantHandle: : initialize () to users is the main purpose of the implementation of the RPC methods, its implementation principle and above (” 2.2.3 business module initialization “1 in dot” BindAdapter and user defined methods “) as mentioned, Find the generator of the user-filled implementation’s XXXServantImp class and generate an instance of the XXXServantImp class using the ID of the associated BindAdapter and the ServantHelpManager. Pair <string, ServantPtr> with the service name and put it in map<string, ServantPtr> ServantHandle:: <string, ServantPtr> ServantHandle:: _servants <string, ServantPtr> ServantHandle:: _servants
void ServantHandle::initialize() { map<string, TC_EpollServer::BindAdapterPtr>::iterator adpit; / / get the Handle of the associated BindAdapter map < string, TC_EpollServer: : BindAdapterPtr > & adapters = _handleGroup - > adapters. // Iterate through all bindAdaptersfor(adpit = adapters.begin(); adpit ! = adapters.end(); ++adpit) {// Use the ServantHelperManager to get the service pointer -- ServantPtr servant = of the XXXServantImp class ServantHelperManager::getInstance()->create(adpit->first); // Put Pointers into map<string, ServantPtr> ServantHandle:: _servantsif (servant)
{
_servants[servant->getName()] = servant;
}
else
{
TLOGERROR("[TARS]ServantHandle initialize createServant ret null, for adapter `" + adpit->first + "`"<< endl); }}... }Copy the code
The main function of Handle::handleImp() is to block the business thread waiting on the condition variable. Here you can see the _handleGroup->monitor.timedWait(_iWaitTime) function, which blocks the wait on the condition variable:
void TC_EpollServer::Handle::handleImp() {... struct timespec ts;while(! getEpollServer()->isTerminate()) { { TC_ThreadLock::Lock lock(_handleGroup->monitor);if(allAdapterIsEmpty() && allFilterIsEmpty()) { _handleGroup->monitor.timedWait(_iWaitTime); }}}... }Copy the code
The Handle thread uses condition variables to make all business threads block and wait to be woken up. Since this chapter is about initialization, the code will be read here and explained later. How to find and perform business with map<string, ServantPtr> ServantHandle:: _servants Now review the above code flow with a functional flowchart:
Work on the server
After the initialization, the server will enter the working state, the server worker thread is divided into two types, as the network thread and business thread introduced previously, the network thread is responsible for accepting the client connection and sending and receiving data, while the business thread is only concerned with the execution of user defined PRC method, Both threads have been started with start() at initialization.
Most servers are executed according to the accept()->read()->write()->close() process. The general working flow chart is as follows:
The TARS server is no exception.
The decision logic is realized by Epoll IO reuse model. Each network thread, NetThread, has a TC_Epoller to collect, listen and distribute events.
As mentioned earlier, only the first network thread listens for the Connection. Once a new Connection is accepted, a Connection instance is constructed and the network thread that handles the Connection is selected.
Once the request is read in, it is temporarily stored in the receive queue and the business thread is notified to process it. Here, the business thread finally steps in, processes the request, and puts the result on the send queue.
There is data in the send queue and the network thread needs to be notified to send the data. The network thread that receives the send notification sends the response to the client.
The workflow of TARS server is basically like this. As shown in the figure above, the workflow of ordinary server is not much different. The following four parts of receiving client connection, reading RPC request, processing RPC request, and sending RPC response will introduce the work of the server side one by one.
Accept client connections
TC_Epoller ::run(); TC_Epoller ::run(); TC_Epoller ::run();
_epoller.add(kv.first, H64(ET_LISTEN) | kv.first, EPOLLIN);
Copy the code
From epoll_wait () returns, a consortium of epoll_event epoll_data will be (ET_LISTEN | listen socket ‘fd), and obtain high 32 bits, is ET_LISTEN, Then execute the case ET_LISTEN branch in switch below
try
{
const epoll_event &ev = _epoller.get(i);
uint32_t h = ev.data.u64 >> 32;
switch(h)
{
caseEt_listeners: {auto it = _listeners. Find (ev.data.u32);if( it ! = _listeners.end()) {if(ev.events & EPOLLIN)
{
bool ret;
do
{
ret = accept(ev.data.u32);
}while(ret); }}}break;
caseET_CLOSE: // Closes the requestbreak;
caseET_NOTIFY: // Send notification......break;
caseET_NET: // Network request......break;
default:
assert(true); }}Copy the code
Ret = Accept (ev.data.u32) : ret = Accept (ev.data.u32) : ret = Accept (ev.data.u32) :
Before we get there, let’s review the network thread related class diagram and get an idea of accept from the diagram:
Start with NetThread::accept(int fd) for NetThread::run().
1. Accept Obtains the client socket
NetThread::accept(int fd)
// Receive connection TC_Socket s; s.init(fd,false, AF_INET);
int iRetCode = s.accept(cs, (struct sockaddr *) &stSockAddr, iSockAddrSize);
Copy the code
Through TC_Socket:: ACCEPT (), call the system function accept() to accept the socket connection of the client’s three hard handshakes, and then print and check the IP and port of the client, and analyze whether the corresponding BindAdapter is overloaded, overload will close the connection. Then set the client socket:
cs.setblock(false);
cs.setKeepAlive();
cs.setTcpNoDelay();
cs.setCloseWaitDefault();
Copy the code
At this point, the first step of the corresponding figure (2-16), accepting the client connection (the flow is shown below), is complete.
2. Create a Connection for the client socket
Create a Connection for NetThread:: Accept (int fd);
int timeout = _listeners[fd]->getEndpoint().getTimeout()/1000;
Connection *cPtr = new Connection(_listeners[fd].get(), fd, (timeout < 2 ? 2 : timeout), cs.getfd(), ip, port);
Copy the code
The parameters in the constructor are, in order, the pointer to the BindAdapter corresponding to the new client, the FD of the BindAdapter corresponding to the Listen socket, the timeout period, the FD of the client socket, the IP address of the client, and the port. In the Connection constructor, its TC_Socket is also associated with a fd:
Connection::Connection(TC_EpollServer::BindAdapter *pBindAdapter, int LFD, int timeout, int fd, const string& ip, uint16_t port) { ...... _sock.init(fd,true, AF_INET);
}
Copy the code
After the TC_Socket is associated, the client socket can be operated on through the Connection instance. At this point, step 2 of Figure 2-16, creating a Connection for the client socket, is complete (the process is shown below).
3. Select a network thread for Connection
NetThread:: Accept (int fd) select a network thread for the Connection and add it to the ConnectionList.
//addTcpConnection(cPtr);
_epollServer->addConnection(cPtr, cs.getfd(), TCP_CONNECTION);
Copy the code
TC_EpollServer: : the addConnection () of the code as shown below:
void TC_EpollServer::addConnection(TC_EpollServer::NetThread::Connection * cPtr, int fd, int iType)
{
TC_EpollServer::NetThread* netThread = getNetThreadOfFd(fd);
if(iType == 0)
{
netThread->addTcpConnection(cPtr);
}
else{ netThread->addUdpConnection(cPtr); }}Copy the code
Select a network thread for Connection* cPtr. In the flowchart, the selected network thread is called Chosen_NetThread. Choose network function is TC_EpollServer thread: : getNetThreadOfFd (int fd), according to the client socket fd and remainder get specific code is as follows:
NetThread* getNetThreadOfFd(int fd)
{
return _netThreads[fd % _netThreads.size()];
}
Copy the code
Then call the selected thread NetThread: : addTcpConnection () method (or
NetThread: : addUdpConnection (), here only introduce the method of TCP), add the Connection to the selected network ConnectionList thread, Finally performs _epoller. Add (cPtr – > getfd (), cPtr – > getId (), EPOLLIN | EPOLLOUT) the client socket fd join this network thread TC_Epoller, let the network thread is responsible for our clients to send and receive data. At this point, the third step corresponding to Figure (28) is completed (the specific process is shown in the figure below).
Receiving RPC Requests
NetThread::run(); case ET_LISTEN (); case ET_NET (); case ET_NET (); Why the case ET_NET branch? As mentioned above, the client socket fd join TC_Epoller to listening, speaking, reading and writing, is _epoller. Add (cPtr – > getfd (), cPtr – > getId (), EPOLLIN | EPOLLOUT), The second argument passed to the function is a 32-bit integer cPtr->getId(). The second argument to the function must be a 64-bit integer, so it will be a 64-bit integer cPtr->getId(). The second parameter is returned to the user as the 64-bit union epoll_data_t data in the active epoll_event structure when the registered event causes epoll_wait() to exit. So, look at the following NetThread::run() code:
try
{
const epoll_event &ev = _epoller.get(i);
uint32_t h = ev.data.u64 >> 32;
switch(h)
{
caseET_LISTEN:...break;
caseET_CLOSE: // Closes the requestbreak;
caseET_NOTIFY: // Send notification......break;
caseET_NET: // network request processNet(ev);break;
default:
assert(true); }}Copy the code
Epoll_data_t data is the highest 32 bits of the 64-bit union epoll_data_t data. If epoll_wait() is stopped because the socket receives data, the highest 32 bits of epoll_data_t data are 0. The lower 32 bits are cPtr->getId(), so h will be 0. And ET_NET is 0, so the client socket will execute the case ET_NET branch if data comes in. The following is a function flow diagram that executes the case ET_NET branch.
1. Obtain the active Connection
On receiving the RPC request to enter NetThread::processNet(), the server needs to know which client socket is activated, so it executes in NetThread::processNet() :
void TC_EpollServer::NetThread::processNet(const epoll_event &ev) { uint32_t uid = ev.data.u32; Connection *cPtr = getConnectionPtr(uid); . }Copy the code
As mentioned above, the high 32 bits of epoll_data_t data are 0 and the low 32 bits are cPtr->getId(). Through NetThread: : getConnectionPtr () can return from the ConnectionList at this moment need to read the Connection of the RPC request. Then a simple check is performed on the obtained Connection to see if epoll_event:: Events is EPOLLERR or EPOLLHUP (the specific process is shown in the following figure).
2. Receives client requests and places them in a thread-safe queue
Epoll_event :: Events is EPOLLIN. NetThread::recvBuffer() reads RPC request data, and Connection:: InsertRecvQueue () wakes up the business thread to send data.
if(ev.events & EPOLLIN) {recv_queue::queue_type vRecvData; int ret = recvBuffer(cPtr, vRecvData);if(ret < 0)
{
delConnection(cPtr,true,EM_CLIENT_CLOSE);
return;
}
if(!vRecvData.empty())
{
cPtr->insertRecvQueue(vRecvData);
}
}
Copy the code
NetThread::recvBuffer(). The server creates a thread-safe queue to hold the received data recv_queue::queue_type vRecvData. Call NetThread::recvBuffer(cPtr, vRecvData) with the Connection cPtr and recv_queue:: Queue_type vRecvData as parameters.
NetThread::recvBuffer() further calls Connection::recv() :
int NetThread::recvBuffer(NetThread::Connection *cPtr, recv_queue::queue_type &v)
{
return cPtr->recv(v);
}
Copy the code
Connection::recv() performs different receiving methods according to different transport layer protocols (LFD ==-1 if UDP is transmitted). For example, TCP performs:
iBytesReceived = ::read(_sock.getfd(), (void*)buffer, sizeof(buffer))
Copy the code
Perform different actions based on data receiving conditions, for example, FIN segment received, errno==EAGAIN. If a real request packet is received, the received data is placed in the String Connection::_recvbuffer and Connection:: parseProtocol() is called.
In Connection:: parseProtocol(), the callback protocol parsing function checks the received data. When the check passes, the element tagRecvData* recv is constructed and placed in the thread-safe queue:
tagRecvData* recv = new tagRecvData();
recv->buffer = std::move(ro);
recv->ip = _ip;
recv->port = _port;
recv->recvTimeStamp = TNOWMS;
recv->uid = getId();
recv->isOverload = false;
recv->isClosed = false; recv->fd = getfd(); // This ->_bEmptyConn =false; O.pus_back (recv);Copy the code
At this point, the RPC request data has been fully retrieved and placed on a thread-safe queue (as shown in the figure below).
3. The thread safe queue is not empty, and the service thread is awakened to send
NetThread::processNet() : Connection:: if the thread safe queue is not empty, the thread safe queue will execute the Connection:: InsertRecvQueue () :
void NetThread::processNet(const epoll_event &ev)
{
......
if(ev.events & EPOLLIN) // There is data to read {......if(! vRecvData.empty()) { cPtr->insertRecvQueue(vRecvData); }}... }Copy the code
In Connection:: insertRecvQueue(), the BindAdapter is overloaded, which can be divided into three cases: unoverloaded, half-overloaded and fully overloaded. If the overload can discard all the RPC request data in the thread safe queue, otherwise you will perform BindAdapter: : insertRecvQueue ().
Code in BindAdapter: : insertRecvQueue (), there are two main actions, the first is to get to the RPC request packets into BindAdapter receive queue – recv_queue _rbuffer:
_rbuffer.push_back(vtRecvData)
Copy the code
The second is the HandleGroup thread group that wakes up the waiting condition variable:
_handleGroup->monitor.notify()
Copy the code
Now, after receiving the RPC request data, the server’s network thread finally wakes up the business thread (see figure below), and it’s time for the business module to see how to process the RPC request.
Handling RPC requests
After receiving the request data, wake up the HandleGroup (_handleGroup->monitor.notify()), which is mentioned in “2.2.3 Initialization of the Service Module” (2), “Start the Handle service thread”. In the Handle::handleImp() function _handleGroup->monitor.timedWait(_iWaitTime). Through the condition variable, the business threads in the HandleGroup block together waiting for the network thread to wake them up. Now that we’ve finally notified the condition variable, what happens to the request? Here, you need to review section 2.2.3 to see what ServantHandle::_servants actually carry.
Ok, there are three steps to process RPC request: construct request context, call user-implemented method to process request, push response packet to thread-safe queue and notify network thread. The specific function flow is shown in the figure below, now further analysis:
1. Get the request data to construct the request context
When the business thread is awakened from the condition variable, it retrieves the request data from the BindAdapter it is responsible for: Adapter – > waitForRecvQueue (recv, 0), in BindAdapter: : waitForRecvQueue (), will be from the thread safe queue recv_queue BindAdapter: : _ rbuffer retrieve data:
bool BindAdapter::waitForRecvQueue(tagRecvData* &recv, uint32_t iWaitTime)
{
bool bRet = false;
bRet = _rbuffer.pop_front(recv, iWaitTime);
if(! bRet) {return bRet;
}
return bRet;
}
Copy the code
Remember where data was pushed into a thread-safe queue? Yes, in point 3 of “2.3.2 Receiving RPC requests”, “Thread-safe queue is not empty, wake up business thread to send”.
Next, call ServantHandle:: Handle () to process the received RPC request data.
The first step in the process as shown in this section headings – the request context structure, using ServantHandle: : createCurrent () :
void ServantHandle::handle(const TC_EpollServer::tagRecvData &stRecvData) { TarsCurrentPtr current = createCurrent(stRecvData); . }Copy the code
In ServantHandle: : createCurrent (), the first new TarsCurrent instance, and then call the initialize () method, In TarsCurrent::initialize(const TC_EpollServer::tagRecvData &stRecvData, int64_t beginTime), Put the contents of the RPC request packet into the request context TarsCurrentPtr Current, and you only need to focus on this request context later. Also note that the TARS protocol uses TarsCurrent:: Initialize (const String &sRecvBuffer) to put the contents of the request package into the request context, otherwise the memcpy() system call is used to copy the contents. Here is a brief summary of the process in this section:
2. Processing requests (TARS protocol only)
Once the request context is retrieved, it is time to process it.
Void ServantHandle::handle(const TC_EpollServer::tagRecvData &stRecvData) {// Construct request context TarsCurrentPtr current = createCurrent(stRecvData);if(! current)return; // Process the requestif (current->getBindAdapter()->isTarsProtocol())
{
handleTarsProtocol(current);
}
else{ handleNoTarsProtocol(current); }}Copy the code
This RPC framework supports BOTH TARS and non-TARS protocols. The following will only introduce the processing of TARS protocols. For non-TARS protocols, the analysis process is similar. Readers who are interested in non-TARS protocols can compare and analyze the non-TARS part. Before we get there, let’s take a look at the service-related inheritance architecture, so don’t confuse the three classes:
Okay, now focus on ServantHandle: : handleTarsProtocol (const TarsCurrentPtr problem t) function. First paste the code:
Void ServantHandle: : handleTarsProtocol (const TarsCurrentPtr & current) {/ / 1 - the request context current preprocessing the servant / / 2 - finding the right service map<string, ServantPtr>::iterator sit = _servants.find(current->getServantName());if (sit == _servants.end())
{
current->sendResponse(TARSSERVERNOSERVANTERR);
return;
}
int ret = TARSSERVERUNKNOWNERR;
string sResultDesc = ""; vector<char> buffer; Try {//3- ret = sit->second-> Dispatch (current, buffer); } catch(TarsDecodeException &ex) {... } catch(TarsEncodeException &ex) {... } catch(exception &ex) {... } catch(...) {... } // Echo back the response, the third point to analyze...... }Copy the code
In the function, the request context will be preprocessed, such as set call validity check, dyeing processing, etc. We then get the service object based on the service name in the context: Map <string, ServantPtr>::iterator sit = _servants. Find (current->getServantName()), _servants is given content in the second sub-point “Handle Business thread Startup” of “2.2.3 Initialization of Business Modules”. Its key is the service ID (or service name) and value is the pointer to the instance of the service XXXServantImp implemented by the user.
Ret = sit->second-> Dispatch (current, buffer), Servant:: Dispatch () (as shown in Figure (2-26), because XXXServantImp is from XXXServant, XXXServantImp is from Servant, so it is actually the method of executing the Servant), different agreements are used to handle the Servant differently. TARS ::TarsCurrentPtr _current, vector &_sResponseBuffer (TARS ::TarsCurrentPtr _current, vector &_sResponseBuffer)
int Servant::dispatch(TarsCurrentPtr current, vector<char> &buffer)
{
int ret = TARSSERVERUNKNOWNERR;
if (current->getFuncName() == "tars_ping") {// omit}else if(! Current ->getBindAdapter()->isTarsProtocol()else
{
TC_LockT<TC_ThreadRecMutex> lock(*this);
ret = onDispatch(current, buffer);
}
return ret;
}
Copy the code
The XXXServant class is generated when executing Tars2Cpp. It generates pure virtual functions based on the user-defined tars file and the onDispatch() method.
- 1. Find the function corresponding to the request data in this service class;
- 2. Decode function parameters in request data;
- 3. Execute the corresponding USER-DEFINED RPC method of the XXXServantImp class.
- 4. Results after the implementation of the coding function;
- 5. Return tars: : TARSSERVERSUCCESS.
In practice, users can disable the autoreply function (e.g., current->setResponse(false)) and send a reply (e.g., Servant ->async_response_XXXAsync(current, RET, rStr)). At this point, the server has executed the RPC method, so here’s a quick summary of this section:
3. Push the response packet to the thread-safe queue and notify the network thread
After processing the RPC request and executing the RPC method, the result (buffer in the following code) needs to be sent back to the client:
Void ServantHandle: : handleTarsProtocol (const TarsCurrentPtr & current) {/ / 1 - the request context current preprocessing the servant / / 2 - finding the right service //3- Business logic processing // echo response, which is analyzed in this sectionif(current->isResponse()) { current->sendResponse(ret, buffer, TarsCurrent::TARS_STATUS(), sResultDesc); }}Copy the code
Since business and network are independent, after receiving the request packet, the network thread uses condition variables to notify the business thread, and how can the business thread notify the network thread? The network thread is blocked in epoll, so epoll is needed to notify the network thread. This time we’ll look at the summary and then the code:
In ServantHandle: : handleTarsProtocol (), the final step is to echo response packet. The return of the packet goes through the following steps: encode the response — find the network thread that received the request because we need to notify it to do the work — put the response packet into the send queue of the network thread — wake up the network thread using epoll features. Let’s focus on NetThread::send():
void TC_EpollServer::NetThread::send(uint32_t uid, const string &s, const string &ip, uint16_t port)
{
if(_bTerminate)
{
return;
}
tagSendData* send = new tagSendData();
send->uid = uid;
send->cmd = 's'; send->buffer = s; send->ip = ip; send->port = port; _sbuffer.push_back(send); _epoll.mod (_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT); }Copy the code
At this point, the business module in the server has done its job, and it is the network module’s job to send the response data to the client.
Sending an RPC Response
Mod (_notify.getfd(), H64(ET_NOTIFY), EPOLLOUT) Add to this our previous analysis of “2.3.1 Accept client please connect” and “2.3.2 Receive RPC request”, we know that we must start with NetThread::run() and enter the case ET_NOTIFY branch:
try
{
const epoll_event &ev = _epoller.get(i);
uint32_t h = ev.data.u64 >> 32;
switch(h)
{
caseET_LISTEN:...break;
caseET_CLOSE: // Closes the requestbreak;
caseET_NOTIFY: // Send notification processPipe();break;
caseET_NET: // Network request......break;
default:
assert(true); }}Copy the code
In NetThread::processPipe(), the response packet is first fetched from the thread-safe queue: _sbufqueue.dequeue (sendp, false), here echoes the third dot “push response packet to thread-safe queue and notify network thread” in “2.3.3 Handling RPC requests”. Then get the uid of the Connection corresponding to the request information from the response information and use the UID to getConnection: Connection *cPtr = getConnectionPtr(sendp->uid). Since Connection is converged with TC_Socket, the subsequent response data is sent back to the client through Connection, as shown in the following figure:
Summary of server work
Here’s a graphical summary of how the server works:
conclusion
TARS can quickly build systems and automatically generate code with ease of use and high performance in mind, helping developers and enterprises quickly build their own stable and reliable distributed applications in a microservice manner, allowing developers to focus only on business logic and improve operational efficiency. Multi-language, agile development, high availability, and efficient operations make TARS an enterprise-class product.
“RPC source code analysis of TARS Microservices Open Source framework” series of articles are divided into two parts, the source code analysis of RPC call part. In the next part of this article, we take you through the TARS server. Welcome to the introduction to TARS C++ client
TARS micro services to help you digital transformation, welcome to visit:
TARS website: TarsCloud.org
TARS source: github.com/TarsCloud
Get the TARS Official Training Ebook: wj.qq.com/s2/6570357/…
Or scan code to obtain: