To read the NSQ server code, we can start from the main function of its core module, nsqd, which is the entry point of the whole program. Before an nsqd starts serving, there is some configuration work plus calls to n.LoadMetadata() and n.PersistMetadata() to load and persist metadata (the known topics and channels). To stay focused on the main flow, we can skip over the parts that we don't need to worry about.

In Main, we start an httpServer and a tcpServer. They expose largely the same operations over different transports, so we can focus on the flow of the tcpServer.

Each TCP connection to the NSQ server is a long-lived connection: once established, data is continuously sent and received over the socket. After the TCP listener starts, the server accepts connections in a continuous loop. On each new connection, an initial handshake is made via a 4-byte protocol version magic, and any client that does not conform to the protocol is rejected. The server creates a handler goroutine for every incoming connection.

The following is the main TCP flow of the entire server.

// nsqd/nsqd.go, in Main()
tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
n.Lock()
n.tcpListener = tcpListener
n.Unlock()
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
	protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
})

// internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
	for {
		clientConn, err := listener.Accept()
		//...
		go handler.Handle(clientConn)
	}
}

// nsqd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	protocolMagic := string(buf)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{ctx: p.ctx}
	default:
		// refuse any connection that does not conform to the protocol
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		return
	}
	//...
	err = prot.IOLoop(clientConn)
}

IOLoop is called within each connection's Handle goroutine. This function is a typical example of how TCP readers and writers are structured. The function itself is the reader: a for loop that continuously reads the data sent by the client and parses it into the corresponding command to execute.

// nsqd/protocol_v2.go
func (p *protocolV2) IOLoop(conn net.Conn) error {
	var line []byte
	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)

	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan // wait for messagePump to finish initializing

	for {
		//...
		line, err = client.Reader.ReadSlice('\n')
		//...
		params := bytes.Split(line, separatorBytes)
		var response []byte
		response, err = p.Exec(client, params)
		//...
	}
	//...
}

The messagePump function started inside IOLoop is the TCP write side: it writes messages out to the socket.

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	//...
	flushed := true
	close(startedChan) // notify IOLoop that it can start reading commands
	for {
		//...
		select {
		//...
		case msg := <-memoryMsgChan:
			//...
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}
	//...
}

The messagePump function's data comes mainly from go-channels; in essence, it writes in-memory data out to the network.
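This reader-goroutine/writer-goroutine pairing is a common way to serve a TCP connection in Go. Below is a minimal, self-contained sketch of the pattern; it is not nsqd's actual code, and serve and its handle parameter are illustrative stand-ins for protocolV2's IOLoop and Exec.

import (
	"bufio"
	"net"
)

// serve pairs a read loop with a write pump on two goroutines, mirroring
// how protocolV2 runs IOLoop and messagePump for each connection.
func serve(conn net.Conn, msgs <-chan []byte, handle func(line []byte)) {
	done := make(chan struct{})

	// write side ("messagePump"): drain the in-memory go-channel to the socket
	go func() {
		for {
			select {
			case m := <-msgs:
				if _, err := conn.Write(m); err != nil {
					return
				}
			case <-done:
				return
			}
		}
	}()

	// read side ("IOLoop"): read newline-delimited commands and execute them
	r := bufio.NewReader(conn)
	for {
		line, err := r.ReadSlice('\n')
		if err != nil {
			break
		}
		handle(line)
	}
	close(done) // the reader exiting tears the writer down too
	conn.Close()
}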

Topic

Before introducing Topic in detail, let's briefly cover the basic concept of a topic, its role in NSQ, and its relationship with channels. (To keep the concepts apart, "channel" in this article refers to NSQ's channel; Golang channels are written as "go-channel".)

An nsqd hosts multiple topics, and each topic has multiple channels. Every channel receives a copy of each message published to its topic (messages can be published in several ways: PUB, MPUB, DPUB). When multiple subscribers share one channel, the messages are distributed across them, which achieves load balancing.
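To make this concrete, here is a small consumer sketch using the official go-nsq client; the topic name "events", the channel names, and the address are made up for illustration. Consumers on the channels "metrics" and "archive" each receive every message published to "events", while two consumers that both joined "metrics" would split the stream between them.

package main

import (
	"log"

	"github.com/nsqio/go-nsq"
)

// consume subscribes to the "events" topic on the given channel.
func consume(channel string) {
	c, err := nsq.NewConsumer("events", channel, nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	c.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		log.Printf("[%s] got %q", channel, m.Body)
		return nil // nil sends FIN; returning an error re-queues (REQ)
	}))
	if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// Each channel gets a full copy of every message on the topic;
	// two consumers sharing ONE channel would split the stream instead.
	consume("metrics")
	consume("archive")
	select {} // block forever
}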

Let's first look at the construction of a Topic.

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	t := &Topic{
		//...
	}
	if strings.HasSuffix(topicName, "#ephemeral") {
		t.ephemeral = true
		t.backend = newDummyBackendQueue()
	} else {
		//...
		t.backend = diskqueue.New(
			//...
		)
	}
	t.waitGroup.Wrap(func() { t.messagePump() })
	t.ctx.nsqd.Notify(t) // notify nsqlookupd to register this topic
	return t
}

If the topic name ends with #ephemeral, messages for that topic are never persisted: newDummyBackendQueue returns a backend whose operations are all no-ops, so any message that enters it is simply discarded. A messagePump goroutine is then launched to receive and dispatch messages (each topic has its own messagePump polling continuously).
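For reference, here is roughly what that no-op backend looks like, paraphrased from nsqd's dummy_backend_queue.go (details may differ from the current source): every operation trivially succeeds, and ReadChan returns a channel that never produces anything.

// BackendQueue is the interface topics and channels use for overflow
// storage; diskqueue implements it for durable topics.
type BackendQueue interface {
	Put([]byte) error
	ReadChan() chan []byte // expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

// dummyBackendQueue accepts everything and returns nothing: every write
// "succeeds" but the message is gone, and ReadChan never produces.
type dummyBackendQueue struct {
	readChan chan []byte
}

func newDummyBackendQueue() BackendQueue {
	return &dummyBackendQueue{readChan: make(chan []byte)}
}

func (d *dummyBackendQueue) Put([]byte) error      { return nil } // drop it
func (d *dummyBackendQueue) ReadChan() chan []byte { return d.readChan }
func (d *dummyBackendQueue) Close() error          { return nil }
func (d *dummyBackendQueue) Delete() error         { return nil }
func (d *dummyBackendQueue) Depth() int64          { return 0 }
func (d *dummyBackendQueue) Empty() error          { return nil }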

func (t *Topic) messagePump() {
	//...
	t.RLock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()
	for {
		select {
		case msg = <-memoryMsgChan: // t.memoryMsgChan
		case buf = <-backendChan: // t.backend
			msg, err = decodeMessage(buf)
			//...
		//... (channel-update and pause cases elided)
		case <-t.exitChan:
			goto exit
		}

		for i, channel := range chans {
			chanMsg := msg
			// copy the message because each channel
			// needs a unique instance but ...
			// fastpath to avoid copy if its the first channel
			// (the topic already created the first copy)
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			//...
		}
	}
	//...
}

As we can see above, the main sources of messages are memoryMsgChan and backendChan. Every message received is delivered to all channels under the current topic (this is why two consumers subscribed to different channels both receive the same message). If there is only one channel, the message is delivered directly; with multiple channels, the message is copied for each additional one.

Let's take a look at where backendChan's messages come from. diskQueue maintains an ioLoop goroutine that writes incoming messages to a file, or reads a message from a file and delivers it.

func (d *diskQueue) ioLoop() {
	var dataRead []byte
	var err error
	var count int64
	var r chan []byte

	syncTicker := time.NewTicker(d.syncTimeout)
	for {
		//...
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			dataRead, err = d.readOne()
			//...
			r = d.readChan
		} else {
			r = nil // nothing to read: a nil channel disables the send case below
		}
		select {
		case r <- dataRead:
			count++
			// moveForward sets needSync flag if a file is removed
			d.moveForward()
		case dataWrite := <-d.writeChan:
			count++
			d.writeResponseChan <- d.writeOne(dataWrite)
		case <-syncTicker.C:
			//... (periodic fsync)
		case <-d.exitChan:
			goto exit
		}
	}
	//...
}
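A detail worth pausing on: when there is nothing to read, ioLoop sets r = nil, and sending on a nil channel blocks forever, so that select case is silently disabled until data is available again. Here is a tiny standalone demo of this nil-channel trick (illustrative, not nsqd code):

package main

import (
	"fmt"
	"time"
)

func main() {
	pending := []string{"a", "b", "c"} // messages waiting "on disk"
	readChan := make(chan string)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	go func() {
		for m := range readChan {
			fmt.Println("delivered:", m)
		}
	}()

	for i := 0; i < 6; i++ {
		var r chan string // nil: the send case below can never fire
		var head string
		if len(pending) > 0 {
			head = pending[0]
			r = readChan // enable the send case, like ioLoop's r = d.readChan
		}
		select {
		case r <- head: // only selectable when r is non-nil
			pending = pending[1:]
		case <-ticker.C:
			fmt.Println("tick") // stand-in for syncTicker work
		}
	}
	close(readChan)
	time.Sleep(20 * time.Millisecond) // let the reader finish printing
}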

Let's look at how messages arriving over TCP flow into a topic.

func (p *protocolV2) IOLoop(conn net.Conn) error {
	//...
    response, err = p.Exec(client, params)
    //...
}
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    switch{
    //...
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
    }
}
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    //...
    topic := p.ctx.nsqd.GetTopic(topicName)
	msg := NewMessage(topic.GenerateID(), messageBody)
	err = topic.PutMessage(msg)
    //...
}
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
	//...
	err := t.put(m)
	//...
	return nil
}
func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:
	default:
        //....
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
	}
	return nil
}

As we can see above, after a message arrives over TCP and is decoded, the handler uses the global NSQD instance to find the corresponding topic and deliver the message to it. The topic then delivers to its channels (the topic's messagePump polls memoryMsgChan); if the in-memory go-channel is full, the message is written to the backend (diskQueue). The flush operation on shutdown likewise writes memoryMsgChan messages to disk.

Channel

There are two types of messages in a channel: deferred (delayed) messages and normal messages; as we can see in the NSQ client interface, the two kinds have different publish commands. In memory, messages are tracked with both a hash table and a priority queue. The roles of these two data structures are discussed in more detail below.
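On the client side, the two kinds map onto two producer calls. A minimal go-nsq sketch (the address and topic name are illustrative):

package main

import (
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

func main() {
	p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer p.Stop()

	// normal message: enters the topic's memoryMsgChan immediately
	if err := p.Publish("events", []byte("hello")); err != nil {
		log.Fatal(err)
	}
	// deferred message: held in the channel's deferred priority queue
	// and only delivered after the delay elapses
	if err := p.DeferredPublish("events", 30*time.Second, []byte("later")); err != nil {
		log.Fatal(err)
	}
}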

The following is the data structure inside a channel.

type Channel struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	requeueCount uint64
	timeoutCount uint64 // number of messages that have timed out

	sync.RWMutex // channel-wide read/write lock

	topicName string   // each channel belongs to a topic
	name      string   // channel name
	ctx       *context // handle to the global nsqd instance

	backend BackendQueue // message backup queue

	memoryMsgChan chan *Message // in-memory queue; the entry point for messages into the channel

	// state tracking
	clients        map[int64]Consumer // subscribed clients
	ephemeral      bool
	deleteCallback func(*Channel) // callback invoked when the channel is deleted
	deleter        sync.Once      // ensures the delete callback runs only once

	// stats tracking
	e2eProcessingLatencyStream *quantile.Quantile // status reporting

	// TODO: these can be DRYd up
	deferredMessages map[MessageID]*pqueue.Item // deferred messages, keyed by ID (hash table)
	deferredPQ       pqueue.PriorityQueue       // deferred message priority queue
	deferredMutex    sync.Mutex
	inFlightMessages map[MessageID]*Message // in-flight (sent, unacknowledged) messages (hash table)
	inFlightPQ       inFlightPqueue         // in-flight priority queue
	inFlightMutex    sync.Mutex             // in-flight queue lock
}

A Channel is only responsible for receiving messages and handing them out. Before a message is sent, it is backed up in a queue; if an error occurs, it is backed up to a file. A channel is really nothing more than a temporary staging area for messages. Notably, it has no goroutine loop of its own doing polling. The official diagram of a channel illustrates this shape; the goroutines in it are loops that live in other modules.

After a client connects to NSQ, it binds itself to a channel with the SUB command, and messages enter through the PUB command. When a message comes in, its topic is looked up through the global NSQD instance and the message is delivered there. From the topic workflow above, we know the message eventually enters the corresponding channel. Inside the protocol's messagePump goroutine, the channel the client subscribed to (recorded during SUB) exposes its memoryMsgChan go-channel; the pump pulls messages from it and delivers them to the remote client.

Here is the code for this sequence of processes:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	var channel *Channel
	for {
		//...
		topic := p.ctx.nsqd.GetTopic(topicName) // get the corresponding topic
		channel = topic.GetChannel(channelName) // get the channel under that topic
		channel.AddClient(client.ID, client)
		//...
		break
	}
	//...
	client.Channel = channel
	// update message pump
	client.SubEventChan <- channel
	//...
}

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	//...
	var memoryMsgChan chan *Message
	//...
	for {
		//...
		memoryMsgChan = subChannel.memoryMsgChan
		//...
		select {
		//...
		case subChannel = <-subEventChan:
			//...
		case msg := <-memoryMsgChan:
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		}
	}
	//...
}

Once a message is sent to the client, it is not discarded right away, because the delivery may fail; this is what the StartInFlightTimeout call we saw above is for. StartInFlightTimeout stores the message in both the hash table and the priority queue mentioned earlier. A priority queue is used here because the retransmission mechanism must pop messages in order of their timeout deadlines. The message can only be deleted and discarded after the client replies with the FIN command. Meanwhile, whenever NSQ receives a message, it immediately sends it out or backs it up to a file (both channel's and topic's PutMessage have corresponding implementations).

Note that NSQ carries two near-identical priority queue implementations: pqueue.PriorityQueue, used for deferred messages, and inFlightPqueue, used for in-flight messages. Both are min-heaps keyed on a timestamp priority.
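To make the "pop everything that has expired" behavior concrete, here is a simplified, self-contained sketch of the same idea built on container/heap; peekAndShift is an illustrative stand-in for nsqd's PeekAndShift, not its actual code.

package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item mirrors an in-flight message: pri is the absolute timeout
// (unix nanoseconds) after which the message should be re-queued.
type item struct {
	pri int64
	id  string
}

// pq is a min-heap ordered by pri, like nsqd's in-flight priority queue.
type pq []*item

func (q pq) Len() int           { return len(q) }
func (q pq) Less(i, j int) bool { return q[i].pri < q[j].pri }
func (q pq) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }

func (q *pq) Push(x interface{}) { *q = append(*q, x.(*item)) }
func (q *pq) Pop() interface{} {
	old := *q
	it := old[len(old)-1]
	*q = old[:len(old)-1]
	return it
}

// peekAndShift pops the root only if it has already expired; this is the
// pattern the queue scan uses to drain every timed-out message.
func peekAndShift(q *pq, now int64) *item {
	if q.Len() == 0 || (*q)[0].pri > now {
		return nil
	}
	return heap.Pop(q).(*item)
}

func main() {
	q := &pq{}
	now := time.Now().UnixNano()
	heap.Push(q, &item{pri: now - 1, id: "already expired"})
	heap.Push(q, &item{pri: now + int64(time.Hour), id: "still pending"})
	for it := peekAndShift(q, now); it != nil; it = peekAndShift(q, now) {
		fmt.Println("timed out:", it.id) // prints only "already expired"
	}
}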

The following is the StartInFlightTimeout implementation.

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	//...
	msg.pri = now.Add(timeout).UnixNano()
	err := c.pushInFlightMessage(msg) // write to the hash table
	if err != nil {
		return err
	}
	c.addToInFlightPQ(msg) // write to the priority queue
	return nil
}

We can look at the FIN command to see how a message is removed from these queues.

//p.Exec(client, params)
switch {
case bytes.Equal(params[0], []byte("FIN")):
	return p.FIN(client, params)
//...
}
//p.FIN(client, params)
client.Channel.FinishMessage(client.ID, *id)
//channel.FinishMessage
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	msg, err := c.popInFlightMessage(clientID, id)
    //...
	c.removeFromInFlightPQ(msg)
	//...
}

func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error) {
	c.inFlightMutex.Lock()
	msg, ok := c.inFlightMessages[id]
	//...
	delete(c.inFlightMessages, id)
	c.inFlightMutex.Unlock()
	return msg, nil
}

func (c *Channel) removeFromInFlightPQ(msg *Message) {
	c.inFlightMutex.Lock()
	c.inFlightPQ.Remove(msg.index)
	c.inFlightMutex.Unlock()
}

Clearing expired messages

Let's go back to NSQ's Main and look at another important goroutine: queueScanLoop. If you're just getting started with the NSQ source code, this function can be a little confusing to read, because it implements a probabilistic expiration algorithm borrowed from Redis. We'll skip its details for now and just think of the whole thing as a timer (a sketch of the idea follows the code below).

queueScanLoop retrieves all channels at a regular interval and calls resizePool. Despite the name, the pool here is a pool of queueScanWorker goroutines that process channels, not a message queue. What the workers actually do is re-send a channel's inFlightMessages and deferredMessages once their timeouts have expired.

func (n *NSQD) queueScanLoop() {
	//...
	n.resizePool(len(channels), workCh, responseCh, closeCh)
	//...
}

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	//...
	n.waitGroup.Wrap(func() {
		n.queueScanWorker(workCh, responseCh, closeCh)
	})
	//...
}

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}

func (c *Channel) processInFlightQueue(t int64) bool {
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	dirty := false
	for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t) // pop messages whose timeout has passed
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID) // remove the message from the hash table
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		c.put(msg) // re-queue the message for delivery
	}

exit:
	return dirty
}

// c.processDeferredQueue(now) does the same thing as processInFlightQueue.
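For the curious, the probabilistic part of queueScanLoop works roughly like this. The sketch below is illustrative, not nsqd's code; the constant names echo nsqd options such as QueueScanInterval, QueueScanSelectionCount, and QueueScanDirtyPercent. Each tick, a random sample of channels is scanned, and if more than a quarter of them come back dirty, another round runs immediately instead of waiting for the next tick.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	scanInterval   = 100 * time.Millisecond // cf. the QueueScanInterval option
	selectionCount = 20                     // cf. QueueScanSelectionCount
	dirtyPercent   = 0.25                   // cf. QueueScanDirtyPercent
)

// scanLoop approximates queueScanLoop: wake on a ticker, scan a random
// sample of channels, and keep re-scanning while many come back dirty.
// Each func simulates processing one channel; true means it was dirty.
func scanLoop(channels []func() bool, exit chan struct{}) {
	ticker := time.NewTicker(scanInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-exit:
			return
		}

	loop:
		num := selectionCount
		if num > len(channels) {
			num = len(channels)
		}
		dirty := 0
		for _, i := range rand.Perm(len(channels))[:num] {
			if channels[i]() {
				dirty++
			}
		}
		fmt.Printf("scanned %d channels, %d dirty\n", num, dirty)
		if float64(dirty)/float64(num) > dirtyPercent {
			goto loop // too many dirty channels: scan again immediately
		}
	}
}

func main() {
	channels := make([]func() bool, 100)
	for i := range channels {
		channels[i] = func() bool { return rand.Intn(10) == 0 } // ~10% dirty
	}
	exit := make(chan struct{})
	go scanLoop(channels, exit)
	time.Sleep(350 * time.Millisecond)
	close(exit)
}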

Note that if the client never replies that a message has been processed, the message will be re-queued and re-sent over and over.

Summary

The above covers the NSQ server source analysis; let's summarize its workflow here.

The NSQ server listens on a port and establishes a long-lived TCP connection with each client. Every connection gets a goroutine polling for TCP commands (IOLoop) plus a companion messagePump, so reading and writing happen on two separate goroutines. A message enters NSQ through the PUB command, and the corresponding topic forwards it to the memoryMsgChan (go-channel) of each of its channels. The IOLoop's messagePump for each subscribed client takes messages from the channel's memoryMsgChan and sends them out. When the client finishes processing a message and replies with the FIN command, the backed-up in-flight copy of the message is discarded.