Article source: www.cnblogs.com/concurrency…
Github.com/eahydra/swn…
After getting into Go, I found its network support very welcome. Go lets you write code with synchronous semantics without sacrificing much performance, because underneath the runtime uses I/O multiplexing, such as epoll on Linux and IOCP on Windows.
But much of a server program is passively triggered: it only acts when a client sends a request. It is inherently event-based. Because concurrency is part of the Go language, features like goroutines and channels make it easy for programmers to switch between synchronous and asynchronous styles when implementing features.
For my own needs, I wrote a simple wrapper for event-based server scenarios. The code is at the GitHub link above.
Design principles
Thanks to Go's I/O mechanism, its native concurrency primitives, and its encapsulation of the network APIs, programmers can easily implement an efficient server or client program. A common implementation calls net.Listen("tcp4", address) to get a net.Listener, then loops indefinitely calling its Accept method to get a net.Conn. Through the connection (a *net.TCPConn underneath) you can set the size of the send and receive buffers, enable keep-alive, and so on. Because TCP is full-duplex, you can start one goroutine per net.Conn that loops forever, receiving data from the peer and unpacking it.
My idea was to build a thin layer of encapsulation on top of this simple pattern, keeping it as lean and as flexible as possible. The hope is that it can adapt to different protocols while constraining users as little as possible.
The Session object
A Session object is created either by the server's accept loop (swnet.Server.AcceptLoop) or by calling swnet.NewSession, which is usually used on the client side. Once you have a Session, call its Start method to put it to work. Start is exposed as a separate method because the server side may have extra requirements, such as an IP-based ACL, so when to call Start is left to the user. However, if the user decides not to Start a Session, it is the user's responsibility to Close it; otherwise resources will leak.
After Start, two goroutines are running: one receives data from the peer and one sends data to the peer. To send data, use the AsyncSend method, which queues the data on the send channel. A channel is used here because a server needs to bound the queue of outgoing data: if the peer receives too slowly, or AsyncSend is called too often, data piles up and memory pressure grows. I think controlling the send rate through a channel is reasonable. There are two ways to set the channel length: pass the size to NewSession, or call Session.SetSendChannelSize. Note that SetSendChannelSize must be called before Start, otherwise it causes an error. This restriction exists because there is no real need to resize the send channel dynamically.
If the send channel is full, AsyncSend returns ErrSendChanBlocking. This error type follows from the design above: without it, the caller would have no chance to handle the problem. On receiving the error, the caller can try to work out the cause, retry the send in a loop, or discard the data. In short, the caller gets the opportunity to decide.
If the Session has already been closed, AsyncSend returns ErrStoped. In addition, because AsyncSend only queues the data on the send channel, it is the caller's responsibility to ensure that the data is not modified until sending has completed.
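The non-blocking behaviour of AsyncSend can be sketched with a buffered channel and select with a default case. This is an assumption about how such a method might look, not the actual swnet source: the sender type, its fields, and the error messages are hypothetical, and only the error names come from the article.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the errors named in the article.
var (
	ErrSendChanBlocking = errors.New("send channel is full")
	ErrStoped           = errors.New("session is stopped")
)

// sender is a hypothetical, stripped-down session that models only the queue.
type sender struct {
	sendChan chan []byte   // bounded queue drained by the sending goroutine
	stopped  chan struct{} // closed when the session is closed
}

func newSender(size int) *sender {
	return &sender{
		sendChan: make(chan []byte, size),
		stopped:  make(chan struct{}),
	}
}

// AsyncSend queues data without ever blocking the caller.
func (s *sender) AsyncSend(data []byte) error {
	select {
	case <-s.stopped:
		return ErrStoped
	default:
	}
	select {
	case s.sendChan <- data:
		return nil
	default:
		// The queue is full: report it instead of blocking or growing.
		return ErrSendChanBlocking
	}
}

// Close marks the sender as stopped. In this sketch it must be called at
// most once.
func (s *sender) Close() { close(s.stopped) }

func main() {
	s := newSender(1)
	fmt.Println(s.AsyncSend([]byte("a"))) // <nil>
	fmt.Println(s.AsyncSend([]byte("b"))) // send channel is full
	s.Close()
	fmt.Println(s.AsyncSend([]byte("c"))) // session is stopped
}
```

Returning an error rather than blocking leaves the retry, drop, or disconnect decision with the caller.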
If sending fails, or something else goes wrong, my implementation simply closes the Session outright.
In addition, some use cases send large packets, such as 32 KB or 64 KB. To avoid repeated memory allocations in those cases, Session has a SetSendCallback method. It registers a callback that is invoked after a send completes, giving the user a chance to recycle the data object, for example with sync.Pool. That said, it did not have much effect in my own tests.
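A rough sketch of recycling send buffers through sync.Pool from a send callback follows. The callback signature and the send helper are assumptions for illustration; the article does not show the real SetSendCallback signature.

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool hands out 64 KiB buffers. Pointers to slices are stored so that
// Put does not allocate a new slice header each time.
var bufPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 64*1024)
		return &b
	},
}

// sendCallback is a hypothetical callback type, invoked once the data has
// been handed to the connection.
type sendCallback func(data []byte)

// send stands in for the session's sending goroutine: it writes the data and
// then lets the caller reclaim it.
func send(data []byte, write func([]byte) error, done sendCallback) error {
	err := write(data)
	if done != nil {
		done(data)
	}
	return err
}

func main() {
	bp := bufPool.Get().(*[]byte)
	n := copy(*bp, "hello")

	var sent int
	err := send((*bp)[:n],
		func(p []byte) error { sent = len(p); return nil }, // fake write
		func([]byte) { bufPool.Put(bp) },                   // recycle the buffer
	)
	fmt.Println(sent, err)
}
```

The buffer must not be touched after it goes back into the pool, which is the same "do not modify until sent" rule from the caller's side.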
To make it easier for users to set net.Conn parameters, a RawConn method returns the underlying net.Conn instance. I was torn about this, because exposing an internal resource gives the user a great deal of freedom: they can bypass the Session's send channel and write to the connection directly. For the sake of convenience I exposed it anyway, and the user takes on the corresponding responsibility. A Hijack method like the one in net/http could also be added, which would let users take over the net.Conn entirely.
Many of the setter and getter methods on Session are not protected by locks. This is partly because most settings are applied once, before Start, and partly because the values read by the getters do not need to be strictly consistent.
Sometimes you want to be notified when a Session is closed, so a SetCloseCallback method is provided. Setting it is optional. The close callback is guaranteed to be called only once.
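One common way to get the "called only once" guarantee is sync.Once. The sketch below assumes that approach; the session type and its fields are hypothetical and the real swnet implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in that models only the close path.
type session struct {
	closeOnce     sync.Once
	closed        chan struct{}
	closeCallback func(*session)
}

func newSession() *session {
	return &session{closed: make(chan struct{})}
}

// SetCloseCallback must be called before the session can be closed, because
// the field is not protected by a lock.
func (s *session) SetCloseCallback(cb func(*session)) { s.closeCallback = cb }

// Close is safe to call from several goroutines; the body runs only once.
func (s *session) Close() {
	s.closeOnce.Do(func() {
		close(s.closed)
		if s.closeCallback != nil {
			s.closeCallback(s)
		}
	})
}

func main() {
	calls := 0
	s := newSession()
	s.SetCloseCallback(func(*session) { calls++ })
	s.Close()
	s.Close()
	s.Close()
	fmt.Println(calls) // 1
}
```

This matters because both the receiving and the sending goroutine may try to close the same Session when the connection fails.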
Protocol serialization abstraction
Because one of the goals is to be independent of any specific protocol format, the protocol is abstracted. You only need to implement the PacketProtocol interface:
// PacketReader is used to unmarshal a complete packet from buff
type PacketReader interface {
	// Read data from conn and build a complete packet.
	// How to read from conn is up to you. You can set read timeout or other option.
	// If buff's capacity is small, you can make a new buff, then return it,
	// so can reuse to reduce memory overhead.
	ReadPacket(conn net.Conn, buff []byte) (interface{}, []byte, error)
}
// PacketWriter is used to marshal packet into buff
type PacketWriter interface {
	// Build a complete packet. If buff's capacity is too small, you can make a new one
	// and return it to reuse.
	BuildPacket(packet interface{}, buff []byte) ([]byte, error)
	// How to write data to conn is up to you. So you can set write timeout or other option.
	WritePacket(conn net.Conn, buff []byte) error
}
// PacketProtocol just a composite interface
type PacketProtocol interface {
	PacketReader
	PacketWriter
}
That means implementing two interfaces, PacketReader and PacketWriter. To reuse memory as much as possible and reduce memory pressure, both ReadPacket and BuildPacket return a slice. The framework passes a slice of default size to these methods on the first call. If its capacity is insufficient, the implementer can allocate a new slice, write the data into it, and return it. The framework then passes the returned slice in on the next call.
ReadPacket is called in the goroutine dedicated to receiving data. Implementers can read according to their own policy, and since the net.Conn is passed in, they can set their own I/O timeout. The implementer is responsible for returning one complete request packet, or an error if something goes wrong. When an error is returned, the Session is closed. The reasoning is that a failure to read or build a request packet may be caused by bad data, a broken link, or something else, and I do not think it is worth continuing in any of these cases, so the link is simply closed. If the returned request packet contains slice-typed data, it is recommended to allocate a new slice and copy the data out of buff. Try not to hold on to parts of buff, because it is reused and that can cause subtle bugs.
BuildPacket is called in the goroutine that handles sending. When the sending goroutine receives a packet, it calls BuildPacket, and the implementer serializes the packet in its own private format. Again, if buff is too small, allocate a new one, fill in the data, and return it.
WritePacket lets implementers customize how the data is written to the connection. Implementers may want to set an I/O timeout here.
Request packet routing
In an event-based implementation, the one thing that is always needed is forwarding a request packet to the corresponding handler. Exactly how to do that depends on the use case and the implementation, so what I did here is very simple: I defined a PacketHandler interface:
// PacketHandler is used to process packet that recved from remote session
type PacketHandler interface {
	// When got a valid packet from PacketReader, you can dispatch it.
	Handle(s *Session, packet interface{})
}
Users implement the Handle method themselves. When the receiving goroutine gets data from the peer and PacketReader.ReadPacket returns a packet, it calls Handle with the Session instance and the request packet. The Session is passed in so that the user does not have to keep track of Session instances. The logic the programmer needs to write is then fairly simple: implement the handler function, use the Session it is given, and when processing is complete, call Session.AsyncSend to send the response packet.
It would be possible to provide a simple default implementation. However, since the dispatch key differs between protocols, this is left to the user.
The user has a lot of freedom here. One option is callback dispatch based on a map from key to handler; another is a single Handle implementation that dispatches with a type assertion. Which one to choose is a matter of taste. I prefer the latter, which avoids a lot of registration code and has the flavour of Actor Model plus pattern matching.
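The type-assertion style can be sketched with a type switch. Everything below is hypothetical: the Session stub only records what was sent, its AsyncSend signature is an assumption, and the packet types are invented for the example.

```go
package main

import "fmt"

// Session is a hypothetical stub standing in for swnet.Session. It only
// records what was passed to AsyncSend.
type Session struct{ sent []interface{} }

// AsyncSend mimics the queueing call; the real signature may differ.
func (s *Session) AsyncSend(packet interface{}) error {
	s.sent = append(s.sent, packet)
	return nil
}

// Hypothetical request and response packets.
type (
	LoginReq  struct{ User string }
	LoginResp struct{ OK bool }
	Ping      struct{}
	Pong      struct{}
)

// router implements Handle by switching on the packet's dynamic type, so no
// handler registration table is needed.
type router struct{}

func (router) Handle(s *Session, packet interface{}) {
	switch p := packet.(type) {
	case *LoginReq:
		s.AsyncSend(&LoginResp{OK: p.User != ""})
	case *Ping:
		s.AsyncSend(&Pong{})
	default:
		fmt.Printf("unhandled packet type %T\n", p)
	}
}

func main() {
	s := &Session{}
	h := router{}
	h.Handle(s, &LoginReq{User: "alice"})
	h.Handle(s, &Ping{})
	h.Handle(s, "not a packet")
	fmt.Println(len(s.sent)) // 2
}
```

A map-based dispatcher would instead look up a handler by a protocol-specific key, at the cost of a registration step for each message type.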
The Server object
There is also a simple wrapper for the server side. The Server implementation is very simple: it repeatedly calls Accept, constructs a Session, calls the callback function passed in by the user, and that is all. The user passes in a net.Listener, a PacketProtocol, a PacketHandler, and SendChanSize. These parameters are handed on to each Session when it is constructed, which cuts down on duplicated code. Server.AcceptLoop does not close the Sessions it constructs. Closing them is the user's responsibility!
Disadvantages
The whole thing is very simple; it is just a skeleton. My own unpublished code implements PacketProtocol for my company's protocol. I even wrote a code generator for it.
Also, NewServer requires a net.Listener to be passed in; I will decide later whether to remove that requirement. NewSession requires a net.Conn, which is a compromise: on the server side, the net.Listener returns a net.Conn from Accept, and that instance has to be handed to the Session. On the client side this is not really necessary, yet the client has to call net.Dial itself to get a net.Conn. Perhaps a swnet.Dial method should be provided.
Conclusion
The code I published is a modified version of my original code, and it took some inspiration from dada's github.com/funny/link, although there are many differences. Thanks again to dada for the contribution.