1. Create a consumer
Create a consumer by calling c := nsq.NewConsumer(…).
2. Register a message handler
(1) Register a handler with c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {…})).
(2) AddHandler() calls AddConcurrentHandlers() with a concurrency of 1.
(3) AddConcurrentHandlers() starts one goroutine per unit of concurrency, each running handlerLoop(handler). handlerLoop runs an infinite loop that blocks on the unbuffered channel incomingMessages; when a message arrives, it is passed to the callback handler.HandleMessage(message) for processing.
func (r *Consumer) AddHandler(handler Handler) {
	r.AddConcurrentHandlers(handler, 1)
}

func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
	for i := 0; i < concurrency; i++ {
		go r.handlerLoop(handler)
	}
}

func (r *Consumer) handlerLoop(handler Handler) {
	for {
		message, ok := <-r.incomingMessages
		if !ok {
			goto exit
		}
		if r.shouldFailMessage(message, handler) {
			message.Finish()
			continue
		}
		err := handler.HandleMessage(message)
		...
	}
	...
}
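The handler-loop pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the go-nsq implementation: the Message, Handler, HandlerFunc and Consumer types below are simplified stand-ins, and Stop and runDemo are hypothetical helpers added so the sketch can run on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a simplified stand-in for nsq.Message (assumption for this sketch).
type Message struct {
	Body string
}

// Handler mirrors the shape of the handler interface used above.
type Handler interface {
	HandleMessage(m *Message) error
}

// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(m *Message) error

func (f HandlerFunc) HandleMessage(m *Message) error { return f(m) }

// Consumer holds only the unbuffered channel that the handler loops read from.
type Consumer struct {
	incomingMessages chan *Message
	wg               sync.WaitGroup
}

func NewConsumer() *Consumer {
	return &Consumer{incomingMessages: make(chan *Message)}
}

// AddConcurrentHandlers starts `concurrency` goroutines that all receive from
// the same channel, so each message is handled by exactly one of them.
func (c *Consumer) AddConcurrentHandlers(h Handler, concurrency int) {
	for i := 0; i < concurrency; i++ {
		c.wg.Add(1)
		go c.handlerLoop(h)
	}
}

func (c *Consumer) handlerLoop(h Handler) {
	defer c.wg.Done()
	// range blocks until a message arrives and ends once the channel is closed.
	for m := range c.incomingMessages {
		if err := h.HandleMessage(m); err != nil {
			fmt.Println("handler error:", err)
		}
	}
}

// Stop (hypothetical helper) closes the channel and waits for every loop to exit.
func (c *Consumer) Stop() {
	close(c.incomingMessages)
	c.wg.Wait()
}

// runDemo pushes n messages through 3 handler loops and returns how many were handled.
func runDemo(n int) int {
	var mu sync.Mutex
	handled := 0
	c := NewConsumer()
	c.AddConcurrentHandlers(HandlerFunc(func(m *Message) error {
		mu.Lock()
		handled++
		mu.Unlock()
		return nil
	}), 3)
	for i := 0; i < n; i++ {
		// Each send blocks until one of the handler loops receives the message.
		c.incomingMessages <- &Message{Body: fmt.Sprintf("msg-%d", i)}
	}
	c.Stop()
	return handled
}

func main() {
	fmt.Println("handled:", runDemo(10))
}
```

Because the channel is unbuffered, a send completes only when some handler loop is ready to receive, which is what makes the producer side block when all handlers are busy.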
3. Connect to nsqlookupd
Call c.ConnectToNSQLookupd(v), which calls queryLookupd(), which in turn calls ConnectToNSQD().
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
	...
	if numLookupd == 1 {
		r.queryLookupd()
		r.wg.Add(1)
		go r.lookupdLoop()
	}
	return nil
}

func (r *Consumer) queryLookupd() {
	...
	for _, addr := range nsqdAddrs {
		err = r.ConnectToNSQD(addr)
		...
	}
}
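To make the step between the lookup query and the loop over nsqdAddrs more concrete, here is a rough sketch of turning a lookup response into a list of nsqd TCP addresses. The JSON shape (a producers array with broadcast_address and tcp_port fields) is an assumption about the nsqlookupd /lookup response and should be checked against the version in use; lookupResp, parseNSQDAddrs and the sample payload are hypothetical names and data for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// lookupResp models only the fields this sketch needs.
// The field names are an assumed response shape, not taken from the article.
type lookupResp struct {
	Producers []struct {
		BroadcastAddress string `json:"broadcast_address"`
		TCPPort          int    `json:"tcp_port"`
	} `json:"producers"`
}

// parseNSQDAddrs extracts "host:port" TCP addresses from a lookup response body.
func parseNSQDAddrs(body []byte) ([]string, error) {
	var resp lookupResp
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

func main() {
	// Made-up sample payload; a real consumer would fetch this over HTTP.
	sample := []byte(`{"channels":["ch"],"producers":[` +
		`{"broadcast_address":"10.0.0.1","tcp_port":4150},` +
		`{"broadcast_address":"10.0.0.2","tcp_port":4150}]}`)
	addrs, err := parseNSQDAddrs(sample)
	if err != nil {
		panic(err)
	}
	for _, addr := range addrs {
		// In the consumer, this is the point where ConnectToNSQD(addr) is called.
		fmt.Println("would connect to", addr)
	}
}
```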
As ConnectToNSQD() shows, the TCP connection to nsqd is established through conn.Connect():
func (r *Consumer) ConnectToNSQD(addr string) error {
...
resp, err := conn.Connect()
...
}
Connect() starts a goroutine running readLoop(), which listens for incoming messages in an infinite loop (a second goroutine runs writeLoop()):
func (c *Conn) Connect() (*IdentifyResponse, error) {
	dialer := &net.Dialer{
		LocalAddr: c.config.LocalAddr,
		Timeout:   c.config.DialTimeout,
	}
	conn, err := dialer.Dial("tcp", c.addr)
	if err != nil {
		return nil, err
	}
	c.conn = conn.(*net.TCPConn)
	...
	go c.readLoop()
	go c.writeLoop()
	return resp, nil
}
When a message is received, it is handed to c.delegate.OnMessage(). The consumer's delegate forwards the call to onConnMessage(), which sends the message to incomingMessages, the unbuffered message channel. This completes the receive path.
func (c *Conn) readLoop() {
	delegate := &connMessageDelegate{c}
	for {
		...
		frameType, data, err := ReadUnpackedResponse(c)
		switch frameType {
		...
		case FrameTypeMessage:
			msg, err := DecodeMessage(data)
			...
			c.delegate.OnMessage(c, msg)
			...
		}
		...
	}
}

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
	...
	r.incomingMessages <- msg
	...
}