GitChat author: Fan Li. Original title: "C++ High-Performance Server Network Framework Design Details".
Preface
In this article we will look at server development and explore, from several angles, how to build a high-performance, high-concurrency server application. Keep in mind that the complexity of a large server usually lies in its business logic, not in the basic structure of its code.
A large server is usually built from multiple service instances, may sit behind a CDN, may be "distributed", and so on. This article will not cover those topics, because no matter how complex such a system is, it is ultimately composed of individual server programs. The emphasis here is therefore on the structure of a single service — specifically, the network-communication layer of a single server. If you truly understand this structure, you can build any business on top of it, and you can also extend it to more complicated multi-server deployments such as "distributed" services.
The code examples in this article use C++, but the principles apply equally to Java (I am also a Java developer myself); Java merely wraps the operating system's basic networking APIs in a layer of virtual-machine interfaces (and may even provide ready-made APIs built on common network-framework ideas, such as NIO). For that reason, this article avoids big, empty buzzwords and focuses on concrete coding schemes and ways to optimize existing code that you can apply in real projects. The techniques discussed cover both Windows and Linux.
"High performance" means that the server handles each client connection smoothly and answers client requests with as low a latency as possible. "High concurrency" means not only that the server can hold many client connections at the same time, but also that these clients keep exchanging data with the server for the lifetime of the connection. There are libraries on the web that claim a single service can support millions or even tens of millions of concurrent connections; when I actually looked at them, they merely support holding many connections open at once.
If a server can accept n connections (where n may be large) but cannot methodically process the data flowing back and forth on those connections, it is meaningless: such a framework is just a "toy" with no value for real production use.
This article covers two aspects: first, the basic network communication components inside a server; second, how to assemble these components into a complete, efficient server framework. Note: in the rest of this article, "client" is a relative term — it refers to any terminal that connects to the service being discussed, so it may be a traditional client program or another server program that connects to this service.
1. Network communication components
Along the lines described above, we will start with the network communication component of the service application.
#### Problems to be solved
Since a server program necessarily involves network communication, what problems does its network communication module need to solve? There are many network libraries available, such as libevent, Boost Asio and ACE, but the common techniques of network communication are largely the same; at a minimum they must answer the following questions:
- How do I detect new client connections?
- How do I accept client connections?
- How do I check whether data is sent from the client?
- How do I receive data from clients?
- How do I detect abnormal connections? What can I do if the connection is abnormal?
- How do I send data to the client?
- How do I close the connection after sending data to the client?
Anyone with even a basic networking background can answer some of these questions: accept handles new client connections, recv receives client data, send transmits data to the client, and multiplexing APIs such as select, poll and epoll can be used to detect whether a client has connected or has sent new data. Indeed, these basic socket APIs form the foundation of server network communication; no matter how cleverly a network framework is designed, it is built on top of them. The key is how to organize these basic socket APIs skillfully. Saying that a server is efficient and supports high concurrency only describes a technical result; as a piece of software, a server is efficient as long as it follows the principle of "wait as little as possible, or not at all". Put another way, efficiency is not "some parts worked to death while others idle to death". It is fine for everyone to be idle, but whenever there is work to do, try to do it together, rather than one group busily working through tasks 1, 2, 3, ... 9 in turn while the other group does nothing at all. This may sound abstract, so here are some examples:
- By default, recv blocks the calling thread when there is no data;
- By default, send blocks if the TCP send window is not large enough to hold the data;
- By default, connect blocks while the connection to the peer is being established;
- Or: you send a piece of data to the peer and wait for its reply, and if the peer does not answer, the current thread blocks there.
None of these is the right mindset for efficient server development, because none of the examples above satisfies the "minimize waiting" principle. Why should we have to wait at all? Is there a way for these operations not to wait — better still, a way to be notified when they complete, so that the CPU time slices we would have spent waiting can be used for other work? Yes, there is: I/O multiplexing.
#### Comparison of several IO multiplexing mechanisms
Windows supports select, WSAAsyncSelect, WSAEventSelect and I/O completion ports (IOCP); Linux supports select, poll and epoll. Instead of going into the specifics of each function, let us go a little deeper: the APIs listed above can be divided into two levels.
Level 1: select and poll
Level 2: WSAAsyncSelect, WSAEventSelect, completion ports (IOCP), epoll
Why divide them this way? With select and poll, we still have to actively query, at certain intervals, whether a socket handle has events such as readable, writable or error events. If events did occur during that interval, our time was not wasted; but what if none occurred? Then, to put it bluntly, the query was a waste of time: when a server has many connections and CPU time slices are limited, we spend time checking a batch of sockets only to find that none of them has any events, while other work is waiting to be done. Why spend time on a fruitless check when we could spend it doing what actually needs doing? So for a server program to be efficient, we should avoid spending time actively polling sockets for events, and instead wait for the system to tell us which sockets have events to handle. That is exactly what the level-2 functions do: they turn "actively query for events" into "the system notifies us when events occur, and then we handle them". The level-2 functions differ only in how they deliver the notification: WSAAsyncSelect uses the Windows message queue to deliver events to a window procedure we set up, IOCP reports completion status through GetQueuedCompletionStatus, and epoll returns the events from epoll_wait.
For example, when connect is called on a non-blocking socket, it does not complete immediately; it returns at once without waiting. When the connection completes, WSAAsyncSelect delivers an FD_CONNECT event telling us the connection succeeded, and epoll produces an EPOLLOUT event, so we know the connection is done. Likewise, when a socket has data to read, WSAAsyncSelect generates an FD_READ event and epoll generates an EPOLLIN event, and so on. From this discussion we arrive at the right way to detect readable, writable and error events in network communication, and at the second principle I want to raise: minimize the time spent doing nothing useful. This may not matter when the server has plenty of spare resources, but when there are many tasks to handle, it can become a performance bottleneck.
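To make the level-2 idea concrete — being told about events instead of polling for them — here is a minimal epoll-based skeleton (Linux only). It assumes a non-blocking listening socket listen_fd already exists; all names are illustrative and not taken from any particular library.

#include <sys/epoll.h>
#include <cstdio>

// Minimal event-loop skeleton: register interest once, then let the kernel
// tell us which descriptors have events instead of querying them ourselves.
void run_event_loop(int listen_fd)
{
    int epfd = epoll_create1(0);
    if (epfd == -1)
    {
        perror("epoll_create1");
        return;
    }

    epoll_event ev{};
    ev.events = EPOLLIN;            // interested in readable events
    ev.data.fd = listen_fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == -1)
    {
        perror("epoll_ctl");
        return;
    }

    epoll_event events[64];
    while (true)
    {
        // Blocks until at least one registered fd has an event (or 1s passes).
        int n = epoll_wait(epfd, events, 64, 1000);
        for (int i = 0; i < n; ++i)
        {
            if (events[i].data.fd == listen_fd)
            {
                // a new connection is pending: call accept() here
            }
            else
            {
                // readable/writable/error event on a client socket: handle it here
            }
        }
    }
}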
#### The right way to detect network events
Based on the discussion above — first, to avoid meaningless waiting, and second, to let the operating system notify us of events rather than actively querying each socket — our sockets should all be set to non-blocking. With that in place, let us return to the seven questions raised in section 1:
- How do I detect new client connections?
- How do I accept client connections?

By default, the accept function blocks. If epoll reports an EPOLLIN event on the listening socket, or WSAAsyncSelect reports an FD_ACCEPT event, a new connection has arrived and accept will not block. Of course, the newly created socket should also be set to non-blocking, so that we can send and receive data on it without waiting.
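A minimal sketch of these two steps on Linux (the names are illustrative): when epoll reports the listening socket readable, accept the connection and immediately switch the new socket to non-blocking mode before registering it for events.

#include <sys/socket.h>
#include <fcntl.h>

// Called when epoll/select reports the listening socket as readable.
// Returns the new client fd, or -1 if nothing could be accepted right now.
int on_new_connection(int listen_fd)
{
    sockaddr_storage addr{};
    socklen_t addrlen = sizeof(addr);
    int client_fd = ::accept(listen_fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
    if (client_fd == -1)
        return -1;   // EAGAIN/EWOULDBLOCK: no pending connection; other errno values: real errors

    // Make the accepted socket non-blocking before registering it with epoll.
    int flags = ::fcntl(client_fd, F_GETFL, 0);
    ::fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
    return client_fd;
}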
- How do I check whether data is sent from the client?
- How do I receive data from clients?

By the same token, we should only read when a readable event occurs on the socket, so that we never have to wait when calling recv or read. How much should we read at a time? We can decide based on our needs; you can even call recv or read repeatedly in a loop, because on a non-blocking socket, when there is no more data, recv or read returns immediately and the error code EWOULDBLOCK indicates that no more data is currently available. Example:
bool CIUSocket::Recv()
{
    int nRet = 0;
    while (true)
    {
        char buff[512];
        nRet = ::recv(m_hSocket, buff, 512, 0);
        if (nRet == SOCKET_ERROR)   // Close the socket immediately if a real error occurs
        {
            if (::WSAGetLastError() == WSAEWOULDBLOCK)
                break;
            else
                return false;
        }
        else if (nRet < 1)
            return false;

        m_strRecvBuf.append(buff, nRet);
        ::Sleep(1);
    }
    return true;
}
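The example above is Windows-flavored (WSAGetLastError, Sleep). A rough Linux equivalent for a non-blocking socket might look like the sketch below; the function is illustrative and not part of any library.

#include <sys/socket.h>
#include <cerrno>
#include <string>

// Drain everything currently available on a non-blocking socket into a buffer.
// Returns false if the peer closed the connection or a real error occurred.
bool RecvAll(int fd, std::string& recvBuf)
{
    char buff[512];
    while (true)
    {
        ssize_t n = ::recv(fd, buff, sizeof(buff), 0);
        if (n > 0)
        {
            recvBuf.append(buff, static_cast<size_t>(n));
        }
        else if (n == 0)
        {
            return false;               // peer closed the connection
        }
        else
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;                  // no more data for now
            if (errno == EINTR)
                continue;               // interrupted by a signal, retry
            return false;               // real error
        }
    }
    return true;
}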
- How do I detect abnormal connections? What can I do if the connection is abnormal?

Similarly, when we receive an exception event such as EPOLLERR or FD_CLOSE, we know that an error has occurred, and we usually handle it by closing the corresponding socket. In addition, if send/recv or read/write on a socket returns 0, the peer has already closed its end of the connection.
- How do I send data to the client?

This is also a common network-programming interview question; a Tencent back-end position asked it one year. Sending data to the client is a bit trickier than receiving it and requires some care. First of all, we cannot register for writable events right from the start the way we do for readable events, because as long as the peer keeps receiving data normally our socket is almost always writable; if we listened for writable events unconditionally, they would fire constantly even though we usually have nothing to send. The right approach is: when there is data to send, try to send it immediately; if it cannot be sent, or only part of it goes out, cache the remainder and only then register interest in writable events on that socket. The next time a writable event fires, continue sending; if it still cannot all be sent, keep listening for writable events, and so on, until all the data has been sent. Once everything has gone out, remove the writable-event listener to avoid useless notifications. You may have noticed that if only part of the data is sent each time, the rest must be stored somewhere for the time being: we need a buffer for it, called the "send buffer". The send buffer stores not only the data that has not yet been sent, but also any new data the upper layer wants to send while earlier data is still pending. To keep the order correct, new data must be appended after the data already in the buffer, and sending always starts from the head of the buffer — first in, first sent.
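Here is a minimal sketch of this pattern — try to send first, buffer whatever is left, and only then watch for writable events — assuming epoll and a per-connection std::string used as the send buffer. All names are illustrative.

#include <sys/epoll.h>
#include <sys/socket.h>
#include <cerrno>
#include <string>

// Re-register the connection's epoll interest set, with or without EPOLLOUT.
static void update_events(int epfd, int fd, bool want_writable)
{
    epoll_event ev{};
    ev.events = EPOLLIN;
    if (want_writable)
        ev.events |= EPOLLOUT;
    ev.data.fd = fd;
    ::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

// Called by the upper layer when it has data to send on a connection.
void send_data(int epfd, int fd, std::string& outBuf, const char* data, size_t len)
{
    size_t sent = 0;
    if (outBuf.empty())                       // nothing pending: try to send directly
    {
        ssize_t n = ::send(fd, data, len, 0);
        if (n > 0)
            sent = static_cast<size_t>(n);
        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
            return;                           // real error: the caller should close the connection
    }
    if (sent < len)
    {
        // Append the unsent remainder after any data already waiting,
        // and start watching for writable events so we can continue later.
        outBuf.append(data + sent, len - sent);
        update_events(epfd, fd, true);
    }
}

// Called when epoll reports the socket writable.
void on_writable(int epfd, int fd, std::string& outBuf)
{
    ssize_t n = ::send(fd, outBuf.data(), outBuf.size(), 0);
    if (n > 0)
        outBuf.erase(0, static_cast<size_t>(n));
    if (outBuf.empty())
        update_events(epfd, fd, false);       // all sent: stop watching writable events
}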
- How do I close the connection after sending data to the client?

This question is harder to handle, because "sent" does not necessarily mean truly delivered: even when send or write succeeds, it only means the data was copied into the operating system's protocol stack; whether and when it actually goes out on the wire is hard to know, and whether the peer received it is harder still. So in practice we simply assume that when send or write returns the number of bytes we asked to send, the data has been "sent", and we then call a socket API such as close to close the connection. You can also call shutdown to perform a so-called "half close". Since closing connections is a topic of its own, it gets its own heading below.
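As a minimal illustration of the "half close", the sketch below shuts down only the write direction, keeps reading until the peer closes its side (read returns 0), and then closes the descriptor. It assumes a blocking socket for simplicity; the function name is illustrative.

#include <sys/socket.h>
#include <unistd.h>

// Gracefully close an outgoing connection after we have finished sending:
// SHUT_WR tells the peer "no more data from us" (it will see read() == 0),
// while we keep draining anything it still wants to send before closing.
void graceful_close(int fd)
{
    ::shutdown(fd, SHUT_WR);          // half close: stop sending, keep receiving

    char buf[512];
    while (true)
    {
        ssize_t n = ::read(fd, buf, sizeof(buf));
        if (n <= 0)                   // 0: peer closed its side; < 0: error
            break;
        // discard (or log) any remaining data from the peer
    }
    ::close(fd);
}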
#### Passively close and actively close connections
In practice, a connection is closed passively when we detect an abnormal event such as EPOLLERR, or when the peer closes the connection and send or recv returns 0; the connection is no longer meaningful, so we are forced to close it on our side.
An active close is when we close the connection ourselves by calling close/closesocket — for example, when a client sends us illegal data, such as probing packets used in network attacks, and we close the connection for security reasons.
#### Send and receive buffers
The send buffer has already been introduced, along with the reason it exists. The same reasoning applies to the receive buffer. We could unpack the data immediately after receiving it, but that is a bad idea for two reasons. First, unless the protocol is a well-known one such as HTTP, every server has its own business protocol; interpreting a packet's data format belongs to the business layer, and the network communication layer should be decoupled from it. To keep the network layer generic, it cannot know what the upper-layer protocol looks like, since different protocols have different, business-specific formats. Second, even if the network layer did know the protocol format, unpacking and handling the corresponding business there would be a problem: if the business processing is time-consuming — complex computation, or connecting to a database to verify an account and password, for example — the network thread would spend so much time on those tasks that other network events could not be handled in time. For these two reasons we really do need a receive buffer: put the received data into the buffer, and let a dedicated business thread (or the business logic) take the data out of the receive buffer, unpack it and process it.
Having said that, how big should the send and receive buffers be? This is an old problem: pre-allocate too little and it is not enough; pre-allocate too much and memory is wasted. What to do? The answer, as with strings and vectors, is to design a buffer that grows dynamically — allocate on demand and expand when needed.
Note that the send buffer and receive buffer discussed here exist per socket connection — one pair for each connection. This is the most common design.
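A minimal sketch of such a per-connection growable buffer — loosely in the spirit of the Buffer class that appears in the muduo code later in this article, but greatly simplified; the interface is illustrative.

#include <string>
#include <cstddef>

// A trivially growable byte buffer: writers append at the tail,
// readers consume from the head. std::string handles the growth.
class DynamicBuffer
{
public:
    size_t readableBytes() const { return data_.size() - readPos_; }
    const char* peek() const { return data_.data() + readPos_; }

    void append(const char* buf, size_t len) { data_.append(buf, len); }

    // Mark len bytes as consumed; compact the storage once everything is read.
    void retrieve(size_t len)
    {
        readPos_ += len;
        if (readPos_ >= data_.size())
        {
            data_.clear();
            readPos_ = 0;
        }
    }

private:
    std::string data_;
    size_t readPos_ = 0;
};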
#### Protocol design
Apart from a few general-purpose protocols such as HTTP and FTP, most server protocols are business-specific. Once the protocol is designed, the packet format is laid out according to it. We know that TCP/IP carries stream-oriented data: like flowing water, the stream has no clear boundaries between packets. For example, if end A sends three consecutive 50-byte packets to end B, B might first receive 10 bytes and then 140 bytes; or 20 bytes, then 20 bytes, then 110 bytes; or all 150 bytes at once — B can receive those 150 bytes in any combination and in any sizes. So the first problem protocol design must solve is how to define packet boundaries, that is, how the receiver knows the size of each packet. Three methods are commonly used:
- Fixed size: assume every packet is a fixed number of bytes — for example 50 bytes, as discussed above — and the receiver treats every 50 bytes it accumulates as one packet.
- A designated terminator: mark the end of each packet with a special sequence such as "\r\n" (carriage return and line feed); when the peer sees the terminator it considers one packet complete, and whatever follows belongs to the next packet.
- A length field in a fixed-size header: the packet header has a fixed size and contains a field giving the size of the packet body (or of the whole packet). After receiving the data, the peer first parses that field, obtains the body or packet size, and uses it to determine the packet boundary (see the sketch after this list).
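A minimal sketch of the third approach — a fixed-size header that carries the body length — is shown below. The header layout (a single 32-bit body-size field) and the function names are illustrative only; they roughly mirror the msg header used in the unpacking code later in this article.

#include <cstdint>
#include <cstring>
#include <string>

#pragma pack(push, 1)          // no padding: the header goes on the wire exactly as laid out
struct PacketHeader
{
    int32_t bodySize;          // size of the packet body that follows the header
};
#pragma pack(pop)

// Try to cut one complete packet out of the receive buffer.
// Returns false if the buffer does not yet hold a full packet.
bool tryUnpack(std::string& recvBuf, std::string& body)
{
    if (recvBuf.size() < sizeof(PacketHeader))
        return false;                                   // not even a full header yet

    PacketHeader header;
    std::memcpy(&header, recvBuf.data(), sizeof(header));
    if (recvBuf.size() < sizeof(header) + static_cast<size_t>(header.bodySize))
        return false;                                   // body has not fully arrived yet

    body.assign(recvBuf.data() + sizeof(header), static_cast<size_t>(header.bodySize));
    recvBuf.erase(0, sizeof(header) + static_cast<size_t>(header.bodySize));
    return true;
}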
The second issue in protocol design is that the protocol should be as easy to unpack as possible, which means its format fields should be as clear and simple as possible.
The third issue is that a single packet assembled according to the protocol should be as small as possible. This has two advantages: first, for mobile devices, whose processing power and bandwidth are limited, small packets not only speed up processing but also save a lot of traffic cost; second, if a single packet is small enough, it greatly reduces the bandwidth pressure on a server that communicates frequently, and the system uses less memory. Consider a stock quote server: if the packet for one stock is 100 bytes versus 1,000 bytes, what is the difference across 10,000 stocks?
The fourth issue is that, for numeric types, we should explicitly specify their width. For example, long is 4 bytes (32 bits) on a 32-bit machine but 8 bytes (64 bits) on a 64-bit machine; it is the same long, yet the sender and receiver may decode it with different widths because their machine word sizes differ. So in protocols intended for cross-platform use, it is advisable to specify integer field widths explicitly, such as int32, int64, and so on. Below is an example of a protocol interface; Java programmers should find the style familiar:
class BinaryReadStream
{
private:
const char* const ptr;
const size_t len;
const char* cur;
BinaryReadStream(const BinaryReadStream&);
BinaryReadStream& operator=(const BinaryReadStream&);
public:
BinaryReadStream(const char* ptr, size_t len);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool IsEmpty() const;
bool ReadString(string* str, size_t maxlen, size_t& outlen);
bool ReadCString(char* str, size_t strlen, size_t& len);
bool ReadCCString(const char** str, size_t maxlen, size_t& outlen);
bool ReadInt32(int32_t& i);
bool ReadInt64(int64_t& i);
bool ReadShort(short& i);
bool ReadChar(char& c);
size_t ReadAll(char* szBuffer, size_t iLen) const;
bool IsEnd() const;
const char* GetCurrent() const{ return cur; }
public:
bool ReadLength(size_t & len);
bool ReadLengthWithoutOffset(size_t &headlen, size_t & outlen);
};
class BinaryWriteStream
{
public:
BinaryWriteStream(string* data);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool WriteCString(const char* str, size_t len);
bool WriteString(const string& str);
bool WriteDouble(double value, bool isNULL = false);
bool WriteInt64(int64_t value, bool isNULL = false);
bool WriteInt32(int32_t i, bool isNULL = false);
bool WriteShort(short i, bool isNULL = false);
bool WriteChar(char c, bool isNULL = false);
size_t GetCurrentPos() const{ return m_data->length(); }
void Flush();
void Clear();
private:
string* m_data;
};
BinaryWriteStream is the encoding class and BinaryReadStream is the decoding class; they are used for encoding and decoding as follows.
Encoding:
std::string outbuf;
BinaryWriteStream writeStream(&outbuf);
writeStream.WriteInt32(msg_type_register);
writeStream.WriteInt32(m_seq);
writeStream.WriteString(retData);
writeStream.Flush();
Decoding:
BinaryReadStream readStream(strMsg.c_str(), strMsg.length());
int32_t cmd;
if (!readStream.ReadInt32(cmd))
{
return false;
}
//int seq;
if (!readStream.ReadInt32(m_seq))
{
return false;
}
std::string data;
size_t datalength;
if (!readStream.ReadString(&data, 0, datalength))
{
return false;
}
2. The organization of the server program structure
We have covered a lot of specifics under the six headings above; now it is time to put them together. In my experience, the mainstream approach today is the "one thread one loop" + Reactor pattern (there is also the Proactor pattern). "One thread one loop" means exactly what it says: each thread runs a loop that does a sequence of things over and over, such as detecting network events, unpacking data and executing business logic. Let us start with the simplest form: a few threads, each looping over network-communication work. The pseudocode looks like this:
while (!exit_flag)
{
    // Use I/O multiplexing to detect readable and error events on the sockets
    // (and writable events too, if there is data waiting to be sent)

    // If there is a readable event:
    //   - for the listening socket, accept the new connection;
    //   - for an ordinary socket, read the data on it into the corresponding
    //     receive buffer; if an error occurs, close the connection.

    // If there is an error event, close the connection
}
In addition, we set up some other threads to process the received data — unpacking it and running the business logic. These can be considered business threads; the pseudocode is as follows:
// Take data out of the receive buffer, unpack it, and dispatch it to the
// appropriate business handlers
The structure above is the most common server logic structure, but can we simplify or consolidate it? Let us try. Have you ever considered this: suppose the machine has two CPUs (more precisely, two cores), and we have two network threads and two business-logic threads; it may well happen that while a business thread is running, a network thread cannot run and has to wait. If that is the case, why create the extra threads at all? Beyond perhaps a slightly cleaner structure, there is no real gain in performance, and CPU time is wasted on thread context switches. So we can merge the network thread with the business-logic thread; the merged pseudocode looks like this:
while (!exit_flag)
{
    // Use I/O multiplexing to detect readable and error events on the sockets
    // (and writable events too, if there is data waiting to be sent)

    // If there is a readable event:
    //   - for the listening socket, accept the new connection;
    //   - for an ordinary socket, read the data on it into the corresponding
    //     receive buffer; if an error occurs, close the connection.

    // If there is an error event, close the connection

    // Take data out of the receive buffer, unpack it, and dispatch it to the
    // appropriate business handlers
}
In addition, when there are no network IO events, we can process some business logic in a timely manner, and reduce unnecessary thread context switching time.
We can go one step further and even add some other tasks to the while loop, such as the program’s logical task queue, timer events, etc., with the following pseudo-codes:
while (!exit_flag)
{
    // Handle timer events

    // Use I/O multiplexing to detect readable and error events on the sockets
    // (and writable events too, if there is data waiting to be sent)

    // If there is a readable event:
    //   - for the listening socket, accept the new connection;
    //   - for an ordinary socket, read the data on it into the corresponding
    //     receive buffer; if an error occurs, close the connection.

    // If there is an error event, close the connection

    // Take data out of the receive buffer, unpack it, and dispatch it to the
    // appropriate business handlers

    // Program-defined task 1
    // Program-defined task 2
}
Note: the timer events are handled before the network I/O events are detected in order to keep timer events from firing too late. If they were placed later, the preceding processing might take some time, and by the time the timer events were handled the intended interval would already have slipped by quite a bit. Although this ordering still cannot guarantee 100% accurate timer events, it comes as close as possible. Linux also provides timer file descriptors (timerfd), so a timer can be handled as an fd just like a socket — the same idea as libevent, which wraps sockets, timers and signals into a unified event object.
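As a minimal sketch of that idea, on Linux a periodic timer can be turned into a file descriptor with timerfd and driven by the same epoll loop as the sockets (the names below are illustrative):

#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdint>

// Create a timer fd that fires every interval_ms milliseconds and register
// it with the same epoll instance used for the sockets.
int add_periodic_timer(int epfd, int interval_ms)
{
    int tfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

    itimerspec spec{};
    spec.it_value.tv_sec  = interval_ms / 1000;
    spec.it_value.tv_nsec = (interval_ms % 1000) * 1000000L;
    spec.it_interval      = spec.it_value;           // repeat with the same period
    ::timerfd_settime(tfd, 0, &spec, nullptr);

    epoll_event ev{};
    ev.events  = EPOLLIN;                            // becomes readable when the timer expires
    ev.data.fd = tfd;
    ::epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);
    return tfd;
}

// In the event loop, when tfd is reported readable:
void on_timer(int tfd)
{
    uint64_t expirations = 0;
    ::read(tfd, &expirations, sizeof(expirations));  // how many times it fired since last read
    // ... run the timer callbacks here ...
}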
With the theory out of the way, let us look at some real code. muduo is a popular open-source network library originally based on Boost; I have ported it to C++11 and fixed some bugs.
The while loop for the core thread function described above is in eventloop.cpp:
void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
    LOG_TRACE << "EventLoop " << this << " start looping";

    while (!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        ++iteration_;
        if (Logger::logLevel() <= Logger::TRACE)
        {
            printActiveChannels();
        }
        // TODO sort channel by priority
        eventHandling_ = true;
        for (ChannelList::iterator it = activeChannels_.begin();
             it != activeChannels_.end(); ++it)
        {
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;

        doPendingFunctors();

        if (frameFunctor_)
        {
            frameFunctor_();
        }
    }

    LOG_TRACE << "EventLoop " << this << " stop looping";
    looping_ = false;
}
poller_->poll uses epoll to demultiplex network events, and the loop then handles them by calling currentActiveChannel_->handleEvent(pollReturnTime_). Each client socket corresponds to one connection, i.e. one TcpConnection object and one Channel object:
void Channel::handleEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if (tied_)
    {
        guard = tie_.lock();
        if (guard)
        {
            handleEventWithGuard(receiveTime);
        }
    }
    else
    {
        handleEventWithGuard(receiveTime);
    }
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
    eventHandling_ = true;
    LOG_TRACE << reventsToString();
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
    {
        if (logHup_)
        {
            LOG_WARN << "Channel::handle_event() POLLHUP";
        }
        if (closeCallback_) closeCallback_();
    }

    if (revents_ & POLLNVAL)
    {
        LOG_WARN << "Channel::handle_event() POLLNVAL";
    }

    if (revents_ & (POLLERR | POLLNVAL))
    {
        if (errorCallback_) errorCallback_();
    }

    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    {
        // For the listening socket, readCallback_ points to Acceptor::handleRead;
        // for a client socket, it calls TcpConnection::handleRead
        if (readCallback_) readCallback_(receiveTime);
    }

    if (revents_ & POLLOUT)
    {
        // If the socket is in the connecting state, writeCallback_ points to
        // Connector::handleWrite()
        if (writeCallback_) writeCallback_();
    }
    eventHandling_ = false;
}
This takes advantage of the Channel object's "polymorphism" via callbacks: for an ordinary socket, a readable event invokes a preset callback, but for the listening socket it calls the Acceptor object's handleRead() to accept the new connection:
void Acceptor::handleRead()
{
    loop_->assertInLoopThread();
    InetAddress peerAddr;
    //FIXME loop until no more
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        // string hostport = peerAddr.toIpPort();
        // LOG_TRACE << "Accepts of " << hostport;
        // newConnectionCallback_ actually points to
        // TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
        if (newConnectionCallback_)
        {
            newConnectionCallback_(connfd, peerAddr);
        }
        else
        {
            sockets::close(connfd);
        }
    }
    else
    {
        LOG_SYSERR << "in Acceptor::handleRead";
        // Read the section named "The special problem of
        // accept()ing when you can't" in libev's doc,
        // by Marc Lehmann, author of libev.
        if (errno == EMFILE)
        {
            ::close(idleFd_);
            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
            ::close(idleFd_);
            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
        }
    }
}
The business logic processing in the main loop corresponds to:
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
Business logic is added by pushing function objects that perform the tasks into the member variable pendingFunctors_, a vector of functors; when it is time to execute them, each functor is simply called. The code above swaps the contents of pendingFunctors_ into a local (stack) variable and then iterates over that local copy, which keeps the lock granularity small. A lock is needed because pendingFunctors_ is also touched when tasks are added, and tasks can be added from multiple threads:
void EventLoop::queueInLoop(const Functor& cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
frameFunctor_ is simpler: it is just a function pointer that gets set and then called. There is one trick here: when a task is added and needs to run promptly, a wakeup mechanism writes a few bytes to a special fd so that epoll_wait returns immediately even if no socket has any events, and the newly added task is then executed.
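A minimal sketch of such a wakeup mechanism using an eventfd (the helper names are illustrative, not muduo's actual API):

#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdint>

// Create the wakeup fd and register it with the loop's epoll instance.
int create_wakeup_fd(int epfd)
{
    int wfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    epoll_event ev{};
    ev.events  = EPOLLIN;
    ev.data.fd = wfd;
    ::epoll_ctl(epfd, EPOLL_CTL_ADD, wfd, &ev);
    return wfd;
}

// Called from another thread after queueing a task: writing 8 bytes makes the
// eventfd readable, so epoll_wait in the loop thread returns immediately.
void wakeup(int wfd)
{
    uint64_t one = 1;
    ::write(wfd, &one, sizeof(one));
}

// Called in the loop thread when the wakeup fd is reported readable:
// drain the counter so the fd does not stay readable forever.
void handle_wakeup(int wfd)
{
    uint64_t count = 0;
    ::read(wfd, &count, sizeof(count));
}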
Let’s look at the logic of data collection:
void TcpConnection::handleRead(Timestamp receiveTime)
{
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
        // messageCallback_ points to CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn,
        //                                                Buffer* pBuffer, Timestamp receiveTime)
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        LOG_SYSERR << "TcpConnection::handleRead";
        handleError();
    }
}
The received data is put into the receive buffer; we then unpack it from there:
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
{
    while (true)
    {
        // Not even enough bytes for a packet header yet
        if (pBuffer->readableBytes() < (size_t)sizeof(msg))
        {
            LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()="
                     << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
            return;
        }

        // Not enough bytes for the whole packet yet
        msg header;
        memcpy(&header, pBuffer->peek(), sizeof(msg));
        if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
            return;

        pBuffer->retrieve(sizeof(msg));
        std::string inbuf;
        inbuf.append(pBuffer->peek(), header.packagesize);
        pBuffer->retrieve(header.packagesize);
        if (!Process(conn, inbuf.c_str(), inbuf.length()))
        {
            LOG_WARN << "Process error, close TcpConnection";
            conn->forceClose();
        }
    }// end while-loop
}
It first checks whether the receive buffer holds enough bytes for a packet header; if so, it checks whether it holds the full packet size specified by the header; if so, it extracts the packet and handles it in the Process function.
Now look at the logic of sending data:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
    loop_->assertInLoopThread();
    ssize_t nwrote = 0;
    size_t remaining = len;
    bool faultError = false;
    if (state_ == kDisconnected)
    {
        LOG_WARN << "disconnected, give up writing";
        return;
    }
    // if no thing in output queue, try writing directly
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = sockets::write(channel_->fd(), data, len);
        if (nwrote >= 0)
        {
            remaining = len - nwrote;
            if (remaining == 0 && writeCompleteCallback_)
            {
                loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
            }
        }
        else // nwrote < 0
        {
            nwrote = 0;
            if (errno != EWOULDBLOCK)
            {
                LOG_SYSERR << "TcpConnection::sendInLoop";
                if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
                {
                    faultError = true;
                }
            }
        }
    }

    assert(remaining <= len);
    if (!faultError && remaining > 0)
    {
        size_t oldLen = outputBuffer_.readableBytes();
        if (oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_)
        {
            loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
        }
        outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
        if (!channel_->isWriting())
        {
            channel_->enableWriting();
        }
    }
}
If there is still unsent data (remaining is greater than 0), channel_->enableWriting() is called to start listening for writable events. Writable events are handled as follows:
void TcpConnection::handleWrite()
{
    loop_->assertInLoopThread();
    if (channel_->isWriting())
    {
        ssize_t n = sockets::write(channel_->fd(),
                                   outputBuffer_.peek(),
                                   outputBuffer_.readableBytes());
        if (n > 0)
        {
            outputBuffer_.retrieve(n);
            if (outputBuffer_.readableBytes() == 0)
            {
                channel_->disableWriting();
                if (writeCompleteCallback_)
                {
                    loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
                }
                if (state_ == kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_SYSERR << "TcpConnection::handleWrite";
            // if (state_ == kDisconnecting)
            // {
            //     shutdownInLoop();
            // }
        }
    }
    else
    {
        LOG_TRACE << "Connection fd = " << channel_->fd()
                  << " is down, no more writing";
    }
}
Once all the data has been sent, channel_->disableWriting() is called to remove the listener for writable events.
Many readers may have been wondering: aren't unpacking data and processing logic business code rather than network communication — am I mixing the two together? No. The code that actually handles the data here is invoked entirely through callbacks provided by the framework; how the data is processed is defined by the framework's user, i.e. the business layer.
To sum up, it is really just one loop inside a thread function. If you don't believe me, look at the code of a trading-system server project I once worked on:
void CEventDispatcher::Run()
{
m_bShouldRun = true;
while(m_bShouldRun)
{
DispatchIOs();
SyncTime();
CheckTimer();
DispatchEvents();
}
}
void CEpollReactor::DispatchIOs()
{
DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
if (HandleOtherTask())
{
dwSelectTimeOut = 0;
}
struct epoll_event ev;
CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
    for (; itor != m_mapEventHandlerId.end(); itor++)
    {
        CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;
        if (pEventHandler == NULL)
        {
            continue;
        }
ev.data.ptr = pEventHandler;
ev.events = 0;
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nReadID > 0)
{
ev.events |= EPOLLIN;
}
if (nWriteID > 0)
{
ev.events |= EPOLLOUT;
}
epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
}
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut/1000);
for (int i=0; i<nfds; i++)
{
struct epoll_event &evref = events[i];
CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;
        if ((evref.events & EPOLLIN) != 0 && m_mapEventHandlerId.find(pEventHandler) != m_mapEventHandlerId.end())
        {
            pEventHandler->HandleInput();
        }
        if ((evref.events & EPOLLOUT) != 0 && m_mapEventHandlerId.find(pEventHandler) != m_mapEventHandlerId.end())
        {
            pEventHandler->HandleOutput();
        }
    }
}

void CEventDispatcher::DispatchEvents()
{
CEvent event;
CSyncEvent *pSyncEvent;
while(m_queueEvent.PeekEvent(event))
{
int nRetval;
        if (event.pEventHandler != NULL)
        {
            nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam);
        }
        else
{
nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
        if (event.pAdd != NULL)
        {
            pSyncEvent = (CSyncEvent *)event.pAdd;
            pSyncEvent->nRetval = nRetval;
            pSyncEvent->sem.UnLock();
        }
    }
}
Now take a look at the source code of Mogujie's open-source TeamTalk (code download address: github.com/baloonwj/Te…):
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
fd_set read_set, write_set, excep_set;
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = wait_timeout * 1000; // 10 millisecond
if(running)
return;
running = true;
while (running)
{
_CheckTimer();
_CheckLoop();
        if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)
        {
            Sleep(MIN_TIMER_DURATION);
            continue;
        }
m_lock.lock();
memcpy(&read_set, &m_read_set, sizeof(fd_set));
memcpy(&write_set, &m_write_set, sizeof(fd_set));
memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
m_lock.unlock();
int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);
if (nfds == SOCKET_ERROR)
{
log("select failed, error code: %d", GetLastError());
Sleep(MIN_TIMER_DURATION);
continue; // select again
}
if (nfds == 0)
{
continue;
}
for (u_int i = 0; i < read_set.fd_count; i++)
{
//log("select return read count=%d\n", read_set.fd_count);
SOCKET fd = read_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnRead();
                pSocket->ReleaseRef();
            }
        }

        for (u_int i = 0; i < write_set.fd_count; i++)
{
//log("select return write count=%d\n", write_set.fd_count);
SOCKET fd = write_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnWrite();
                pSocket->ReleaseRef();
            }
        }

        for (u_int i = 0; i < excep_set.fd_count; i++)
{
//log("select return exception count=%d\n", excep_set.fd_count);
SOCKET fd = excep_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
            if (pSocket)
            {
                pSocket->OnClose();
                pSocket->ReleaseRef();
            }
        }
    }
}
FileZilla's server side, an FTP server, uses the Windows WSAAsyncSelect model (github.com/baloonwj/fi…):
//Processes event notifications sent by the sockets or the layers
static LRESULT CALLBACK WindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
if (message>=WM_SOCKETEX_NOTIFY)
{
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if(! pWnd)return 0;
if (message < static_cast<UINT>(WM_SOCKETEX_NOTIFY+pWnd->m_nWindowDataSize)) //Index is within socket storage
{
//Lookup socket and verify if it's valid
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[message - WM_SOCKETEX_NOTIFY].m_pSocket;
SOCKET hSocket = wParam;
if (!pSocket)
return 0;
if (hSocket == INVALID_SOCKET)
return 0;
if (pSocket->m_SocketData.hSocket != hSocket)
return 0;
int nEvent = lParam & 0xFFFF;
int nErrorCode = lParam >> 16;
//Dispatch notification
if (!pSocket->m_pFirstLayer)
{
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_READ;
break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
break;
#endif //NOSOCKETSTATES
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
if(pSocket->GetState() == connecting && ! nErrorCode) { pSocket->m_nPendingEvents |= FD_FORCEREAD;break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if(pSocket->GetState() ! = connected)break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_WRITE:
#ifndef NOSOCKETSTATES
if(pSocket->GetState() == connecting && ! nErrorCode) { pSocket->m_nPendingEvents |= FD_WRITE;break;
}
else if(pSocket->GetState() == attached && ! nErrorCode) pSocket->SetState(connected);if(pSocket->GetState() ! = connected)break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_WRITE)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnSend(nErrorCode);
}
break;
case FD_CONNECT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting)
{
if (nErrorCode && pSocket->m_SocketData.nextAddr)
{
if (pSocket->TryNextProtocol())
break;
}
pSocket->SetState(connected);
}
else if(pSocket->GetState() == attached && ! nErrorCode) pSocket->SetState(connected);#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_CONNECT)
pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
if(! nErrorCode) {if ((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected)
pSocket->OnSend(0);
}
pSocket->m_nPendingEvents = 0;
#endif
break;
case FD_ACCEPT:
#ifndef NOSOCKETSTATES
if(pSocket->GetState() ! = listening && pSocket->GetState() ! = attached)break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_ACCEPT)
pSocket->OnAccept(nErrorCode);
break;
case FD_CLOSE:
#ifndef NOSOCKETSTATES
if(pSocket->GetState() ! = connected && pSocket->GetState() ! = attached)break;
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if(! nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes)) {if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
pSocket->m_SocketData.onCloseCalled = true;
pSocket->OnReceive(WSAESHUTDOWN);
break;
}
}
pSocket->SetState(nErrorCode ? aborted : closed);
#endif //NOSOCKETSTATES
pSocket->OnClose(nErrorCode);
break; }}else //Dispatch notification to the lowest layer
{
if (nEvent == FD_READ)
{
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
return 0;
DWORD nBytes;
if(! pSocket->IOCtl(FIONREAD, &nBytes)) nErrorCode = WSAGetLastError();if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (nEvent == FD_CLOSE)
{
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if(! nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes)) {if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(FD_READ, 0);
return 0;
}
}
pSocket->m_SocketData.onCloseCalled = true;
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if(pSocket->m_pLastLayer) pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode); }}return 0;
}
else if (message == WM_USER) //Notification event sent by a layer
{
//Verify parameters, lookup socket and notification message
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if(! pWnd)return 0;
if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage
{
return 0;
}
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
CAsyncSocketExLayer::t_LayerNotifyMsg *pMsg = (CAsyncSocketExLayer::t_LayerNotifyMsg *)lParam;
if(! pMsg || ! pSocket || pSocket->m_SocketData.hSocket ! = pMsg->hSocket) { delete pMsg;return 0;
}
int nEvent=pMsg->lEvent&0xFFFF;
int nErrorCode=pMsg->lEvent>>16;
//Dispatch to layer
if (pMsg->pLayer)
pMsg->pLayer->CallEvent(nEvent, nErrorCode);
else
{
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if(pSocket->GetState() == connecting && ! nErrorCode) { pSocket->m_nPendingEvents |= FD_READ;break;
}
else if(pSocket->GetState() == attached && ! nErrorCode) pSocket->SetState(connected);if(pSocket->GetState() ! = connected)break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && ! nErrorCode) { pSocket->m_nPendingEvents |= FD_FORCEREAD; break; } else if (pSocket->GetState() == attached && ! nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() ! = connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_READ) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnReceive(nErrorCode); } break; case FD_WRITE: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting && ! nErrorCode) { pSocket->m_nPendingEvents |= FD_WRITE; break; } else if (pSocket->GetState() == attached && ! nErrorCode) pSocket->SetState(connected); if (pSocket->GetState() ! = connected) break; #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_WRITE) { #ifndef NOSOCKETSTATES if (nErrorCode) pSocket->SetState(aborted); #endif //NOSOCKETSTATES pSocket->OnSend(nErrorCode); } break; case FD_CONNECT: #ifndef NOSOCKETSTATES if (pSocket->GetState() == connecting) pSocket->SetState(connected); else if (pSocket->GetState() == attached && ! nErrorCode) pSocket->SetState(connected); #endif //NOSOCKETSTATES if (pSocket->m_lEvent & FD_CONNECT) pSocket->OnConnect(nErrorCode); #ifndef NOSOCKETSTATES if (! nErrorCode) { if (((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ)) pSocket->OnReceive(0); if (((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_WRITE)) pSocket->OnSend(0); } pSocket->m_nPendingEvents = 0; #endif //NOSOCKETSTATES break; case FD_ACCEPT: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == listening || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_ACCEPT)) #endif //NOSOCKETSTATES { pSocket->OnAccept(nErrorCode); } break; case FD_CLOSE: #ifndef NOSOCKETSTATES if ((pSocket->GetState() == connected || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_CLOSE)) { pSocket->SetState(nErrorCode? aborted:closed); #else { #endif //NOSOCKETSTATES pSocket->OnClose(nErrorCode); } break; } } delete pMsg; return 0; } else if (message == WM_USER+1) { // WSAAsyncGetHostByName reply // Verify parameters ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd = (CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd); if (! pWnd) return 0; CAsyncSocketEx *pSocket = NULL; for (int i = 0; i < pWnd->m_nWindowDataSize; ++i) { pSocket = pWnd->m_pAsyncSocketExWindowData[i].m_pSocket; if (pSocket && pSocket->m_hAsyncGetHostByNameHandle && pSocket->m_hAsyncGetHostByNameHandle == (HANDLE)wParam && pSocket->m_pAsyncGetHostByNameBuffer) break; } if (! pSocket || ! pSocket->m_pAsyncGetHostByNameBuffer) return 0; int nErrorCode = lParam >> 16; if (nErrorCode) { pSocket->OnConnect(nErrorCode); return 0; } SOCKADDR_IN sockAddr{}; sockAddr.sin_family = AF_INET; sockAddr.sin_addr.s_addr = ((LPIN_ADDR)((LPHOSTENT)pSocket->m_pAsyncGetHostByNameBuffer)->h_addr)->s_addr; sockAddr.sin_port = htons(pSocket->m_nAsyncGetHostByNamePort); BOOL res = pSocket->Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr)); delete [] pSocket->m_pAsyncGetHostByNameBuffer; pSocket->m_pAsyncGetHostByNameBuffer = 0; pSocket->m_hAsyncGetHostByNameHandle = 0; if (! res) if (GetLastError() ! 
= WSAEWOULDBLOCK) pSocket->OnConnect(GetLastError()); return 0; } else if (message == WM_USER + 2) { //Verify parameters, lookup socket and notification message //Verify parameters if (! hWnd) return 0; CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); if (! pWnd) return 0; if (wParam >= static_cast
(pWnd->m_nWindowDataSize)) //Index is within socket storage return 0; CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket; if (! pSocket) return 0; // Process pending callbacks std::list
tmp; tmp.swap(pSocket->m_pendingCallbacks); pSocket->OnLayerCallback(tmp); for (auto & cb : tmp) { delete [] cb.str; } } else if (message == WM_TIMER) { if (wParam ! = 1) return 0; ASSERT(hWnd); CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA); ASSERT(pWnd && pWnd->m_pThreadData); if (! pWnd || ! pWnd->m_pThreadData) return 0; if (pWnd->m_pThreadData->layerCloseNotify.empty()) { KillTimer(hWnd, 1); return 0; } CAsyncSocketEx* socket = pWnd->m_pThreadData->layerCloseNotify.front(); pWnd->m_pThreadData->layerCloseNotify.pop_front(); if (pWnd->m_pThreadData->layerCloseNotify.empty()) KillTimer(hWnd, 1); if (socket) PostMessage(hWnd, socket->m_SocketData.nSocketIndex + WM_SOCKETEX_NOTIFY, socket->m_SocketData.hSocket, FD_CLOSE); return 0; } return DefWindowProc(hWnd, message, wParam, lParam); }
If you are not familiar with these projects, you probably have no interest in reading every line of their logic. What matters is that you understand the architectural logic I have described — it is essentially the principle behind today's mainstream network frameworks. FileZilla's network communication layer, for example, is also used in easyMule.
I’ve covered the framework for a single service application, and if you understand what I mean, I’m sure you can build a high-performance service application as well.
In addition, the server framework could add a lot of interesting details, such as flow control, to the design ideas above. Here’s another example from a project I actually worked on:
In one practical project, when the number of client connections was large and the server was processing network data with a limited number of CPU cores, it could happen that several sockets had data to process at the same time, and the worker threads kept processing the first few sockets, only getting to the later ones after the first few were completely finished. It is like a restaurant where everyone has ordered, but some tables keep getting dish after dish while other tables are never served. That is obviously not acceptable. Here is how we avoided it:
int CFtdEngine::HandlePackage(CFTDCPackage *pFTDCPackage, CFTDCSession *pSession)
{
//NET_IO_LOG0("CFtdEngine::HandlePackage\n");
FTDC_PACKAGE_DEBUG(pFTDCPackage);
    if (pFTDCPackage->GetTID() != FTD_TID_ReqUserLogin)
    {
        if (!IsSessionLogin(pSession->GetSessionID()))
        {
            SendErrorRsp(pFTDCPackage, pSession, 1, "Customer not logged in");
            return 0;
        }
    }
    CalcFlux(pSession, pFTDCPackage->Length());
    REPORT_EVENT(LOG_DEBUG, "Front/Fgateway", "Login Request %0x", pFTDCPackage->GetTID());
int nRet = 0;
switch(pFTDCPackage->GetTID())
{
    case FTD_TID_ReqUserLogin:
        /// huWP: 20070608: refuse the login if the API version is too high
        if (pFTDCPackage->GetVersion() > FTD_VERSION)
{
SendErrorRsp(pFTDCPackage, pSession, 1, "Too High FTD Version");
return 0;
}
nRet = OnReqUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqCheckUserLogin:
nRet = OnReqCheckUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
case FTD_TID_ReqSubscribeTopic:
nRet = OnReqSubscribeTopic(pFTDCPackage, (CFTDCSession *)pSession);
FTDRequestIndex.incValue();
break;
}
return 0;
}Copy the code
When a socket has readable data, the system reads the data in and unpacks it, then calls CalcFlux(pSession, pFTDCPackage->Length()) to record the traffic:
void CFrontEngine::CalcFlux(CSession *pSession, const int nFlux)
{
TFrontSessionInfo *pSessionInfo = m_mapSessionInfo.Find(pSession->GetSessionID());
    if (pSessionInfo != NULL)
    {
        pSessionInfo->nCommFlux++;
        /// If the traffic exceeds the threshold, suspend the session's read operations
        if (pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux)
        {
            pSession->SuspendRead(true);
        }
    }
}
This function increments the number of packets processed by a Session, then determines if the maximum number of packets is exceeded and sets the read suspend flag:
void CSession::SuspendRead(bool bSuspend)
{
m_bSuspendRead = bSuspend;
}
The socket will be excluded from the socket list next time:
void CEpollReactor::RegisterIO(CEventHandler *pEventHandler)
{
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
    if (nWriteID != 0 && nReadID == 0)
    {
        nReadID = nWriteID;
    }
    if (nReadID != 0)
    {
        m_mapEventHandlerId[pEventHandler] = nReadID;
        struct epoll_event ev;
        ev.data.ptr = pEventHandler;
        if (epoll_ctl(m_fdEpoll, EPOLL_CTL_ADD, nReadID, &ev) != 0)
        {
            perror("epoll_ctl EPOLL_CTL_ADD");
}
}
}
void CSession::GetIds(int *pReadId, int *pWriteId)
{
m_pChannelProtocol->GetIds(pReadId,pWriteId);
    if (m_bSuspendRead)
    {
        *pReadId = 0;
    }
}
That is, the socket is no longer checked for readable data. A timer then resets the flag after one second, so that data on the socket can be detected again:
const int SESSION_CHECK_TIMER_ID = 9;
const int SESSION_CHECK_INTERVAL = 1000;
SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL);
void CFrontEngine::OnTimer(int nIDEvent)
{
if (nIDEvent == SESSION_CHECK_TIMER_ID)
{
CSessionMap::iterator itor = m_mapSession.Begin();
        while (!itor.IsEnd())
        {
            TFrontSessionInfo *pFind = m_mapSessionInfo.Find((*itor)->GetSessionID());
            if (pFind != NULL)
            {
                CheckSession(*itor, pFind);
            }
            itor++;
        }
    }
}

void CFrontEngine::CheckSession(CSession *pSession, TFrontSessionInfo *pSessionInfo)
{
    pSessionInfo->nCommFlux -= pSessionInfo->nMaxCommFlux;
    if (pSessionInfo->nCommFlux < 0)
    {
        pSessionInfo->nCommFlux = 0;
    }
    // If the traffic exceeds the limit, suspend the session's read operations
    pSession->SuspendRead(pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux);
}
This is like a restaurant that first serves a few dishes to the guests at one table, then stops serving that table and moves on to the empty tables, and once everyone has something to eat, comes back to the first table with more dishes — which is exactly how restaurants actually work. The example above is a good way to implement flow control inside a single service: it ensures every client is served fairly, rather than leaving some clients waiting far too long for a response. Of course, such techniques cannot be applied to businesses that require strict ordering, such as a sales system, which usually processes orders first come, first served.
In addition, to speed up I/O, today's servers make heavy use of caching, which is essentially a space-for-time trade-off. For information that is used repeatedly but rarely changes, and that is expensive to load from its original location (from disk or from a database, for example), caching is a good fit. That is why in-memory data stores such as Redis, LevelDB and FastDB are so popular now; if you are going into server development, you should be familiar with at least a couple of them.
This is my first article on GitChat. Due to limited space, many details could not be covered here, and I have not discussed distributed server design; I will share more techniques later if circumstances allow. Finally, I want to thank GitChat for providing such a platform for exchanging ideas with all of you.
In view of the author’s limited ability and experience, there are inevitably mistakes and omissions in this article.