  • NSQ source code analysis: go-nsq Producer processing details
  • NSQ source code analysis: go-nsq Consumer processing details
  • NSQ source code analysis: nsqd message handling (topic, channel, clients…
  • NSQ source code analysis: state synchronization between nsqd and nsqlookupd & the RDY command

This series of articles is based on nsqd v1.2.0 and go-nsq v1.0.8.

Overview (figure)

As you can see from the diagram, the message publishing process is mainly implemented in the following three files

  • producer.go – handles the user's requests to publish messages
  • conn.go – handles the TCP stream-related logic
  • command.go – handles the TCP packet logic

Within these three files, two goroutines, w.router and c.readLoop, are the most important functions in the process. We will focus on these two loops next

Router

func (w *Producer) router() {
	for {
		select {
		case t := <-w.transactionChan:
			w.transactions = append(w.transactions, t)
			err := w.conn.WriteCommand(t.cmd)
			if err != nil {
				w.close()
			}
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		case <-w.closeChan:
			goto exit
		case <-w.exitChan:
			goto exit
		}
	}

exit:
	w.transactionCleanup()
	w.wg.Done()
}
  • transactionChan: the message channel that aggregates user-published messages
    • Every publish call writes to this channel (possibly from N goroutines), and all of them end up in the single router goroutine
    • The router writes each command to the TCP stream
    • It also maintains the transactions slice. Because replies on a TCP connection arrive in the order the commands were sent, the in-flight transactions are kept in a FIFO queue
  • responseChan & errorChan: the channels that receive success and error replies read from the TCP connection
    • On either channel, the router pops the transaction at the head of the transactions queue and signals completion through its done channel (mainly to return success or an error to the caller)
  • closeChan & exitChan: receive the close and exit signals
  • exit: the exit point
    • The router first leaves the processing loop
    • The cleanup that follows a close or exit signal is handled here
    • First, the transactions that were already sent are finished: each receives a done signal whose error indicates the lost connection
    • Then the router waits for the publishes counted in concurrentProducers to finish
    • Any transaction still waiting in transactionChan is finished the same way, with a lost-connection error
    • When all of this is complete, wg.Done() releases whoever is blocked on the WaitGroup, which completes the exit logic
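
The FIFO matching described above can be sketched in isolation. This is a minimal, self-contained illustration and not the go-nsq code: the names miniRouter, transaction, newTransaction, and the write callback are hypothetical stand-ins, and the "not connected" error text is only meant to mirror the lost-connection error mentioned in the list.

```go
package main

import (
	"errors"
	"fmt"
)

// transaction is a hypothetical stand-in for a publish in flight:
// the command to send plus a channel on which the result is delivered.
type transaction struct {
	cmd  string
	done chan error
}

// newTransaction buffers done so the router never blocks when finishing it.
func newTransaction(cmd string) *transaction {
	return &transaction{cmd: cmd, done: make(chan error, 1)}
}

// miniRouter funnels all writes through one goroutine and matches
// replies to requests in FIFO order.
type miniRouter struct {
	transactionChan chan *transaction
	responseChan    chan []byte
	errorChan       chan []byte
	exitChan        chan struct{}
	stopped         chan struct{}
	pending         []*transaction   // FIFO of commands awaiting a reply
	write           func(cmd string) // stands in for writing to the TCP stream
}

func newMiniRouter(write func(cmd string)) *miniRouter {
	return &miniRouter{
		transactionChan: make(chan *transaction),
		responseChan:    make(chan []byte),
		errorChan:       make(chan []byte),
		exitChan:        make(chan struct{}),
		stopped:         make(chan struct{}),
		write:           write,
	}
}

// pop finishes the oldest pending transaction with the given result.
func (r *miniRouter) pop(err error) {
	if len(r.pending) == 0 {
		return
	}
	t := r.pending[0]
	r.pending = r.pending[1:]
	t.done <- err
}

func (r *miniRouter) run() {
	defer close(r.stopped)
	for {
		select {
		case t := <-r.transactionChan:
			r.pending = append(r.pending, t)
			r.write(t.cmd)
		case <-r.responseChan:
			r.pop(nil)
		case data := <-r.errorChan:
			r.pop(errors.New(string(data)))
		case <-r.exitChan:
			// Cleanup: everything still in flight fails with a connection error.
			for _, t := range r.pending {
				t.done <- errors.New("not connected")
			}
			r.pending = nil
			return
		}
	}
}

func main() {
	r := newMiniRouter(func(cmd string) { fmt.Println("write:", cmd) })
	go r.run()

	first, second := newTransaction("PUB a"), newTransaction("PUB b")
	r.transactionChan <- first
	r.transactionChan <- second
	r.responseChan <- []byte("OK")         // answers the oldest command
	r.errorChan <- []byte("E_BAD_MESSAGE") // answers the next one
	fmt.Println(<-first.done, <-second.done)

	close(r.exitChan)
	<-r.stopped
}
```

Because only the router goroutine touches pending, the queue needs no lock; the channels are the only synchronization.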

We can see in this loop how go-nsq uses select and channels to build a clean and efficient concurrent processing model

  • Messages are published and replies are received through channels (transactionChan, responseChan, errorChan) in a single select
  • A FIFO-style queue manages the published messages
  • closeChan and exitChan receive interrupt instructions from the outside
  • concurrentProducers counts the asynchronous publishes being processed at the same time
  • A WaitGroup (wg) blocks until cleanup has finished
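
The counter-based cleanup can be illustrated with a small sketch. It is only loosely modeled on the behavior described in this article: publisher, request, publish, cleanup, and runDemo are hypothetical names, and the real library may structure this differently. The idea is that each caller increments a counter before handing over its request, and cleanup keeps draining the channel until the counter drops to zero, so no caller stays blocked forever.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var errNotConnected = errors.New("not connected")

// request is a hypothetical pending publish; done is buffered so the
// cleanup loop never blocks when it reports the result.
type request struct {
	done chan error
}

type publisher struct {
	transactionChan     chan *request
	concurrentProducers int32 // callers currently inside publish
}

// publish mimics a caller that blocks while handing its request over.
func (p *publisher) publish(started *sync.WaitGroup) error {
	atomic.AddInt32(&p.concurrentProducers, 1)
	defer atomic.AddInt32(&p.concurrentProducers, -1)
	started.Done() // test hook: the counter now includes this caller

	req := &request{done: make(chan error, 1)}
	p.transactionChan <- req
	return <-req.done
}

// cleanup fails every request still trying to get in, and returns only
// when no caller is left inside publish. It reports how many it drained.
func (p *publisher) cleanup() int {
	drained := 0
	for {
		select {
		case req := <-p.transactionChan:
			req.done <- errNotConnected
			drained++
		default:
			if atomic.LoadInt32(&p.concurrentProducers) == 0 {
				return drained
			}
			runtime.Gosched() // let blocked callers make progress
		}
	}
}

// runDemo starts n callers, waits until all are counted, then cleans up.
func runDemo(n int) (int, []error) {
	p := &publisher{transactionChan: make(chan *request)}
	var started, finished sync.WaitGroup
	errs := make([]error, n)
	started.Add(n)
	finished.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer finished.Done()
			errs[i] = p.publish(&started)
		}(i)
	}
	started.Wait()
	drained := p.cleanup()
	finished.Wait()
	return drained, errs
}

func main() {
	drained, errs := runDemo(3)
	fmt.Println("drained:", drained, "first error:", errs[0])
}
```

A caller decrements the counter only after its request has been answered, so a zero counter means every counted caller has been released.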

readLoop

The code below deviates slightly from the source: the logging code and some unrelated logic have been removed.

func (c *Conn) readLoop() {
	delegate := &connMessageDelegate{c}
	for {
		if atomic.LoadInt32(&c.closeFlag) == 1 {
			goto exit
		}

		frameType, data, err := ReadUnpackedResponse(c)
		if err != nil {
			goto exit
		}

		if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
			err := c.WriteCommand(Nop())
			if err != nil {
				c.delegate.OnIOError(c, err)
				goto exit
			}
			continue
		}

		switch frameType {
		case FrameTypeResponse:
			c.delegate.OnResponse(c, data)
		case FrameTypeError:
			c.delegate.OnError(c, data)
		default:
			c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
		}
	}

exit:
	atomic.StoreInt32(&c.readLoopRunning, 0)
	messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
	if messagesInFlight == 0 {
		c.close()
	}
	c.wg.Done()
	c.log(LogLevelInfo, "readLoop exiting")
}

This loop uses a different processing model from the router above: it is an ordinary for loop. The explanation comes in three parts

  • Unpack: reads bytes from the TCP stream and parses them into two values, frameType and data
  • frame: depending on the frame type, data is passed to the producer through the delegate and ends up in the responseChan and errorChan mentioned earlier
  • exit: the exit point (the source code documents the exit sequence in detail, so it is quoted directly here)
    1. CLOSE cmd sent to nsqd
    2. CLOSE_WAIT response received from nsqd
    3. set c.closeFlag
    4. readLoop() exit
    5. c.exitChan close
    6. launch cleanup() goroutine & waitForCleanup() goroutine

Producer summary

  • Publishing uses the TCP protocol (messages are assembled into commands)
  • The publisher must be given the target nsqd when it is constructed. The relationship between Producer and nsqd is 1:1
  • An application can write to several nsqd instances at the same time (multiple producers writing in parallel to achieve HA)
  • Do not publish empty messages: an empty message body causes a panic
  • Key files
    • producer.go implements the message publishing logic
    • conn.go implements the network-specific logic
    • command.go implements the TCP packet logic

Finally, a recommendation for my own microservices framework

Barid-go is an easy-to-use microservices framework built from composable modules ;)