NSQ is a well-known message queue written in Go. It is easy to get started with, its documentation is clear, and its source code is not complicated. It is also worth mentioning how convenient Go’s channels make it to write a message queue. When we receive messages from producers, we keep them in memory for efficiency. If we used shared memory, such as a container in Java, we would have to lock both when storing and when retrieving messages. In Go we don’t need an explicit lock: one goroutine sends messages into a channel and another goroutine receives them from it, which is very convenient.
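To make the point concrete, here is a minimal, self-contained sketch (not taken from mini-nsq) in which one goroutine sends messages into a buffered channel and another receives them, with no mutex around the queue. The Message type is a simplified stand-in for the one defined later.

```go
package main

import "fmt"

// Message is a simplified stand-in for mini-nsq's Message type.
type Message struct {
	Body []byte
}

// handOff runs a producer goroutine that sends n messages into a buffered
// channel while the calling goroutine receives them. Neither side takes a
// lock: the channel itself synchronizes the hand-off.
func handOff(n int) []string {
	queue := make(chan *Message, 16) // buffered, like memoryMsgChan
	go func() {
		defer close(queue) // signal that no more messages are coming
		for i := 0; i < n; i++ {
			queue <- &Message{Body: []byte(fmt.Sprintf("msg-%d", i))}
		}
	}()

	var received []string
	for m := range queue { // with one producer, messages arrive in send order
		received = append(received, string(m.Body))
	}
	return received
}

func main() {
	fmt.Println(handOff(3)) // [msg-0 msg-1 msg-2]
}
```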

To understand message queues more deeply, I decided to implement a mini version of NSQ in a series of articles. This article implements the most basic NSQ functionality: producers publish messages and consumers subscribe to receive them. Each subsequent article builds on the previous one.

Before looking at the code, let’s review the NSQ messaging process.

NSQ message passing process

A client connected to NSQ plays one of two roles, producer or consumer, or possibly both. As shown in the figure above, a producer sends a message to a given topic; the topic copies the message to every channel under it, and each channel then delivers the message to its subscribed consumers. If multiple consumers subscribe to the same channel, each message in that channel is delivered to only one of them, chosen at random.
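The fan-out rule can be simulated with plain Go channels. In this simplified sketch (not NSQ code), the topic copies every message into each channel, and several consumer goroutines compete to receive from each channel, so every message is delivered exactly once per channel. Go does not specify which waiting receiver gets a given value, which here plays the role of the "random consumer".

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut publishes msgs to a topic that has the given channel names. Each
// channel is drained by `consumers` competing goroutines. It returns, per
// channel, how many messages were delivered in total.
func fanOut(msgs []string, channelNames []string, consumers int) map[string]int {
	chans := make(map[string]chan string)
	for _, name := range channelNames {
		chans[name] = make(chan string, len(msgs))
	}

	// Topic side: copy every message into every channel.
	for _, m := range msgs {
		for _, ch := range chans {
			ch <- m
		}
	}
	for _, ch := range chans {
		close(ch)
	}

	var mu sync.Mutex
	delivered := make(map[string]int)
	var wg sync.WaitGroup
	for name, ch := range chans {
		for i := 0; i < consumers; i++ {
			wg.Add(1)
			go func(name string, ch chan string) {
				defer wg.Done()
				for range ch { // each value is received by exactly one consumer
					mu.Lock()
					delivered[name]++
					mu.Unlock()
				}
			}(name, ch)
		}
	}
	wg.Wait()
	return delivered
}

func main() {
	fmt.Println(fanOut([]string{"a", "b", "c"}, []string{"ch1", "ch2"}, 2)) // map[ch1:3 ch2:3]
}
```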

Mini-nsq workflow

  1. Start a TCP server that listens for connection requests from clients.
  2. For each client that connects successfully, start a worker goroutine that handles commands from that client (consumer subscriptions, producer publishes, and so on).
  3. Messages sent by producers are buffered in the corresponding topic. Each topic has a worker goroutine that forwards the buffered messages to the channels under that topic.
  4. For each connected client, start another worker goroutine that sends messages from the channel bound to that client.

Code analysis

As shown in the figure below, the code is divided into two main parts: the test programs under apps and the project code under nsqd.

Base components (Message, Topic, Channel)

message.go

import "bytes"

type MessageID [MsgIDLength]byte

const (
	MsgIDLength = 16
)

type Message struct {
	ID   MessageID
	Body []byte
}

func NewMessage(id MessageID, body []byte) *Message {
	return &Message{
		ID:   id,
		Body: body,
	}
}

// Bytes serializes the message as the 16-byte ID followed by the body,
// ready for network transmission.
func (m *Message) Bytes() ([]byte, error) {
	buf := new(bytes.Buffer)
	_, err := buf.Write(m.ID[:])
	if err != nil {
		return nil, err
	}
	_, err = buf.Write(m.Body)
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

A Message consists of two parts, a 16-byte ID and a Body. The Bytes method serializes a message into a byte slice for network transmission.
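A round trip makes the layout explicit. This sketch simplifies Bytes to return no error (bytes.Buffer.Write always returns a nil error, so the original error checks never fire) and adds decodeMessage, a hypothetical inverse that is not part of mini-nsq.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

const MsgIDLength = 16

type MessageID [MsgIDLength]byte

type Message struct {
	ID   MessageID
	Body []byte
}

// Bytes lays the message out as the 16-byte ID followed by the body.
func (m *Message) Bytes() []byte {
	buf := make([]byte, 0, MsgIDLength+len(m.Body))
	buf = append(buf, m.ID[:]...)
	return append(buf, m.Body...)
}

// decodeMessage (hypothetical) reverses Bytes: the first 16 bytes are the
// ID and everything after them is the body.
func decodeMessage(b []byte) (*Message, error) {
	if len(b) < MsgIDLength {
		return nil, errors.New("message too short")
	}
	m := &Message{Body: b[MsgIDLength:]}
	copy(m.ID[:], b[:MsgIDLength])
	return m, nil
}

func main() {
	orig := &Message{Body: []byte("ha ha")}
	orig.ID[0] = 7
	encoded := orig.Bytes()
	decoded, err := decodeMessage(encoded)
	fmt.Println(len(encoded), decoded.ID == orig.ID, bytes.Equal(decoded.Body, orig.Body), err)
	// 21 true true <nil>
}
```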

topic.go

import (
	"log"
	"sync"
)

type Topic struct {
	name string

	sync.RWMutex                     // guards channelMap
	channelMap   map[string]*Channel // all channels under this topic

	memoryMsgChan     chan *Message // buffers messages published to this topic
	channelUpdateChan chan int      // signals that the set of channels under this topic changed
}

func NewTopic(topicName string) *Topic {
	t := &Topic{
		name:              topicName,
		channelMap:        make(map[string]*Channel),
		memoryMsgChan:     make(chan *Message, 10000),
		channelUpdateChan: make(chan int),
	}
	go t.messagePump() // start the topic's worker goroutine
	return t
}

See the comments for the meaning of each field in the Topic structure. Note that when we create a new topic, we also start a worker goroutine for it, which moves the messages buffered in the topic to the channels under that topic:

func (t *Topic) messagePump() {
	var msg *Message
	var chans []*Channel
	var memoryMsgChan chan *Message

	t.Lock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.Unlock()

	if len(chans) > 0 {
		memoryMsgChan = t.memoryMsgChan
	}

	// main message loop
	for {
		select {
		case msg = <-memoryMsgChan:
			// chans is never empty here: memoryMsgChan is nil whenever there
			// are no channels, so no message is received and then dropped
			for _, channel := range chans {
				err := channel.PutMessage(msg)
				if err != nil {
					log.Printf(
						"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
						t.name, msg.ID, channel.name, err)
				}
			}
		case <-t.channelUpdateChan:
			log.Println("Topic update channel")
			chans = chans[:0]
			t.Lock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.Unlock()
			if len(chans) == 0 {
				memoryMsgChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
			}
		}
	}
}

We keep all channels of this topic in chans. Because channels are added and removed dynamically, channelUpdateChan delivers a signal whenever the set changes, and we then rebuild chans from channelMap. Note that if the topic has no channels, we must not take messages from t.memoryMsgChan, or they would be lost. The trick used here is the local variable memoryMsgChan: when the topic has no channels we set it to nil, and because receiving from a nil channel blocks forever, the select never enters that branch.
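The nil-channel trick can be checked in isolation. pumpOnce is a hypothetical helper that runs a single select the way messagePump does; the timeout case exists only so the example terminates, whereas messagePump would simply keep waiting.

```go
package main

import (
	"fmt"
	"time"
)

// pumpOnce mimics one iteration of Topic.messagePump. When enabled is false
// the local msgChan stays nil; a receive from a nil channel blocks forever,
// so select never chooses that case and the message stays queued in src.
func pumpOnce(src chan string, enabled bool) (string, bool) {
	var msgChan chan string // nil unless there is somewhere to deliver to
	if enabled {
		msgChan = src
	}
	select {
	case m := <-msgChan:
		return m, true
	case <-time.After(50 * time.Millisecond):
		return "", false // only reached because the message case is disabled
	}
}

func main() {
	src := make(chan string, 1)
	src <- "hello"
	_, ok := pumpOnce(src, false)
	fmt.Println(ok, len(src)) // false 1: the nil-channel case never fired
	m, ok := pumpOnce(src, true)
	fmt.Println(m, ok) // hello true
}
```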

Other Topic methods

func (t *Topic) PutMessage(m *Message) error {
	log.Printf("message put into topic")
	t.memoryMsgChan <- m
	return nil
}

func (t *Topic) GetChannel(channelName string) *Channel {
	t.Lock()
	channel, isNew := t.getOrCreateChannel(channelName)
	t.Unlock()
	if isNew {
		// tell messagePump to rebuild its channel list
		t.channelUpdateChan <- 1
	}
	return channel
}

// getOrCreateChannel expects the caller to hold the topic lock
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
	channel, ok := t.channelMap[channelName]
	if !ok {
		channel = NewChannel(t.name, channelName)
		t.channelMap[channelName] = channel
		log.Printf("TOPIC(%s): new channel(%s)", t.name, channel.name)
		return channel, true
	}
	return channel, false
}

// GenerateID is a placeholder for now: it always returns the zero MessageID
func (t *Topic) GenerateID() MessageID {
	var h MessageID
	return h
}

PutMessage stores a message in the topic. GetChannel returns an existing channel or creates a new one; note that creating a new channel sends a signal on channelUpdateChan. GenerateID is meant to generate a unique ID for each Message; we don’t need it yet, so for now it simply returns the zero ID.
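Because GenerateID currently returns the zero value, every message shares the same ID. One possible approach, sketched here and not what mini-nsq or the official nsqd does, is an atomic per-process counter packed into the 16-byte ID. idGenerator is a hypothetical name, and the IDs are unique only within one process.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

type MessageID [16]byte

// idGenerator is hypothetical: it packs a monotonically increasing counter
// into the last 8 bytes of the ID. The counter is the first field so it is
// 64-bit aligned, as atomic operations require on 32-bit platforms.
type idGenerator struct {
	counter uint64
}

// Next is safe to call from many goroutines at once.
func (g *idGenerator) Next() MessageID {
	var id MessageID
	n := atomic.AddUint64(&g.counter, 1)
	binary.BigEndian.PutUint64(id[8:], n)
	return id
}

func main() {
	var g idGenerator
	a, b := g.Next(), g.Next()
	fmt.Println(a != b, a[15], b[15]) // true 1 2
}
```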

channel.go

import "log"

type Channel struct {
	topicName     string
	name          string
	memoryMsgChan chan *Message // buffers messages sent to this channel
}

// NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topicName string, channelName string) *Channel {
	return &Channel{
		topicName:     topicName,
		name:          channelName,
		memoryMsgChan: make(chan *Message, 10000),
	}
}

// PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error {
	log.Printf("message put into channel, body: %s", m.Body)
	c.memoryMsgChan <- m
	return nil
}

The Channel structure is very simple. Its PutMessage method is called from the topic’s worker goroutine shown above to pass messages from the topic to the channel. So when are the messages buffered in a channel sent to the corresponding consumers? Read on.
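One consequence of the buffered memoryMsgChan is that PutMessage blocks once 10000 messages are waiting, which in turn stalls the topic’s messagePump. The sketch below shows a hypothetical non-blocking variant that uses select with a default case; TryPutMessage and ErrChannelFull are illustrative names, and mini-nsq itself keeps the blocking send.

```go
package main

import (
	"errors"
	"fmt"
)

type Message struct{ Body []byte }

// ErrChannelFull is a hypothetical error used only in this sketch.
var ErrChannelFull = errors.New("channel buffer full")

type Channel struct {
	memoryMsgChan chan *Message
}

// TryPutMessage succeeds immediately if the buffer has room; otherwise the
// default case returns an error instead of blocking the caller.
func (c *Channel) TryPutMessage(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
		return nil
	default:
		return ErrChannelFull
	}
}

func main() {
	c := &Channel{memoryMsgChan: make(chan *Message, 1)} // tiny buffer for the demo
	fmt.Println(c.TryPutMessage(&Message{Body: []byte("a")})) // <nil>
	fmt.Println(c.TryPutMessage(&Message{Body: []byte("b")})) // channel buffer full
}
```

A caller that gets ErrChannelFull must then decide what to do with the message (drop it, retry, or store it elsewhere), which is the trade-off the blocking version avoids.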

Other components

Program entry (nsqd.go)

import (
	"fmt"
	"log"
	"net"
	"sync"
)

type NSQD struct {
	sync.RWMutex
	topicMap map[string]*Topic
}

// Start listens on TCP port 4150 and serves clients in a new goroutine.
func Start() (*NSQD, error) {
	tcpListener, err := net.Listen("tcp", "0.0.0.0:4150")
	if err != nil {
		return nil, fmt.Errorf("listen (%s) failed - %s", "0.0.0.0:4150", err)
	}
	log.Printf("TCP: listening on %s", tcpListener.Addr())

	n := &NSQD{
		topicMap: make(map[string]*Topic),
	}
	tcpServer := &tcpServer{nsqd: n, tcpListener: tcpListener}
	go tcpServer.serve()
	return n, nil
}

// GetTopic returns the topic with the given name, creating it if it does
// not exist. It is safe for concurrent use.
func (n *NSQD) GetTopic(topicName string) *Topic {
	n.Lock()
	defer n.Unlock()
	t, ok := n.topicMap[topicName]
	if ok {
		return t
	}
	t = NewTopic(topicName)
	n.topicMap[topicName] = t
	return t
}

// channels returns a flat slice of all channels in all topics
func (n *NSQD) channels() []*Channel {
	var channels []*Channel
	n.RLock()
	for _, t := range n.topicMap {
		t.RLock()
		for _, c := range t.channelMap {
			channels = append(channels, c)
		}
		t.RUnlock()
	}
	n.RUnlock()
	return channels
}

The NSQD structure holds all the topics in the message queue. Its GetTopic method returns the topic with the given topicName, creating it if it does not exist, and its channels method returns all channels in the message queue. Neither method is complicated, so let’s look at the program’s entry point, Start. Start opens a TCP listener on port 4150, creates a tcpServer structure, and starts a new goroutine that runs the tcpServer’s serve method.
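GetTopic holds the lock across both the lookup and the insert so that two goroutines asking for a new topic at the same moment end up with the same instance. This simplified sketch omits the topic’s messagePump goroutine and uses concurrentGets, a hypothetical helper, to check that many concurrent calls return a single *Topic.

```go
package main

import (
	"fmt"
	"sync"
)

type Topic struct{ name string }

type NSQD struct {
	sync.RWMutex
	topicMap map[string]*Topic
}

// GetTopic follows the get-or-create pattern of nsqd.go: lookup and insert
// happen under one lock, so a topic is never created twice.
func (n *NSQD) GetTopic(name string) *Topic {
	n.Lock()
	defer n.Unlock()
	if t, ok := n.topicMap[name]; ok {
		return t
	}
	t := &Topic{name: name}
	n.topicMap[name] = t
	return t
}

// concurrentGets calls GetTopic from k goroutines and reports how many
// distinct *Topic values came back.
func concurrentGets(n *NSQD, name string, k int) int {
	results := make(chan *Topic, k)
	var wg sync.WaitGroup
	for i := 0; i < k; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- n.GetTopic(name)
		}()
	}
	wg.Wait()
	close(results)
	distinct := make(map[*Topic]bool)
	for t := range results {
		distinct[t] = true
	}
	return len(distinct)
}

func main() {
	n := &NSQD{topicMap: make(map[string]*Topic)}
	fmt.Println(concurrentGets(n, "mytopic", 100)) // 1
}
```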

tcp_server.go

import (
	"log"
	"net"
)

type tcpServer struct {
	nsqd        *NSQD
	tcpListener net.Listener
}

func (tcpServer *tcpServer) serve() error {
	for {
		clientConn, err := tcpServer.tcpListener.Accept()
		if err != nil {
			break
		}
		// Each client connection is handled by its own goroutine
		go func() {
			log.Printf("TCP: new client(%s)", clientConn.RemoteAddr())

			prot := &protocolV2{nsqd: tcpServer.nsqd}

			client := prot.NewClient(clientConn)

			err := prot.IOLoop(client)
			if err != nil {
				log.Printf("client(%s) - %s", clientConn.RemoteAddr(), err)
			}
			client.Close()
		}()
	}

	return nil
}

As you can see, the serve method accepts client connections in an infinite for loop and starts a worker goroutine for each client that connects successfully. In that goroutine we create a protocol structure and a client structure, and then call the protocol’s IOLoop method.

Before looking at protocol.go, let’s first look at client.go.

client.go

import (
	"bufio"
	"net"
	"sync"
)

const defaultBufferSize = 16 * 1024

type client struct {
	sync.Mutex

	// original connection
	net.Conn

	// reading/writing interfaces
	Reader *bufio.Reader
	Writer *bufio.Writer

	SubEventChan chan *Channel // delivers the subscription event
}

func newClient(conn net.Conn) *client {
	c := &client{
		Conn:   conn,
		Reader: bufio.NewReaderSize(conn, defaultBufferSize),
		Writer: bufio.NewWriterSize(conn, defaultBufferSize),
		// Buffered so that SUB does not block if the messagePump goroutine is
		// not yet ready to receive the subscription event. A consumer may
		// subscribe only once, so a capacity of 1 is enough.
		SubEventChan: make(chan *Channel, 1),
	}
	return c
}

func (c *client) Flush() error {
	return c.Writer.Flush()
}

The client structure mainly holds SubEventChan, which carries subscription events. Because it writes through a bufio buffer, it also has a Flush method that forces buffered data out to the connection.
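To see why Flush matters, this sketch writes through a bufio.Writer wrapped around a bytes.Buffer that stands in for the TCP connection: a small write stays in the 16 KiB buffer and reaches the underlying writer only after Flush. bufferedWrite is a hypothetical helper.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// bufferedWrite writes msg through a bufio.Writer and reports how many bytes
// had reached the underlying writer before and after Flush.
func bufferedWrite(msg string) (before, after int) {
	var dst bytes.Buffer // stands in for the TCP connection
	w := bufio.NewWriterSize(&dst, 16*1024)
	w.WriteString(msg) // small write: stays in the buffer
	before = dst.Len()
	w.Flush() // pushes the buffered bytes to dst
	after = dst.Len()
	return before, after
}

func main() {
	fmt.Println(bufferedWrite("hello")) // 0 5
}
```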

protocol.go

protocol.go is responsible for receiving messages from producers, accepting subscriptions from consumers, and sending messages to consumers, encoding and decoding messages according to the protocol on every network send and receive. It is probably the most complex file in the project, so let’s go through it step by step.

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"log"
)

// IOLoop runs in the client's worker goroutine: it reads commands from the
// client and dispatches them for processing
func (p *protocol) IOLoop(client *client) error {
	var err error
	var line []byte

	// Start another goroutine to deliver messages to this client as a consumer
	go p.messagePump(client)

	for {
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// trim the '\n'
		line = line[:len(line)-1]
		// optionally trim the '\r'
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		params := bytes.Split(line, separatorBytes)

		err = p.Exec(client, params)
		if err != nil {
			break
		}
	}
	return err
}

As we saw in the previous section, each connected client has a dedicated goroutine running IOLoop. The code shows that IOLoop is essentially an infinite for loop that keeps receiving and processing commands from the client. For now we handle only two commands: PUB, with which a client acting as a producer publishes a message to the specified topic, and SUB, with which a client acting as a consumer subscribes to the specified channel of the specified topic. Here is the format of these two commands:

PUB

Publish a message to a topic:

PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]

SUB

Subscribe to a topic/channel

SUB <topic_name> <channel_name>\n

Now let’s see how IOLoop parses commands from a client: it first reads up to '\n', strips the trailing '\n' (and an optional '\r'), and then splits the line on spaces to get params. The Exec method determines the command type from the first element of params and calls the corresponding handler. Note that for PUB we have not finished reading yet: the PUB method performs a second read, as analyzed below.
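The parsing steps can be written as a small standalone function. separatorBytes is not defined in the article, so a single space is assumed here, and dispatch is a hypothetical stand-in for Exec, which the article does not show. It checks the number of params before indexing, so a bare "PUB" line yields an error instead of an index-out-of-range panic.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// separatorBytes is an assumption: commands are taken to be space-separated.
var separatorBytes = []byte(" ")

// parseLine mirrors IOLoop: drop the trailing '\n', drop an optional '\r',
// then split on spaces. params[0] is the command name.
func parseLine(line []byte) [][]byte {
	line = bytes.TrimSuffix(line, []byte("\n"))
	line = bytes.TrimSuffix(line, []byte("\r"))
	return bytes.Split(line, separatorBytes)
}

// dispatch (hypothetical) picks a handler by params[0] after checking that
// the command has the arguments it needs.
func dispatch(params [][]byte) (string, error) {
	switch string(params[0]) {
	case "PUB":
		if len(params) < 2 {
			return "", errors.New("PUB requires a topic name")
		}
		return "PUB", nil
	case "SUB":
		if len(params) < 3 {
			return "", errors.New("SUB requires a topic and a channel name")
		}
		return "SUB", nil
	default:
		return "", fmt.Errorf("invalid command %q", params[0])
	}
}

func main() {
	params := parseLine([]byte("SUB mytopic mychannel\r\n"))
	fmt.Printf("%q\n", params)    // ["SUB" "mytopic" "mychannel"]
	fmt.Println(dispatch(params)) // SUB <nil>
}
```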

func (p *protocol) PUB(client *client, params [][]byte) error {
	if len(params) < 2 {
		return fmt.Errorf("PUB: missing topic name")
	}
	// Copy the topic name now: params points into the reader's buffer,
	// which the reads below may overwrite
	topicName := string(params[1])

	// Second read: a 4-byte big-endian body length, then the body itself
	messageLen := make([]byte, 4)
	if _, err := io.ReadFull(client.Reader, messageLen); err != nil {
		return err
	}
	bodyLen := binary.BigEndian.Uint32(messageLen)
	messageBody := make([]byte, bodyLen)
	if _, err := io.ReadFull(client.Reader, messageBody); err != nil {
		return err
	}

	topic := p.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	log.Printf("receive message from %s, topic:%s, message: %s", client.RemoteAddr(), topicName, string(messageBody))
	_ = topic.PutMessage(msg)
	return nil
}

In the PUB method we first get topicName from params, then perform a second read to get the message: we read 4 bytes to get the body size (bodyLen), then read exactly bodyLen bytes to get the body. Next we call GetTopic to get the given topic (remember that GetTopic creates the topic if it does not exist), and finally we store the message in the topic.

func (p *protocol) SUB(client *client, params [][]byte) error {
	if len(params) < 3 {
		return fmt.Errorf("SUB: expected topic and channel names")
	}
	topicName := string(params[1])
	channelName := string(params[2])

	topic := p.nsqd.GetTopic(topicName)
	channel := topic.GetChannel(channelName)
	// notify messagePump: this binds the channel to the client
	client.SubEventChan <- channel

	return nil
}

The SUB method likewise fetches the specified topic and channel, then sends the channel to the client’s SubEventChan (this is where the channel and the client get bound together) to signal that a subscription event has occurred.

Recall that at the start of IOLoop we launch a new worker goroutine that runs messagePump to handle the consumer side. Let’s take a closer look at this method.

// messagePump runs in its own goroutine and delivers messages from the
// subscribed channel to the client
func (p *protocol) messagePump(client *client) {
	var err error
	var memoryMsgChan chan *Message
	var subChannel *Channel
	// Keep a local copy of SubEventChan so it can be set to nil below,
	// which ensures a client can subscribe only once
	subEventChan := client.SubEventChan

	for {
		select {
		case subChannel = <-subEventChan: // a subscription event: subChannel is the channel bound to this consumer
			log.Printf("topic:%s channel:%s subscription event", subChannel.topicName, subChannel.name)
			memoryMsgChan = subChannel.memoryMsgChan
			// you can't SUB anymore
			subEventChan = nil
		case msg := <-memoryMsgChan: // the bound channel has a message
			err = p.SendMessage(client, msg)
			if err != nil {
				// PutMessage blocks when the channel buffer is full, so requeue
				// from a separate goroutine
				go func() {
					_ = subChannel.PutMessage(msg)
				}()
				log.Printf("PROTOCOL(V2): [%s] messagePump error - %s", client.RemoteAddr(), err)
				goto exit
			}
		}
	}

exit:
	log.Printf("PROTOCOL(V2): [%s] exiting messagePump", client.RemoteAddr())
}

We first wait for a subscription event on subEventChan (sent by the SUB method above). Once the client has subscribed, we watch the channel bound to the client and send each message that arrives to that client. If sending fails, our current approach is to put the message back into the channel so that it can be sent again later.

Let’s look at the methods that actually send messages:

func (p *protocol) SendMessage(client *client, msg *Message) error {
	log.Printf("PROTOCOL(V2): writing to client(%s) - message: %s", client.RemoteAddr(), msg.Body)

	msgByte, err := msg.Bytes()
	if err != nil {
		return err
	}
	return p.Send(client, msgByte)
}

func (p *protocol) Send(client *client, data []byte) error {
	client.Lock()
	defer client.Unlock()
	_, err := SendFramedResponse(client.Writer, data)
	if err != nil {
		return err
	}
	// client.Writer is buffered by bufio, so flush to push the frame out now
	return client.Flush()
}

// SendFramedResponse actually sends the message, preceded by its 4-byte length
func SendFramedResponse(w io.Writer, data []byte) (int, error) {
	beBuf := make([]byte, 4)
	size := uint32(len(data))

	binary.BigEndian.PutUint32(beBuf, size)
	n, err := w.Write(beBuf)
	if err != nil {
		return n, err
	}

	n, err = w.Write(data)
	return n + 4, err
}

SendMessage sends a Message to the consumer. It ultimately calls SendFramedResponse, which prefixes the serialized message with its 4-byte length before writing it.

Testing

Test code analysis

apps/nsqd/main.go

func main() {
	log.SetFlags(log.Lshortfile | log.Ltime)
	_, err := nsqd.Start()
	if err != nil {
		// nothing is listening, so there is no point in blocking below
		log.Fatalf("failed to instantiate nsqd - %s", err)
	}
	// Block forever so the process keeps serving
	select {}
}

Starting the server is simple: main just calls nsqd.Start. To keep the program from exiting, we block with an empty select for now.

apps/client/client.go

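import (
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"net"
)

func main() {
	nsqdAddr := "127.0.0.1:4150"
	conn, err := net.Dial("tcp", nsqdAddr)
	if err != nil {
		log.Fatal(err)
	}
	// Start reading only once the connection is known to be valid
	go readFully(conn)

	cmd := Publish("mytopic", []byte("ha ha"))
	cmd.WriteTo(conn)

	cmd = Subscribe("mytopic", "mychannel")
	cmd.WriteTo(conn)

	select {}
}

// readFully reads frames from the server: a 4-byte size, then the 16-byte
// message ID and the body. io.ReadFull is used because a single conn.Read
// may return fewer bytes than requested.
func readFully(conn net.Conn) {
	sizeBuf := make([]byte, 4)
	for {
		if _, err := io.ReadFull(conn, sizeBuf); err != nil {
			fmt.Printf("error during read: %s\n", err)
			return
		}
		size := binary.BigEndian.Uint32(sizeBuf)
		data := make([]byte, size)
		if _, err := io.ReadFull(conn, data); err != nil {
			fmt.Printf("error during read: %s\n", err)
			return
		}
		if len(data) < 16 {
			fmt.Printf("short frame: %d bytes\n", len(data))
			continue
		}
		// Skip the 16-byte message ID and print the body
		fmt.Printf("receive: <%s> ,size:%d\n", data[16:], size)
	}
}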

The test client first connects to local port 4150 and then sends a PUB command followed by a SUB command; meanwhile, a separate goroutine keeps receiving messages from the server. For each message, it reads the 4-byte length, then skips the message ID and prints the message body. Commands are encoded by command.go; that file is not complicated, so you can read it on your own.

Test results

We start the server first, then the client, and get the following output:

Server side

Client side

Source code

git clone https://github.com/xianxueniao150/mini-nsq.git
git checkout day01