NSQ is a well-known message queue written in Go. It is easy to get started with, its documentation is clear, and its source code is not complicated. It is also worth mentioning that Go’s channels make writing a message queue really convenient. When we receive messages from producers, we keep them in memory for efficiency. If we used shared memory, such as a collection in Java, we would have to lock around both storing and retrieving messages. In Go our own code doesn’t need a lock: one goroutine pushes messages into a channel and another goroutine takes them out, which is very convenient.
To understand message queues more deeply, I decided to implement a mini version of NSQ over a series of articles. This article implements NSQ’s most basic functionality: producers publish messages and consumers subscribe to receive them. Each subsequent version builds on the previous one.
Before looking at the code, let’s review the NSQ messaging process.
NSQ message passing process
A client connected to NSQ can act as a producer, a consumer, or both. As shown in the figure above, a producer sends a message to a given topic; the topic copies the message to every channel under it, and each channel then delivers the message to its subscribed consumers. If multiple consumers subscribe to the same channel, each message in that channel is delivered to only one of them, chosen at random.
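To make the "copy to every channel, share within a channel" semantics concrete, here is a toy model built from plain Go channels. It is not NSQ’s code or the mini-nsq code analyzed below; `fanOut` and its parameters are hypothetical names for this sketch only.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a toy model of the topic -> channel -> consumer flow.
// Every message published to the "topic" is copied to every channel, while
// the consumers attached to one channel compete for that channel's messages.
// It returns how many messages each consumer got, keyed by "channel/index".
func fanOut(msgs []string, consumersPerChannel map[string]int) map[string]int {
	var mu sync.Mutex
	var wg sync.WaitGroup
	counts := make(map[string]int)
	channels := make(map[string]chan string)

	for name, n := range consumersPerChannel {
		ch := make(chan string, len(msgs))
		channels[name] = ch
		for i := 0; i < n; i++ {
			key := fmt.Sprintf("%s/%d", name, i)
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Each message on ch is received by exactly one of these goroutines.
				for range ch {
					mu.Lock()
					counts[key]++
					mu.Unlock()
				}
			}()
		}
	}

	// The "topic": copy every message to every channel.
	for _, m := range msgs {
		for _, ch := range channels {
			ch <- m
		}
	}
	for _, ch := range channels {
		close(ch)
	}
	wg.Wait()
	return counts
}

func main() {
	counts := fanOut([]string{"a", "b", "c", "d"}, map[string]int{"chanA": 2, "chanB": 1})
	fmt.Println("chanA total:", counts["chanA/0"]+counts["chanA/1"]) // 4: split between two consumers
	fmt.Println("chanB total:", counts["chanB/0"])                   // 4: the only consumer gets everything
}
```

How the four messages are split between chanA’s two consumers varies from run to run; only the totals are fixed.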
Mini-nsq workflow
- Start a TCP server that listens for connection requests from clients
- For each connected client, start a worker goroutine that handles commands from that client (consumer subscriptions, producer publishes, and so on)
- Messages sent by producers are buffered in the corresponding topic; each topic has a worker goroutine that forwards the buffered messages to every channel under that topic
- For each connected client, start a second worker goroutine that sends messages from the channel the client is bound to back to that client
Code analysis
As shown in the figure below, the code is divided into two main parts: the test programs under apps and the project files under nsqd.
Base components (Message, Topic, Channel)
message.go
```go
package nsqd

import "bytes"

type MessageID [MsgIDLength]byte

const (
	MsgIDLength = 16
)

type Message struct {
	ID   MessageID
	Body []byte
}

func NewMessage(id MessageID, body []byte) *Message {
	return &Message{
		ID:   id,
		Body: body,
	}
}

// Bytes serializes the message for the wire: the 16-byte ID followed by the body.
func (m *Message) Bytes() ([]byte, error) {
	buf := new(bytes.Buffer)
	_, err := buf.Write(m.ID[:])
	if err != nil {
		return nil, err
	}
	_, err = buf.Write(m.Body)
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
```
You can see that a Message consists of two parts: a 16-byte ID and a Body. The Bytes method serializes the message into a byte slice for network transmission.
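A quick way to see the layout Bytes produces is to run a standalone copy of the types from message.go. The sketch below duplicates them in a `main` package so it can run on its own; the sample ID string is arbitrary.

```go
package main

import (
	"bytes"
	"fmt"
)

const MsgIDLength = 16

type MessageID [MsgIDLength]byte

type Message struct {
	ID   MessageID
	Body []byte
}

// Bytes mirrors message.go: the 16 ID bytes followed by the raw body.
func (m *Message) Bytes() ([]byte, error) {
	buf := new(bytes.Buffer)
	if _, err := buf.Write(m.ID[:]); err != nil {
		return nil, err
	}
	if _, err := buf.Write(m.Body); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	var id MessageID
	copy(id[:], "0123456789abcdef") // any 16 bytes will do
	b, _ := (&Message{ID: id, Body: []byte("ha ha")}).Bytes()
	fmt.Println(len(b))        // 21 = 16-byte ID + 5-byte body
	fmt.Printf("%q\n", b[16:]) // "ha ha"
}
```

There is no separator between ID and body; the receiver relies on the fixed 16-byte ID length to split them.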
topic.go
```go
package nsqd

import (
	"log"
	"sync"
)

type Topic struct {
	name              string
	sync.RWMutex                          // Protects channelMap
	channelMap        map[string]*Channel // All channels under this topic
	memoryMsgChan     chan *Message       // Buffers messages published to this topic
	channelUpdateChan chan int            // Signals that the set of channels under this topic has changed
}

func NewTopic(topicName string) *Topic {
	t := &Topic{
		name:              topicName,
		channelMap:        make(map[string]*Channel),
		memoryMsgChan:     make(chan *Message, 10000),
		channelUpdateChan: make(chan int),
	}
	go t.messagePump() // Start the topic's worker goroutine
	return t
}
```
See the comments for the meaning of each field in the Topic structure. Note that when we create a new topic, we start a worker goroutine for it that forwards the messages buffered in the topic to the channels under that topic, as follows:
```go
func (t *Topic) messagePump() {
	var msg *Message
	var chans []*Channel
	var memoryMsgChan chan *Message

	t.Lock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.Unlock()
	if len(chans) > 0 {
		memoryMsgChan = t.memoryMsgChan
	}

	// main message loop
	for {
		select {
		case msg = <-memoryMsgChan:
			// chans must be non-empty here, otherwise the message would be lost;
			// that is why memoryMsgChan is set to nil whenever chans is empty
			for _, channel := range chans {
				err := channel.PutMessage(msg)
				if err != nil {
					log.Printf(
						"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
						t.name, msg.ID, channel.name, err)
				}
			}
		case <-t.channelUpdateChan:
			log.Println("Topic update channel")
			chans = chans[:0]
			t.Lock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.Unlock()
			if len(chans) == 0 {
				memoryMsgChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
			}
		}
	}
}
```
We use chans to hold all channels under this topic. Because channels are added and removed dynamically, a signal arrives on channelUpdateChan whenever they change, and we then rebuild chans from channelMap. Note that if the topic has no channels, we must not take messages out of t.memoryMsgChan, otherwise they would be lost. The trick here is the intermediate variable memoryMsgChan: when the topic has zero channels we set it to nil, and because a receive from a nil channel blocks forever, select never enters that branch.
The rest of topic.go
```go
func (t *Topic) PutMessage(m *Message) error {
	log.Printf("The message into the topic")
	t.memoryMsgChan <- m
	return nil
}

func (t *Topic) GetChannel(channelName string) *Channel {
	t.Lock()
	channel, isNew := t.getOrCreateChannel(channelName)
	t.Unlock()

	if isNew {
		// Tell messagePump to rebuild its channel list
		t.channelUpdateChan <- 1
	}
	return channel
}

// this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
	channel, ok := t.channelMap[channelName]
	if !ok {
		channel = NewChannel(t.name, channelName)
		t.channelMap[channelName] = channel
		log.Printf("TOPIC(%s): new channel(%s)", t.name, channel.name)
		return channel, true
	}
	return channel, false
}

// GenerateID is a placeholder for now: it always returns the zero MessageID.
func (t *Topic) GenerateID() MessageID {
	var h MessageID
	return h
}
```
PutMessage stores a message in the topic. GetChannel returns an existing channel or creates a new one; note that creating a new channel sends a signal on channelUpdateChan. GenerateID is meant to generate a unique ID for each Message; we don’t need it yet, so for now it simply returns the zero ID.
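If you want distinct IDs before later versions fill in GenerateID, one simple option is a process-wide atomic counter. This is a hypothetical sketch (`idGenerator` and `Next` are names made up here), not how mini-nsq or NSQ generates IDs; counter-based IDs are unique only within one run of the server.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"sync/atomic"
)

const MsgIDLength = 16

type MessageID [MsgIDLength]byte

// idGenerator is hypothetical: it numbers messages with a counter that is
// only ever touched through sync/atomic, so it is safe across goroutines.
type idGenerator struct {
	counter uint64
}

// Next returns the next ID: the counter as 8 big-endian bytes, hex-encoded
// into exactly 16 printable bytes so it fills a MessageID and logs cleanly.
func (g *idGenerator) Next() MessageID {
	n := atomic.AddUint64(&g.counter, 1)
	var raw [8]byte
	binary.BigEndian.PutUint64(raw[:], n)
	var id MessageID
	hex.Encode(id[:], raw[:])
	return id
}

func main() {
	var g idGenerator
	a, b := g.Next(), g.Next()
	fmt.Printf("%s %s\n", a[:], b[:]) // 0000000000000001 0000000000000002
}
```

A Topic could, for example, hold one idGenerator and have GenerateID return `g.Next()`.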
channel.go
```go
package nsqd

import "log"

type Channel struct {
	topicName     string
	name          string
	memoryMsgChan chan *Message // Buffers messages delivered to this channel
}

// NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topicName string, channelName string) *Channel {
	return &Channel{
		topicName:     topicName,
		name:          channelName,
		memoryMsgChan: make(chan *Message, 10000),
	}
}

// PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error {
	log.Printf("The message into the channel, the body: %s", m.Body)
	c.memoryMsgChan <- m
	return nil
}
```
The Channel structure is very simple. Its PutMessage method is called from the topic’s worker goroutine above to pass messages from the topic to the channel. So when are the messages buffered in a channel sent to the corresponding consumers? Read on.
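One consequence of `c.memoryMsgChan <- m` is worth spelling out: once the 10000-slot buffer is full, PutMessage blocks, and with it the topic’s messagePump. The sketch below shows a hypothetical non-blocking variant (`tryPutMessage` and `errChannelFull` are not part of mini-nsq) that uses a `select` with a `default` case to report the overflow instead. Real NSQ’s channel code uses a similar `select`, with a default branch that spills the message to its disk-backed queue.

```go
package main

import (
	"errors"
	"fmt"
)

type Message struct{ Body []byte }

type Channel struct {
	name          string
	memoryMsgChan chan *Message
}

// errChannelFull is a hypothetical error for this sketch.
var errChannelFull = errors.New("channel buffer full")

// tryPutMessage is a hypothetical non-blocking PutMessage: if the buffered
// channel has no free slot, the default case runs instead of blocking.
func (c *Channel) tryPutMessage(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
		return nil
	default:
		return errChannelFull
	}
}

func main() {
	c := &Channel{name: "demo", memoryMsgChan: make(chan *Message, 2)}
	for i := 0; i < 3; i++ {
		err := c.tryPutMessage(&Message{Body: []byte(fmt.Sprint(i))})
		fmt.Println(i, err) // the third put reports "channel buffer full"
	}
}
```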
Other components
Program entry (nsqd.go)
```go
package nsqd

import (
	"fmt"
	"log"
	"net"
	"sync"
)

type NSQD struct {
	sync.RWMutex
	topicMap map[string]*Topic
}

func Start() (*NSQD, error) {
	var err error
	var tcpListener net.Listener
	tcpListener, err = net.Listen("tcp", "0.0.0.0:4150")
	if err != nil {
		return nil, fmt.Errorf("listen (%s) failed - %s", "0.0.0.0:4150", err)
	}
	log.Printf("TCP: listening on %s", tcpListener.Addr())

	n := &NSQD{
		topicMap: make(map[string]*Topic),
	}
	tcpServer := &tcpServer{nsqd: n, tcpListener: tcpListener}
	go tcpServer.serve()
	return n, nil
}

// GetTopic performs a thread safe operation
// to return the topic with the given name, creating a new one if none exists
func (n *NSQD) GetTopic(topicName string) *Topic {
	n.Lock()
	defer n.Unlock()
	t, ok := n.topicMap[topicName]
	if ok {
		return t
	}
	t = NewTopic(topicName)
	n.topicMap[topicName] = t
	return t
}

// channels returns a flat slice of all channels in all topics
func (n *NSQD) channels() []*Channel {
	var channels []*Channel
	n.RLock()
	for _, t := range n.topicMap {
		t.RLock()
		for _, c := range t.channelMap {
			channels = append(channels, c)
		}
		t.RUnlock()
	}
	n.RUnlock()
	return channels
}
```
The NSQD structure holds all the topics in the message queue. Its GetTopic method returns the topic with the given topicName, creating a new one if none exists, and its channels method returns all channels in the queue. Neither method is difficult, so let’s look at the program’s entry point, Start. Start opens a TCP listener on port 4150, creates a tcpServer structure, and starts a new goroutine that runs tcpServer’s serve method.
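GetTopic holds the write lock across both the lookup and the insert, so concurrent callers can never create the same topic twice. The sketch below checks that with 100 goroutines. It uses a stripped-down `Topic` (no messagePump goroutine), and `concurrentGet` is a hypothetical test helper, not mini-nsq code.

```go
package main

import (
	"fmt"
	"sync"
)

// Topic is stripped down for this sketch; the real one also starts messagePump.
type Topic struct{ name string }

type NSQD struct {
	sync.RWMutex
	topicMap map[string]*Topic
}

// GetTopic follows the pattern in nsqd.go: lookup and insert under one lock.
func (n *NSQD) GetTopic(name string) *Topic {
	n.Lock()
	defer n.Unlock()
	if t, ok := n.topicMap[name]; ok {
		return t
	}
	t := &Topic{name: name}
	n.topicMap[name] = t
	return t
}

// concurrentGet calls GetTopic from many goroutines and reports how many
// distinct *Topic values came back (1 means no duplicate was created).
func concurrentGet(n *NSQD, name string, callers int) int {
	var wg sync.WaitGroup
	results := make([]*Topic, callers)
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = n.GetTopic(name)
		}(i)
	}
	wg.Wait()
	distinct := make(map[*Topic]bool)
	for _, t := range results {
		distinct[t] = true
	}
	return len(distinct)
}

func main() {
	n := &NSQD{topicMap: make(map[string]*Topic)}
	fmt.Println(concurrentGet(n, "mytopic", 100)) // 1
}
```

A read-lock-then-write-lock version would need a second lookup after taking the write lock; taking the write lock up front, as GetTopic does, keeps the code simple at a small cost in contention.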
tcp_server.go
```go
package nsqd

import (
	"log"
	"net"
)

type tcpServer struct {
	nsqd        *NSQD
	tcpListener net.Listener
}

func (tcpServer *tcpServer) serve() error {
	for {
		clientConn, err := tcpServer.tcpListener.Accept()
		if err != nil {
			break
		}
		// Each client connection gets its own goroutine
		go func() {
			log.Printf("TCP: new client(%s)", clientConn.RemoteAddr())
			prot := &protocol{nsqd: tcpServer.nsqd}
			client := newClient(clientConn)
			err := prot.IOLoop(client)
			if err != nil {
				log.Printf("client(%s) - %s", clientConn.RemoteAddr(), err)
			}
			client.Close()
		}()
	}
	return nil
}
```
As you can see, serve accepts client connections in an infinite for loop and starts a worker goroutine for each client that connects successfully. In that goroutine we create a protocol structure and a client structure, then call protocol’s IOLoop method.
Before looking at protocol.go, let’s take a look at client.go.
client.go
```go
package nsqd

import (
	"bufio"
	"net"
	"sync"
)

const defaultBufferSize = 16 * 1024

type client struct {
	sync.Mutex
	// original connection
	net.Conn
	// reading/writing interfaces
	Reader *bufio.Reader
	Writer *bufio.Writer

	SubEventChan chan *Channel // Delivers the subscription event
}

func newClient(conn net.Conn) *client {
	c := &client{
		Conn:   conn,
		Reader: bufio.NewReaderSize(conn, defaultBufferSize),
		Writer: bufio.NewWriterSize(conn, defaultBufferSize),
		// Buffered so that SUB does not block if the goroutine handling
		// subscription events is not ready to receive yet.
		// A consumer subscribes only once, so a capacity of 1 is enough.
		SubEventChan: make(chan *Channel, 1),
	}
	return c
}

func (c *client) Flush() error {
	return c.Writer.Flush()
}
```
The client structure mainly holds SubEventChan, which delivers subscription events. Because writes go through a bufio buffer, it also has a Flush method that forces buffered data out onto the connection.
protocol.go
protocol.go is responsible for receiving messages from producers, accepting subscriptions from consumers, and sending messages to consumers, encoding and decoding every network read and write according to the protocol. It is probably the most complex file in the project, so let’s go through it carefully.
```go
package nsqd

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"log"
)

// Each client has a dedicated worker goroutine that receives commands from the client and processes them.
// (separatorBytes, the protocol struct and Exec are defined elsewhere in the package and not shown here.)
func (p *protocol) IOLoop(client *client) error {
	var err error
	var line []byte

	// Start another goroutine to handle the consumer side
	go p.messagePump(client)
	for {
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}
		// trim the '\n'
		line = line[:len(line)-1]
		// optionally trim the '\r'
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		params := bytes.Split(line, separatorBytes)

		err = p.Exec(client, params)
		if err != nil {
			break
		}
	}
	return err
}
```
As we saw in the previous section, each connected client has a dedicated goroutine running IOLoop. The code shows that IOLoop runs an infinite for loop, continually receiving and handling commands from the client. For now we only handle PUB and SUB: with PUB, the client acts as a producer and publishes a message to a given topic; with SUB, the client acts as a consumer and subscribes to a given channel of a given topic. Let’s look at the format of these two commands.
PUB
Publish a message to a topic:
```
PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]
```
SUB
Subscribe to a topic/channel:
```
SUB <topic_name> <channel_name>\n
```
Now let’s look at how IOLoop parses a command from the client: it reads up to '\n', strips the trailing '\n' (and an optional '\r'), then splits the line on spaces to get params. The Exec method dispatches on the first element of params and calls the corresponding handler. Note that at this point we have not finished reading a PUB command; the PUB method performs a second read, as analyzed below.
```go
func (p *protocol) PUB(client *client, params [][]byte) error {
	var err error
	if len(params) < 2 {
		return fmt.Errorf("PUB: missing topic name")
	}
	topicName := string(params[1])

	// Second read: a 4-byte big-endian body size, then the body itself
	messageLen := make([]byte, 4)
	_, err = io.ReadFull(client.Reader, messageLen)
	if err != nil {
		return err
	}
	bodyLen := int32(binary.BigEndian.Uint32(messageLen))
	// Sizes of 2^31 and above turn negative here, and make would panic on them
	if bodyLen <= 0 {
		return fmt.Errorf("PUB: invalid message body size %d", bodyLen)
	}
	messageBody := make([]byte, bodyLen)
	_, err = io.ReadFull(client.Reader, messageBody)
	if err != nil {
		return err
	}

	topic := p.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	log.Printf("receive message from %s, topic:%s, message: %s", client.RemoteAddr(), topicName, string(messageBody))
	_ = topic.PutMessage(msg)
	return nil
}
```
In the PUB method, we first take topicName from params, then do a second read to get the message: we read 4 bytes to get the message size (bodyLen), then read bodyLen bytes to get the message body. Next we call GetTopic to get the topic (remember that GetTopic creates the topic if it doesn’t exist) and finally store the message in it.
```go
func (p *protocol) SUB(client *client, params [][]byte) error {
	if len(params) < 3 {
		return fmt.Errorf("SUB: expected topic and channel names")
	}
	topicName := string(params[1])
	channelName := string(params[2])
	var channel *Channel

	topic := p.nsqd.GetTopic(topicName)
	channel = topic.GetChannel(channelName)
	// update message pump: hand the channel to this client's messagePump
	client.SubEventChan <- channel
	return nil
}
```
The SUB method also fetches the specified topic and channel, then sends the channel into the client’s SubEventChan (this is where the channel and the client get bound together), signaling that a subscription event has occurred.
Recall that at the start of IOLoop we launch a new worker goroutine running messagePump to handle the consumer side. Let’s take a closer look at this method.
```go
func (p *protocol) messagePump(client *client) {
	var err error
	var memoryMsgChan chan *Message
	var subChannel *Channel
	// Copy SubEventChan into a local variable so it can be set to nil below,
	// which ensures a client can subscribe only once
	subEventChan := client.SubEventChan

	for {
		select {
		case subChannel = <-subEventChan: // A subscription event; subChannel is the channel this consumer is bound to
			log.Printf("Topic :%s channel:%s subscription event", subChannel.topicName, subChannel.name)
			memoryMsgChan = subChannel.memoryMsgChan
			// you can't SUB anymore
			subEventChan = nil
		case msg := <-memoryMsgChan: // The bound channel has a message
			err = p.SendMessage(client, msg)
			if err != nil {
				// Put the message back into the channel so it can be delivered again
				go func() {
					_ = subChannel.PutMessage(msg)
				}()
				log.Printf("PROTOCOL(V2): [%s] messagePump error - %s", client.RemoteAddr(), err)
				goto exit
			}
		}
	}

exit:
	log.Printf("PROTOCOL(V2): [%s] exiting messagePump", client.RemoteAddr())
}
```
We first wait for a subscription event on subEventChan (delivered by the SUB method above). Once the client has subscribed, we listen on the channel bound to the client and send any message that arrives to that client. If sending fails, our current approach is to put the message back into the channel so it can be sent again, and then exit messagePump.
Let’s look at the methods that actually send the messages:
```go
func (p *protocol) SendMessage(client *client, msg *Message) error {
	log.Printf("PROTOCOL(V2): writing to client(%s) - message: %s", client.RemoteAddr(), msg.Body)
	msgByte, err := msg.Bytes()
	if err != nil {
		return err
	}
	return p.Send(client, msgByte)
}

func (p *protocol) Send(client *client, data []byte) error {
	client.Lock()
	defer client.Unlock()
	_, err := SendFramedResponse(client.Writer, data)
	if err != nil {
		return err
	}
	// client.Writer is buffered by bufio, so flush right away to push the frame onto the connection
	err = client.Flush()
	return err
}

// SendFramedResponse actually writes the message, prefixed with its 4-byte length
func SendFramedResponse(w io.Writer, data []byte) (int, error) {
	beBuf := make([]byte, 4)
	size := uint32(len(data))
	binary.BigEndian.PutUint32(beBuf, size)
	n, err := w.Write(beBuf)
	if err != nil {
		return n, err
	}
	n, err = w.Write(data)
	return n + 4, err
}
```
SendMessage sends a Message to the consumer. It eventually calls SendFramedResponse, which prefixes the serialized message with its 4-byte length and writes it out.
Test
Test code analysis
apps/nsqd/main.go
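```go
package main

import (
	"log"
	// plus this project's nsqd package; its import path depends on the module name, which is not shown here
)

func main() {
	log.SetFlags(log.Lshortfile | log.Ltime)
	_, err := nsqd.Start()
	if err != nil {
		log.Fatalf("failed to instantiate nsqd - %s", err)
	}
	select {} // block forever so the process keeps running
}
```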
Starting the server is simple: main just calls nsqd.Start. To keep the program from exiting, we block on an empty select for now.
apps/client/client.go
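```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"net"
)

func main() {
	nsqdAddr := "127.0.0.1:4150"
	conn, err := net.Dial("tcp", nsqdAddr)
	if err != nil {
		log.Fatal(err)
	}
	// Start reading only once we know the connection is valid
	go readFully(conn)

	// Publish and Subscribe come from command.go (not shown)
	cmd := Publish("mytopic", []byte("ha ha"))
	cmd.WriteTo(conn)
	cmd = Subscribe("mytopic", "mychannel")
	cmd.WriteTo(conn)
	select {}
}

func readFully(conn net.Conn) {
	sizeBuf := make([]byte, 4)
	for {
		// io.ReadFull keeps reading until the buffer is full; a single conn.Read may return fewer bytes
		_, err := io.ReadFull(conn, sizeBuf)
		if err != nil {
			fmt.Printf("error during read: %s\n", err)
			return
		}
		size := binary.BigEndian.Uint32(sizeBuf)
		data := make([]byte, size)
		n, err := io.ReadFull(conn, data)
		if err != nil {
			fmt.Printf("error during read: %s\n", err)
			return
		}
		// Skip the 16-byte message ID and print the body
		fmt.Printf("receive: <%s> ,size:%d\n", data[16:n], n)
	}
}
```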
The test client first connects to local port 4150, sends a PUB command and then a SUB command, and meanwhile starts a goroutine that keeps receiving messages from the server. For each incoming message it first reads the 4-byte length, then skips the 16-byte message ID and prints the message body. The commands are encoded by command.go; that file is not complicated, so you can read it on your own.
Test results
We start the server first, then the client, and see the following results:
Server side
Client side
Source code
```shell
git clone https://github.com/xianxueniao150/mini-nsq.git
cd mini-nsq
git checkout day01
```