1. Create consumers

Through c: = NSQ. NewConsumer (…). Ways to create consumers

2. Consumers register message listening

C. addHandler (nsq.handlerfunc (func(MSG * nsq.message)))

AddConcurrentHandlers()

(3) Start a Goroutine in AddConcurrentHandlers(), call handlerLoop(Handler), start an infinite loop in handlerLoop, through the unbuffered channel incomingMessages, block listening to the latest messages, The message is retrieved and passed to the callback handler.handleMessage (message) for processing

func (r *Consumer) AddHandler(handler Handler) { r.AddConcurrentHandlers(handler, 1) } func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) { for i := 0; i < concurrency; i++ { go r.handlerLoop(handler) } } func (r *Consumer) handlerLoop(handler Handler) { for { message, ok := <-r.incomingMessages if ! ok { goto exit } if r.shouldFailMessage(message, handler) { message.Finish() continue } err := handler.HandleMessage(message) ... }Copy the code
3. Connect NSQLookupd

Call c.connecttonSQLookupd (v), which calls queryLookupd()->ConnectToNSQD()

func (r *Consumer) ConnectToNSQLookupd(addr string) error { ... if numLookupd == 1 { r.queryLookupd() r.wg.Add(1) go r.lookupdLoop() } return nil } func (r *Consumer) queryLookupd() { . for _, addr := range nsqdAddrs { err = r.ConnectToNSQD(addr) ... }}Copy the code

As you can see in ConnectToNSQD(), a TCP connection to NSQD is established through conn.connect ()


func (r *Consumer) ConnectToNSQD(addr string) error {
	...
	resp, err := conn.Connect()
	...
	
}
Copy the code

A Goroutine is opened in Connect, listening for incoming messages in an infinite loop in the readLoop method

func (c *Conn) Connect() (*IdentifyResponse, error) { dialer := &net.Dialer{ LocalAddr: c.config.LocalAddr, Timeout: c.config.DialTimeout, } conn, err := dialer.Dial("tcp", c.addr) if err ! = nil { return nil, err } c.conn = conn.(*net.TCPConn) ... go c.readLoop() go c.writeLoop() return resp, nil }Copy the code

When a message is received, it is processed by the c.delegate.onMessage () method, which sends the message to incomingMessages, an unbuffered message channel, completing the logic of receiving the message.

func (c *Conn) readLoop() { delegate := &connMessageDelegate{c} for { ... frameType, data, err := ReadUnpackedResponse(c) switch frameType { ... case FrameTypeMessage: msg, err := DecodeMessage(data) ... c.delegate.OnMessage(c, msg) ... }... } func (r *Consumer) onConnMessage(c *Conn, msg *Message) { ... r.incomingMessages <- msg ... }Copy the code