During this period, I reviewed the REACTOR model, but the theory should be applied to practice. So I will share my ideas and key codes for writing the REACTOR network framework. Project making address https://github.com/HobbyBear/reactor_network

Reacotor briefly

In GO, the traditional IO model is that a link is processed by one coroutine, but after epoll and kqueue appear, one coroutine can poll to obtain client messages for processing through epoll.wait and Kevent methods.

The REACTOR model is two FDS, one main reactor and one sub reactor. The main reactor listens for the client link, and then registers the client link with the sub reactor. After that, the sub reactor performs read and write processing.

Start the sample

type example struct{}func (e example) MsgCallBack(c *Connection, data []byte) []byte {
	fmt.Println(string(data))
	return []byte("Got the message.")}func main(a) {
	ch := make(chan int)
	s := New("tcp".": 8080",example{})
	s.Start()

	<-ch
}
Copy the code

Design ideas

server

Our server application, abstracted as a server, contains the main reactor and a sub-reactor set that contains an instance of epoll or kqueue for read and write events. When I start up in the server, I start two coroutines to process the operations of each REACTOR.

type Server struct {
	ListenerPoll *poll.Poll // Encapsulate kqueue to listen for links
	// todo makes arrays
	WPoll          *poll.Poll  // Subreactor encapsulates for read and write requests
	ConnectionPoll map[int]*Connection
	Handler        Handler  // The handler that processes the message
}
func New(network string, addr string, h Handler) *Server {
	var (
		listener net.Listener
		err      error
		s        *Server
	)
	s = &Server{
		ListenerPoll:   poll.Create(),
		WPoll:          poll.Create(),
		ConnectionPoll: make(map[int]*Connection),
		Handler:        h,
	}
        // Listen on events
	listener, err = net.Listen(network, addr)
	l, _ := listener.(*net.TCPListener)
        
	file, err := l.File()
	iferr ! =nil {
		log.Fatal(err)
	}
	fd := int(file.Fd())
	if err = unix.SetNonblock(fd, true); err ! =nil {
		log.Fatal(err)
	}
        // listend fd listens for read events
	s.ListenerPoll.AddReadEvent(fd)

	return s
}
func (s *Server) Start(a) {
        // Listen
	go s.ListenerPoll.RunLoop(func(fd int, events poll.Event) {
		ifevents&poll.EventRead ! =0 {
			nfd, _, err := unix.Accept(fd)
			iferr ! =nil {
				iferr ! = unix.EAGAIN { log.Fatal("accept:", err)
				}
				return
			}
			if err := unix.SetNonblock(nfd, true); err ! =nil {
				_ = unix.Close(nfd)
				log.Fatal("set nonblock:", err)
				return
			}
                        
                       // Abstracts a link
			conn := NewConn(nfd, s.Handler.MsgCallBack)
			s.ConnectionPoll[nfd] = conn
			log.Println(fmt.Sprintf("%d Three handshakes complete", nfd))
                        // Register read events
			s.WPoll.AddReadEvent(nfd)
		}
	})

	// connection Reads and writes data
	go s.WPoll.RunLoop(func(fd int, events poll.Event) {
		log.Println("sub reactor receive data", fd, events&poll.EventRead, events&poll.EventWrite)

		ifevents&poll.EventRead ! =0 {
			if connection, ok := s.ConnectionPoll[fd]; ok {
				connection.HandleRead()
				if connection.WriteBuf.Len() > 0 {
					s.WPoll.EnableWriteEvent(fd)
				}
			}
		}

		ifevents&poll.EventErr ! =0 {
			// todo conn closed
			err := unix.Close(fd)
			iferr ! =nil {
				log.Fatal(err)
			}
			log.Println(fmt.Sprintf("%d closed the link", fd))
		}

		ifevents&poll.EventWrite ! =0 {
			if conn, ok := s.ConnectionPoll[fd]; ok {
				if conn.WriteBuf.Len() > 0 {
					conn.HandleWrite()
				}
                                // After writing buF data, delete the write event listener and listen for the read event
				s.WPoll.EnableRead(fd)
			}
		}

	})
}

Copy the code

connection

The first 4 bytes of the TCP stream represent the length of the data stream, and then read a 4 byte length of the data stream, and then read the corresponding length of the data stream. There are two kinds of analysis.

  • The file descriptor reads len less than the remaining data length

Check the data length first. If it is smaller than the remaining data length, stop reading because it cannot form a complete message body.

  • Len read is greater than or equal to the remaining data length

Can form a complete message body, loop read, until the final data does not meet the requirements of the complete message body.

type Connection struct {
	fd       int
	ReadBuf  *bytes.Buffer
	WriteBuf *bytes.Buffer
	proto    *DefaultProtocol  // Protocol for sticky and unpack
	callFunc MsgCallBack
}


func NewConn(fd int,c MsgCallBack) *Connection {
	return &Connection{
		fd:       fd,
		ReadBuf:  bytes.NewBuffer([]byte{}),
		WriteBuf: bytes.NewBuffer([]byte{}),
		proto:    &DefaultProtocol{},
		callFunc: c,
	}
}
Copy the code

Connection handles read operations,

func (c *Connection) HandleRead(a) {
        // Use a temporary buffer to read data from the file descriptor. Too small a buffer may cause incomplete messages to be read
	tmpBuf := make([]byte.1024)
	n, err := unix.Read(c.fd, tmpBuf)
	iferr ! =nil {
		iferr ! = unix.EAGAIN{ log.Fatal(c.fd," conn read: ",err)
		}
		return
	}
	log.Println("Fd received a readable request".string(tmpBuf[:n]), n)
	if n == 0 {
		return
	}
        // Write temporary buffer data to conn readbufC.R eadBuf. Write (tmpBuf [: n])// Unpack
	c.proto.Unpack(c, c.callFunc)
}

// called when the connection triggers a write event
func (c *Connection) HandleWrite(a) {
	if c.WriteBuf.Len() == 0 {
		return
	}
	log.Println("Write data:", c.WriteBuf.String())
	_, err := unix.Write(c.fd, c.WriteBuf.Bytes())
	iferr ! =nil {
		log.Fatal(c.fd, " write ", err)
	}
	c.WriteBuf.Reset()
}
Copy the code

See how unpacking is done

func (d *DefaultProtocol) Unpack(c *Connection, msgCall MsgCallBack) {
        // Ensure that the first 4 bytes representing the length of the loop processing message
	for c.ReadBuf.Len() > 4 {
               // Get the length of the packet
		var length = binary.BigEndian.Uint32(c.ReadBuf.Bytes()[:4])
               // The rest of the packet is not long enough to be read from Conn's readbuf when the next read operation is triggered
		if c.ReadBuf.Len() < int(length)+4 {
			log.Println("read bug len little ,wait next time")
			return
		}

		err := binary.Read(c.ReadBuf, binary.BigEndian, &length)

		iferr ! =nil {
			log.Fatal(err)
		}
		log.Println("The length of the data obtained is, length)
		cmdData := make([]byte, length)
		n, err := c.ReadBuf.Read(cmdData)
		iferr ! =nil {
			log.Fatal(err)
		}
		log.Println("The length of data read is.", n)
		out := msgCall(c, cmdData)
                // After the message is processed by the callback function, the returned data is written to writebuf. The server determines the length of Conn's writebuf. If the length is greater than 0, the server listens for connection read/write events
		c.WriteBuf.Write(d.Pack(out))
	}
}
Copy the code

A few points that need to be improved

  • The connection Buffer currently has a memory leak problem with bytes.buffer because each read from a FD is written to the Buffer, and the bytes.buffer gets bigger and bigger if you keep reading
  • The project is currently designed for The MAC environment of Kqueue, without the Use of Linux epoll
  • Applying temporary buffers too frequently for each read operation can cause gc to be too high