In the last article, I first learned about the demo demonstration of NSQ three modules (NSQD, NSQlookupd, NSqadmin). This article starts from the source code, and analyzes the execution process and logical processing of NSQD step by step, learning other people’s excellent project architecture, in order to apply what I have learned.
1. nsqd
Perform the entrance
In NSQ /apps/ NSQD /main.go, you can find the execution entry file as follows:
2. nsqd
Execute master logic source code
2.1 Elegant background process management through third-party SVC package, svc.run () -> svc.init () -> svc.start (), Start NSQD instance;
func main(a) {
prg := &program{}
iferr := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err ! =nil {
logFatal("%s", err)
}
}
func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}
func (p *program) Start(a) error {
opts := nsqd.NewOptions()
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:)... }Copy the code
2.2 Initialize the configuration item (OPTS, CFG), load the historical data (nsqd.LoadMetadata), persist the latest data (nsqD. PersistMetadata), and then open the coroutine to enter the Main function nsqd.Main().
options.Resolve(opts, flagSet, cfg)
nsqd, err := nsqd.New(opts)
iferr ! =nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd
err = p.nsqd.LoadMetadata()
iferr ! =nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
iferr ! =nil {
logFatal("failed to persist metadata - %s", err)
}
go func(a) {
err := p.nsqd.Main()
iferr ! =nil {
p.Stop()
os.Exit(1)}} ()Copy the code
2.3 Initialize tcpServer, httpServer, and httpsServer, and then monitor queue information (n.kueuescanloop), node information management (n.lookuploop), and statistics information (n.tatsdloop).
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func(a) {
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func(a) {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
ifn.tlsConfig ! =nil&& n.getOpts().HTTPSAddress ! ="" {
httpsServer := newHTTPServer(ctx, true.true)
n.waitGroup.Wrap(func(a) {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}
n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
ifn.getOpts().StatsdAddress ! ="" {
n.waitGroup.Wrap(n.statsdLoop)
}
Copy the code
2.4 Process TCP/HTTP requests separately and enable the Handler coroutine for concurrent processing. NewHTTPServer registers routes using the Decorate mode (explained later).
Http-class route Distribution:
router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
s := &httpServer{
ctx: ctx,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}
router.Handle("GET"."/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET"."/info", http_api.Decorate(s.doInfo, log, http_api.V1))
// v1 negotiate
router.Handle("POST"."/pub", http_api.Decorate(s.doPUB, http_api.V1))
router.Handle("POST"."/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
router.Handle("GET"."/stats", http_api.Decorate(s.doStats, log, http_api.V1))
// only v1
router.Handle("POST"."/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST"."/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
Copy the code
TCP – handler to deal with:
for {
clientConn, err := listener.Accept()
iferr ! =nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if! strings.Contains(err.Error(),"use of closed network connection") {
return fmt.Errorf("listener.Accept() error - %s", err)
}
break
}
go handler.Handle(clientConn)
}
Copy the code
2.5 TCP Parses V2 and uses prot.IOLoop(conn) encapsulated by internal protocols for processing.
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
err = prot.IOLoop(clientConn)
iferr ! =nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
Copy the code
2.6 P.ec (command execution) and P.end (result sending) are carried out through internal protocols to ensure that each NSQD node can correctly generate and consume messages. Once any error occurs in the above process, it will be captured and processed to ensure the reliability of distributed delivery.
params := bytes.Split(line, separatorBytes)
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
var response []byte
response, err = p.Exec(client, params)
iferr ! =nil {
ctx := ""
ifparentErr := err.(protocol.ChildErr).Parent(); parentErr ! =nil {
ctx = "-" + parentErr.Error()
}
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
ifsendErr ! =nil {
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}
// errors of type FatalClientErr should forceably close the connection
if _, ok := err.(*protocol.FatalClientErr); ok {
break
}
continue
}
ifresponse ! =nil {
err = p.Send(client, frameTypeResponse, response)
iferr ! =nil {
err = fmt.Errorf("failed to send response - %s", err)
break}}Copy the code
3. nsqd
Summary of flow chart
The above process is summarized as follows:
[Summary] It can be seen from the source code that the code logic is clear and clear, and the use of Go coroutine is efficient and concurrent to process the message production and consumption of distributed multi-node NSQD. There are many details to be carefully analyzed in the next step, so that we can apply what we have learned.