We have introduced the related concepts of NSQ as well as the installation and application of NSQ. From the beginning of this article, the implementation details of NSQ will be introduced with the source code.
A single NSQD in NSQ can have multiple topics, and each topic can have multiple channels. A channel receives copies of all the messages in this topic to achieve multicast distribution, and each message on the channel is evenly distributed to its subscribers to achieve load balancing.
The entry function
Let’s first look at the entry function of NSQD:
Func main() {PRG := & Program {} if err := svc.run (PRG, syscall.sigint, syscall.sigterm); // Func main() {PRG := & Program {} if err := svc.run (PRG, syscall.sigint, syscall.sigterm); err ! = nil { logFatal("%s", err) } } func (p *program) Init(env svc.Environment) error { if env.IsWindowsService() { dir := filepath.Dir(os.Args[0]) return os.Chdir(dir) } return nil } func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) ... }Copy the code
Elegant background process management via third-party SVC packages, svc.run () -> svc.init () -> svc.start (), to Start NSQD instances.
The configuration item is initialized
Initialize the configuration item (OPts, CFG), load the historical data (nsqD.loadmetadata), persist the latest data (nsqD.persistmetadata), and then open the coroutine and enter the nsqd.main () Main function.
Resolve(opts, flagSet, CFG) NSQD, err := nsqd.New(opts) if err! = nil { logFatal("failed to instantiate nsqd - %s", err) } p.nsqd = nsqd err = p.nsqd.LoadMetadata() if err ! = nil { logFatal("failed to load metadata - %s", err) } err = p.nsqd.PersistMetadata() if err ! = nil { logFatal("failed to persist metadata - %s", err) } go func() { err := p.nsqd.Main() if err ! = nil { p.Stop() os.Exit(1) } }()Copy the code
Then it is to initialize tcpServer, httpServer, httpsServer, and then monitor queue information (n.kueuescanloop), node information management (n.lookuploop), statistics (n.tatsdloop) output.
NSQD = n.waitgroup.wrap (func() {exitFunc(protocol.tcpserver (n.tcplistener,)) {exitFunc(protocol.tcpserver (n.tcplistener,)); tcpServer, n.logf)) }) httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) if n.tlsConfig ! = nil && n.getOpts().HTTPSAddress ! = "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) } n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress ! = "" { n.waitGroup.Wrap(n.statsdLoop) }Copy the code
Handle the request
TCP/HTTP requests are processed separately, and handler coroutines are enabled for concurrent processing. NewHTTPServer registers routes using the Decorate mode (explained below).
/ / in NSQD/HTTP. Go: 44 router: = httprouter. The New () router. HandleMethodNotAllowed = true. The router PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf) router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf) router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf) s := &httpServer{ ctx: ctx, tlsEnabled: tlsEnabled, tlsRequired: tlsRequired, router: router, } router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1)) // v1 negotiate router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1)) router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1)) // only v1 router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1)) router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))Copy the code
Distribute http-pipeline routes
Go :22 for {clientConn, err := listener.accept () if err! = nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if ! strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } go handler.Handle(clientConn) }Copy the code
The above implementation is the main code for TCP-handler processing.
TCP parsing protocol
TCP parses V2 and uses prot.IOLoop(CONN) encapsulated by the internal protocol for processing.
Protocol switch protocolMagic {case "V2": prot = &protocolv2 {CTX: p.ctx} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } err = prot.IOLoop(clientConn) if err ! = nil { p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) return }Copy the code
Message generation and consumption
P. ec(command execution) and P. end(result sending) are carried out through internal protocols to ensure that each NSQD node can correctly generate and consume messages. Once there is error in the above process, it will be captured and processed to ensure the reliability of distributed delivery.
// In NSQD /protocol_v2.go:79 params := bytes.split (line, separatorBytes) p.tx. Nsqd. logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) var response []byte response, err = p.Exec(client, params) if err ! = nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr ! = nil { ctx = " - " + parentErr.Error() } p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if sendErr ! = nil { p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } if response ! = nil { err = p.Send(client, frameTypeResponse, response) if err ! = nil { err = fmt.Errorf("failed to send response - %s", err) break } }Copy the code
NSQD also enables TCP and HTTP services, both of which can be provided to producers and consumers. The HTTP service also provides NSQadmin with access to NSQD local topic and channel information.
summary
This paper mainly introduces NSQD. In general, the implementation of NSQD is not complicated. NSQD is a daemon that receives, queues, and delivers messages to clients. NSQD can run independently, but is typically configured by the cluster in which the NSQlookupd instance resides.
In the next article, we will look at the details of the implementation of other modules in NSQ.
Recommended reading
NSQ Parsing – An overview of high-performance messaging middleware
High-performance messaging middleware NSQ Parsing – Application practices
In microservice architecture, ELK is used for log collection and unified processing
How do I handle Go error exceptions without a try-catch?