This article is published on www.luozhiyun.com, luozhiyun's blog.
The analysis in this article is based on the Go 1.15.7 source code.
Looking at the Go source directory structure and the corresponding code files, you can see that Go implements a different network I/O mode on each platform: for example, epoll on Linux, kqueue on FreeBSD, and IOCP on Windows.
Because our code is deployed on Linux, this article uses the epoll wrapper as the example to walk through the source code of I/O multiplexing in Go.
Introduction
I/O multiplexing
I/O multiplexing refers to the select/epoll family of multiplexers: they allow a single thread to monitor multiple file descriptors (I/O events) at the same time, block while waiting, and be notified as soon as one of the file descriptors becomes readable or writable. In case select or epoll is unfamiliar, let's go over these two multiplexers first.
First, what is a file descriptor (FD)? It is an abstract handle for referring to a file: an index into the table of open files that the kernel maintains for each process. When a program opens an existing file or creates a new file, the kernel returns a file descriptor to the process.
select
int select(int nfds,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict errorfds,
struct timeval *restrict timeout);
readfds, writefds, and errorfds are three sets of file descriptors. select traverses the first nfds descriptors of each set and finds the ones that are readable, writable, or in an error state, collectively called the ready descriptors.
The timeout parameter indicates how long select may block. If no file descriptor is ready, the calling process blocks until a descriptor becomes ready or the timeout expires, then returns. If timeout is NULL, select blocks indefinitely until a descriptor is ready; if timeout is set to 0, it returns immediately without blocking.
When select returns, the ready descriptors can be found by iterating over the fd_set.
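As a concrete illustration (not from the original article), here is a minimal Go sketch of that pattern using the golang.org/x/sys/unix package on Linux; waitReadable and fds are hypothetical names invented for this example:
package selectexample

import "golang.org/x/sys/unix"

// waitReadable is a hypothetical helper: it blocks in select(2) for up to
// one second and reports which of the given descriptors are readable.
func waitReadable(fds []int) ([]int, error) {
    var set unix.FdSet
    set.Zero()
    maxfd := 0
    for _, fd := range fds {
        set.Set(fd) // add fd to the read set
        if fd > maxfd {
            maxfd = fd
        }
    }
    // Block until a descriptor is ready or the 1-second timeout expires.
    tv := unix.Timeval{Sec: 1}
    if _, err := unix.Select(maxfd+1, &set, nil, nil, &tv); err != nil {
        return nil, err
    }
    // After select returns, iterate over the set to find the ready descriptors.
    var ready []int
    for _, fd := range fds {
        if set.IsSet(fd) {
            ready = append(ready, fd)
        }
    }
    return ready, nil
}
Note how the caller has to rebuild the fd set and rescan it on every call; that is exactly the overhead listed below.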
The drawbacks of select:
- The biggest drawback is that the number of FDs a single process can monitor is limited by FD_SETSIZE, which defaults to 1024.
- Every call to select has to copy the fd set from user space into the kernel, which is expensive when there are many FDs.
- The kernel has to scan the whole fd_set linearly on every call, so I/O performance degrades linearly as the number of monitored descriptors grows.
epoll
epoll is an enhanced version of select that avoids the drawbacks of high performance overhead and the limited number of file descriptors.
To understand what follows, let’s look at the use of epoll:
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
bind(listenfd, ...);
listen(listenfd, ...);

int epfd = epoll_create(...);
epoll_ctl(epfd, ...);

while (1) {
    int n = epoll_wait(...);
    for (each ready socket) {
        // handle the data
    }
}
epoll_create creates an epoll instance and returns a file descriptor, epfd, that refers to it. This descriptor only points to the epoll instance; it does not represent a real file node on disk.
Internally, an epoll instance stores:
- Listener list: all file descriptors to listen on, using a red-black tree;
- Ready list: All ready file descriptors, using linked lists;
epoll_ctl adds an fd to epfd: it registers a callback for the fd, records the events to listen for, and adds the fd to the listening list. When an event occurs, the callback is invoked and the fd is put on the epoll instance's ready list.
Finally, epoll_wait blocks waiting for I/O events on any fd registered with the epoll instance. If the ready list already contains entries, epoll_wait returns immediately, so there is no need to sweep every descriptor the way select does.
Advantages of epoll:
epoll's listening list is stored in a red-black tree; every fd added through epoll_ctl becomes a node of this tree. The red-black tree has stable insertion and deletion performance with O(log N) time complexity, and it can hold a large number of fds.
epoll_ctl attaches a callback to each file descriptor that adds it to the ready list when it becomes ready, so unlike select there is no need to iterate over every descriptor; it is enough to check whether the ready list is empty.
Source code analysis
netpoll is essentially a wrapper around I/O multiplexing, so, just like using epoll, it comes down to the following steps:
- create and initialize netpoll;
- register the file descriptors to be monitored with netpoll;
- retrieve the triggered events from netpoll.
Go wraps the three functions provided by epoll as follows:
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList
The netpollinit function initializes netpoll;
netpollopen registers a file descriptor so that its events are monitored;
netpoll blocks, waits, and returns the set of goroutines that have become ready.
Here is a TCP server written in the Go language:
func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    }

    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        }
        // Create a goroutine to handle the read and write tasks
        go HandleConn(conn)
    }
}
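The snippet above leaves HandleConn out; purely to make the example self-contained, a hypothetical echo-style handler could look like this (HandleConn is not part of any library, just an illustration):
// HandleConn is a hypothetical handler that echoes whatever the client sends.
// conn.Read blocks only this goroutine, not the OS thread: under the hood the
// runtime parks the goroutine via netpoll until the connection is readable.
func HandleConn(conn net.Conn) {
    defer conn.Close()
    buf := make([]byte, 4096)
    for {
        n, err := conn.Read(buf)
        if err != nil {
            return
        }
        if _, err := conn.Write(buf[:n]); err != nil {
            return
        }
    }
}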
Let's follow the TCP server's source code to see where netpoll comes in to make the epoll calls.
net.Listen
The TCP server calls net.Listen to create a socket and obtain its fd, which is used to initialize the listener's netFD (Go's wrapper around a network file descriptor). It then calls the netFD's listenStream method to perform the socket bind & listen and finish initializing the netFD.
The call process is as follows:
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // Create a socket
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    ...
    // Create the netFD
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }

    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // Call the listenStream method to perform bind & listen and initialize the netFD
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            ...
        }
    }
    ...
    return fd, nil
}

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            IsStream:      sotype == syscall.SOCK_STREAM,
            ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
        },
        family: family,
        sotype: sotype,
        net:    net,
    }
    return ret, nil
}
The sysSocket method makes a system call to create the socket, newFD wraps it in a netFD, and then the netFD's listenStream method performs bind & listen and initializes the netFD.
netFD is a wrapper around a file descriptor. It contains an FD structure whose two important fields are Sysfd and pollDesc: Sysfd is the socket file descriptor returned by sysSocket, and pollDesc is used to monitor whether the file descriptor is readable or writable.
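For reference, the relevant type definitions, abridged from net/fd_unix.go and internal/poll/fd_unix.go in Go 1.15 (fields not discussed here are omitted), look roughly like this:
// net/fd_unix.go (abridged)
type netFD struct {
    pfd poll.FD // wraps the system file descriptor

    // immutable until Close
    family int
    sotype int
    net    string
    laddr  Addr
    raddr  Addr
}

// internal/poll/fd_unix.go (abridged)
type FD struct {
    // Sysfd is the system file descriptor, immutable until Close.
    Sysfd int

    // pd is the runtime pollDesc used to monitor readability/writability.
    pd pollDesc

    IsStream      bool
    ZeroReadIsEOF bool
}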
ListenStream:
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string.string, syscall.RawConn) error) error{...// The binding is complete
iferr = syscall.Bind(fd.pfd.Sysfd, lsa); err ! =nil {
return os.NewSyscallError("bind", err)
}
// Perform a listening operation
iferr = listenFunc(fd.pfd.Sysfd, backlog); err ! =nil {
return os.NewSyscallError("listen", err)
}
// Initialize fd
iferr = fd.init(); err ! =nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
The listenStream method calls syscall.Bind to bind the fd, then listenFunc to listen on it, and finally fd.init to initialize the FD and its pollDesc.
func (pd *pollDesc) init(fd *FD) error {
    // Call runtime.poll_runtime_pollServerInit
    serverInit.Do(runtime_pollServerInit)
    // Call runtime.poll_runtime_pollOpen
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    return nil
}
runtime_pollServerInit is wrapped in a sync.Once so that it runs only once; on Linux it creates the epoll instance's file descriptor.
poll_runtime_pollOpen calls netpollopen to register the fd with the epoll instance and returns a pollDesc.
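The path from the net package down to this pollDesc.init is short; abridged from net/fd_unix.go and internal/poll/fd_unix.go (as I read the Go 1.15 source), it is roughly:
// net/fd_unix.go (abridged)
func (fd *netFD) init() error {
    return fd.pfd.Init(fd.net, true)
}

// internal/poll/fd_unix.go (abridged)
func (fd *FD) Init(net string, pollable bool) error {
    if net == "file" {
        fd.isFile = true
    }
    if !pollable {
        // Not pollable: fall back to blocking mode, no netpoll involvement.
        fd.isBlocking = 1
        return nil
    }
    // Register the fd with the runtime poller (the pollDesc.init shown above).
    err := fd.pd.init(fd)
    if err != nil {
        fd.isBlocking = 1
    }
    return err
}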
Netpollinit initialization
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}
netpollGenericInit calls the platform-specific netpollinit implementation; on Linux, that is the netpollinit function in netpoll_epoll.go:
var (
    epfd int32 = -1 // epoll descriptor
)

func netpollinit() {
    // Create a new epoll file descriptor
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    ...
    // Create a non-blocking pipe used to wake up epollwait
    r, w, errno := nonblockingPipe()
    ...
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    // Register the read end of the pipe with the epoll instance
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    ...
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}
netpollinit calls epollcreate1 to create the epoll instance; note that epfd is a global variable. It then creates a pipe and registers the read end of the pipe with epoll via epollctl. This pipe is used to interrupt a blocked epollwait: writing to the write end (netpollBreakWr) makes the read end (netpollBreakRd) readable and wakes the poller up.
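That wake-up is what runtime.netpollBreak does; simplified from my reading of the Go 1.15 runtime (the exact code may differ between versions), it is roughly:
// netpollBreak interrupts a blocked epollwait: writing one byte to
// netpollBreakWr makes netpollBreakRd readable, so epollwait returns.
func netpollBreak() {
    for {
        var b byte
        n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
        if n == 1 {
            break
        }
        if n == -_EINTR {
            continue
        }
        if n == -_EAGAIN {
            return
        }
        println("runtime: netpollBreak write failed with", -n)
        throw("runtime: netpollBreak write failed")
    }
}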
Netpollopen added event listening
Now look at the poll_runtime_pollOpen method:
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    pd.self = pd
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
The poll_runtime_pollOpen method allocates a pollDesc through pollcache.alloc, which hands out pollDescs from blocks of roughly 4 KB. It then resets the pd's fields and calls netpollopen to add a new polling event for the fd to the epoll instance epfd, listening for the readable and writable state of the file descriptor.
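For context, the pollDesc structure itself (abridged from runtime/netpoll.go in Go 1.15; comments are mine) is roughly:
// runtime/netpoll.go (abridged)
type pollDesc struct {
    link *pollDesc // next free pollDesc in pollcache, protected by pollcache.lock

    lock    mutex   // protects the fields below
    fd      uintptr // the underlying file descriptor
    closing bool
    everr   bool    // an error happened during event scanning
    rseq    uintptr // protects against stale read timers
    rg      uintptr // pdReady, pdWait, the G waiting for read, or 0
    rd      int64   // read deadline
    wseq    uintptr // protects against stale write timers
    wg      uintptr // pdReady, pdWait, the G waiting for write, or 0
    wd      int64   // write deadline
    self    *pollDesc // storage for an indirect interface value
}
The rg and wg fields are what netpollblock and netpollunblock (shown later) manipulate to park and resume goroutines.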
Let’s look at how pollCache initializes pollDesc.
type pollCache struct {
    lock  mutex
    first *pollDesc
}

const pollBlockSize = 4 * 1024

func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    // Initialize the free list if it is empty
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        // Link the newly allocated pollDescs into the free list
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    lockInit(&pd.lock, lockRankPollDesc)
    unlock(&c.lock)
    return pd
}
If pollCache's list head is empty, a block of pollDescs is allocated with persistentalloc and linked into a free list; each subsequent call to alloc pops an unused pollDesc off the head of this list.
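When a pollDesc is no longer needed (the descriptor is closed), it goes back onto this free list; the corresponding free method, as I read the runtime source, is simply:
// Return a pollDesc to the head of the cache's free list (abridged).
func (c *pollCache) free(pd *pollDesc) {
    lock(&c.lock)
    pd.link = c.first
    c.first = pd
    unlock(&c.lock)
}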
Now that we are done analyzing net.Listen, let's look at Listener.Accept.
Listener.Accept
The Listener.Accept method eventually calls the accept method of netFD:
func (fd *netFD) accept() (netfd *netFD, err error) {
    // Call Accept on the poll.FD to accept the new socket connection and return its fd
    d, rsa, errcall, err := fd.pfd.Accept()
    ...
    // Construct a new netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // Call netFD's init to complete the initialization
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}
This method first calls Accept on the poll.FD to accept a new socket connection and obtain the fd of the new socket, then calls newFD to construct a new netFD and init to complete its initialization.
We have already looked at the init method above, so let's look at the Accept method:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
    for {
        // Use the Linux accept system call to receive the new connection and create the corresponding socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // No expected I/O event has occurred yet: waitRead parks the goroutine here
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}
The fd.Accept method uses the Linux accept system call to receive a new connection and create the corresponding socket. If no connection is ready yet, it blocks in waitRead. The parked goroutines are woken up later by calls to runtime.netpoll during goroutine scheduling.
PollWait Event wait
pollDesc.waitRead eventually calls runtime.poll_runtime_pollWait.
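The user-space half of that chain, abridged from internal/poll/fd_poll_runtime.go (as I read the Go 1.15 source), looks roughly like this:
// internal/poll/fd_poll_runtime.go (abridged)
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    // runtime_pollWait is linked to runtime.poll_runtime_pollWait
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}
On the runtime side, poll_runtime_pollWait then looks like this: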
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    // Loop in netpollblock, checking whether the expected I/O event has occurred
    for !netpollblock(pd, int32(mode), false) {
        ...
    }
    return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // The for loop waits for either IO ready or IO wait
    for {
        old := *gpp
        // gpp == pdReady means the expected I/O event has already occurred:
        // simply return true so the current goroutine can perform its I/O without blocking
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // No expected I/O event has occurred: atomically set gpp to pdWait and exit the loop
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    if waitio || netpollcheckerr(pd, mode) == 0 {
        // Give up the current thread: put the goroutine to sleep and wait for the runtime to wake it
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}
poll_runtime_pollWait loops calling netpollblock to check whether the expected I/O event has occurred, and it only exits the loop once netpollblock returns true, meaning the I/O is ready.
The netpollblock method checks whether the current state is pdReady and, if so, returns true. If not, it sets gpp to pdWait via CAS and exits the for loop. The current goroutine is then parked via gopark until the corresponding fd becomes readable/writable or some other I/O event occurs.
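gopark is given netpollblockcommit as its commit function; as I read the runtime source, it roughly does the following, publishing the parked g in rg/wg so that netpoll can later hand it back:
// Called while parking: store the waiting g into the pollDesc
// (gpp is &pd.rg or &pd.wg) so netpollunblock can return it later.
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the number of goroutines blocked on the poller so the
        // scheduler knows it is worthwhile to call netpoll.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}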
These goroutines are woken up by calls to runtime.netpoll during goroutine scheduling.
Netpoll polling wait
The core logic of runtime.netpoll is: set the epoll_wait timeout according to the delay parameter, call epoll_wait to fetch the list of I/O-ready fds from epoll's eventpoll.rdllist doubly linked list, then iterate over the fds returned by epoll_wait and, using the context information that was attached when the fd was registered with epoll_ctl, assemble the runnable goroutines and return them.
After netpoll runs, it returns the list of goroutines corresponding to the ready fds, and those ready goroutines are added to the run queue to wait to be scheduled.
func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    // delay is in nanoseconds, so convert it to milliseconds
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent
retry:
    // Wait for file descriptors to become readable or writable
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    // On a negative return value, call epollwait again
    if n < 0 {
        ...
        goto retry
    }
    var toRun gList
    // n > 0: some of the monitored file descriptors have events waiting to be processed
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        ...
        // Determine the event type: read, write, or both
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // Retrieve the pollDesc stored in the epollevent
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // Call netpollready with the fd's pollDesc
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}
netpoll calls epollwait to get the list of ready fds; the corresponding epoll function is epoll_wait. toRun is a gList that collects the goroutines to be resumed and is returned to the caller. If epollwait returns an n greater than zero, some of the monitored file descriptors have pending events, and the for loop processes them: it sets mode according to the event type, retrieves the corresponding pollDesc, and calls netpollready.
Let's look at netpollready next:
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    // Get the pointers to the g's
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // Add the corresponding g's to the toRun list
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    // Determine the event type based on the mode passed in
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        // Fetch the g stored in gpp
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // CAS the read or write semaphore to pdReady
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait {
                old = 0
            }
            // Return the corresponding g pointer
            return (*g)(unsafe.Pointer(old))
        }
    }
}
runtime.netpoll is called from two places:
- In the scheduler: runtime.schedule() executes runtime.findrunnable(), and runtime.findrunnable() calls runtime.netpoll to obtain goroutines that are ready to run;
- The Go runtime starts an independent monitoring thread, sysmon, at program startup. sysmon runs every 20µs to 10ms, and on each run it checks whether more than 10ms have passed since the last netpoll; if so, it calls runtime.netpoll once.
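To make those two entry points concrete, here is roughly what the relevant fragments look like, abridged from my reading of runtime/proc.go in Go 1.15 (surrounding code omitted):
// In findrunnable, abridged: poll the network without blocking and
// run one of the ready goroutines, injecting the rest into the run queue.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
    if list := netpoll(0); !list.empty() { // non-blocking
        gp := list.pop()
        injectglist(&list)
        casgstatus(gp, _Gwaiting, _Grunnable)
        return gp, false
    }
}

// In sysmon, abridged: if more than 10ms have passed since the last poll,
// call netpoll and inject the ready goroutines into the run queue.
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
    atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
    list := netpoll(0) // non-blocking
    if !list.empty() {
        injectglist(&list)
    }
}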
Readers who are interested in these call sites can go and look at them for themselves.
Conclusion
This article started with I/O multiplexing via select and epoll, then went back to the Go source to see how Go builds on top of them. By tracing the source code, we found that Go wraps epoll with its own set of functions:
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList
These three functions respectively create the epoll instance, register fds with it, and wait for events on it.
If you are not very familiar with I/O multiplexing, this is also a good opportunity to learn more about network programming and broaden your knowledge.