An overview of
NSQ is a real-time distributed messaging platform designed to operate on a large scale, processing billions of messages a day, and is used by many Internet companies;
NSQD is a daemon that receives, queues and delivers messages to clients. It can run on its own, but is typically configured by the cluster in which the nsQlookupd instance is located (where it can declare topics and channels for everyone to find); It listens on two TCP ports, one for the client and the other for the HTTP API; It can also listen for HTTPS on a third port
The module
NSQ consists of NSQD, NSQlookupd, and NSqadmin
nsqlookupd
Nsqlookupd is a daemon that manages topology information. The client finds the producers of a given topic by querying nsQlookupd, and the NSQD node broadcasts topic and channel information, with the following capabilities
- Uniqueness, in one
nsq
There’s only one in the servicensqlookupd
Services, of course, can also be deployed in multiple clustersnsqlookupd
But there is no correlation between them - Decentralization, even though
nsqlookupd
Crash will also not affect the runningnsqd
service - Act as a
nsqd
andnaqadmin
Middleware for information interaction - To provide a
http
Query the service and update the client regularlynsqd
Address directory of
nsqd
NSQD is a daemon that receives, queues, and delivers messages to clients
- Pair subscribed to the same
topic
And the samechannel
70% of consumers use load balancing strategies (not polling) - As long as
channel
Exist, even if there is nochannel
The consumers will also be the producersmessage
Caches to queues (note expiration processing of messages) - In the guaranteed queue
message
Will be consumed at least once, even thoughnsqd
Exit, which also temporarily stores messages in the queue to disk (except for unexpected circumstances such as terminating a process) - Limited memory footprint, configurable
nsqd
In eachchannel
The queue is cached in memorymessage
The quantity, once exceeded,message
Will be cached to disk topic
.channel
Once created, it will always exist, to be timely in the admin console or with code to remove invalidtopic
andchannel
Avoid the waste of resources
nsqadmin
Is a set of WEB UI that brings together real-time statistics for a cluster and performs various administrative tasks
Source code analysis
This article and the following analysis are based on version 1.0.0 code. For readability purposes, I have put comments outside the function and covered them basically. I will not explain how to use them in this article
nsqlookupd.go
package nsqlookupd
/ / lock
// Configure options
// tcpListener TCP HTTP port listener as mentioned above
// httpListener
// The waitGroup thread synchronizes
/ / database
type NSQLookupd struct {
sync.RWMutex
opts *Options
tcpListener net.Listener
httpListener net.Listener
waitGroup util.WaitGroupWrapper
DB *RegistrationDB
}
// If no Logger is specified, create a new one
// new NSQLookupd does something with 'NewRegistrationDB'
// Parse log level
func New(opts *Options) *NSQLookupd {
if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
n := &NSQLookupd{
opts: opts,
DB: NewRegistrationDB(),
}
var err error
opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
iferr ! =nil {
n.logf(LOG_FATAL, "%s", err)
os.Exit(1)
}
n.logf(LOG_INFO, version.String("nsqlookupd"))
return n
}
// Create context, CTX is NSQLookupd, why not introduce native context struct?
// create tcpListener, where lock is used, indicating concurrency in the scenario
// Create a tcpServer based on CTX
// Create a TCPServer after the waitGroup thread is synchronized
// Repeat the above steps to create the HTTPServer
func (l *NSQLookupd) Main(a) {
ctx := &Context{l}
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
iferr ! =nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
os.Exit(1)
}
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func(a) {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
})
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
iferr ! =nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
os.Exit(1)
}
l.Lock()
l.httpListener = httpListener
l.Unlock()
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func(a) {
http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
})
}
// Obtain the TCP address and continue to lock, indicating that the address may be changed
func (l *NSQLookupd) RealTCPAddr(a) *net.TCPAddr {
l.RLock()
defer l.RUnlock()
return l.tcpListener.Addr().(*net.TCPAddr)
}
// Get the HTTP address
func (l *NSQLookupd) RealHTTPAddr(a) *net.TCPAddr {
l.RLock()
defer l.RUnlock()
return l.httpListener.Addr().(*net.TCPAddr)
}
// Close tcpListener and wait for thread synchronization
func (l *NSQLookupd) Exit(a) {
ifl.tcpListener ! =nil {
l.tcpListener.Close()
}
ifl.httpListener ! =nil {
l.httpListener.Close()
}
l.waitGroup.Wait()Copy the code
If you want to know how to use nsqlookupd_test.go, you can see the test nsqlookupd_test.go 😂. In the above code, we see the db part
registrationdb.go
package nsqlookupd
/ / lock
// Store Producers with a Registration key
type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]Producers
}
type Registration struct {
Category string
Key string
SubKey string
}
type Registrations []Registration
// * Node information *
// Last updated
/ / identifier
/ / address
/ / host name
// Broadcast address
/ / TCP addresses
/ / the HTTP address
/ / version number
type PeerInfo struct {
lastUpdate int64
id string
RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
}
// * producer *
// Node information
// Whether to delete
// Delete time
type Producer struct {
peerInfo *PeerInfo
tombstoned bool
tombstonedAt time.Time
}
type Producers []*Producer
// Convert to a string
func (p *Producer) String(a) string {
return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}
/ / delete
func (p *Producer) Tombstone(a) {
p.tombstoned = true
p.tombstonedAt = time.Now()
}
// Whether to delete
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}
/ / create RegistrationDB
func NewRegistrationDB(a) *RegistrationDB {
return &RegistrationDB{
registrationMap: make(map[Registration]Producers),
}
}
// Add a registry key
func (r *RegistrationDB) AddRegistration(k Registration) {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if! ok { r.registrationMap[k] = Producers{} } }// Add a producer to the registration
// Take the producers and iterate through,
// If it doesn't exist, add it
// If so, return false
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
r.Lock()
defer r.Unlock()
producers := r.registrationMap[k]
found := false
for _, producer := range producers {
if producer.peerInfo.id == p.peerInfo.id {
found = true}}if found == false {
r.registrationMap[k] = append(producers, p)
}
return! found }// Delete producer from registration based on id
// If it does not exist, return false
// Create a new producer, iterate over the original Producers,
// If the id is different, add it and delete it successfully
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool.int) {
r.Lock()
defer r.Unlock()
producers, ok := r.registrationMap[k]
if! ok {return false.0
}
removed := false
cleaned := Producers{}
for _, producer := range producers {
ifproducer.peerInfo.id ! = id { cleaned =append(cleaned, producer)
} else {
removed = true}}// Note: this leaves keys in the DB even if they have empty lists
r.registrationMap[k] = cleaned
return removed, len(cleaned)
}
// Delete a registration
func (r *RegistrationDB) RemoveRegistration(k Registration) {
r.Lock()
defer r.Unlock()
delete(r.registrationMap, k)
}
// Need to filter
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
return key == "*" || subkey == "*"
}
// Look up Registrations by category, key, subkey
// if key == '*' or subkey == '*', only one is found
// Otherwise, the registrationMap is iterated and all registrations that meet the criteria are returned
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
r.RLock()
defer r.RUnlock()
if! r.needFilter(key, subkey) { k := Registration{category, key, subkey}if _, ok := r.registrationMap[k]; ok {
return Registrations{k}
}
return Registrations{}
}
results := Registrations{}
for k := range r.registrationMap {
if! k.IsMatch(category, key, subkey) {continue
}
results = append(results, k)
}
return results
}
// Find Producers by category, key, and subkey
// Same as above, there is nothing to say
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
r.RLock()
defer r.RUnlock()
if! r.needFilter(key, subkey) { k := Registration{category, key, subkey}return r.registrationMap[k]
}
results := Producers{}
for k, producers := range r.registrationMap {
if! k.IsMatch(category, key, subkey) {continue
}
for _, producer := range producers {
found := false
for _, p := range results {
if producer.peerInfo.id == p.peerInfo.id {
found = true}}if found == false {
results = append(results, producer)
}
}
}
return results
}
// Look up Registrations by id
// Still traversal nothing to say
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
r.RLock()
defer r.RUnlock()
results := Registrations{}
for k, producers := range r.registrationMap {
for _, p := range producers {
if p.peerInfo.id == id {
results = append(results, k)
break}}}return results
}
// Whether it matches
func (k Registration) IsMatch(category string, key string, subkey string) bool {
ifcategory ! = k.Category {return false
}
ifkey ! ="*"&& k.Key ! = key {return false
}
ifsubkey ! ="*"&& k.SubKey ! = subkey {return false
}
return true
}
/ / filter
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
output := Registrations{}
for _, k := range rr {
if k.IsMatch(category, key, subkey) {
output = append(output, k)
}
}
return output
}
// keys
func (rr Registrations) Keys(a) []string {
keys := make([]string.len(rr))
for i, k := range rr {
keys[i] = k.Key
}
return keys
}
// subkeys
func (rr Registrations) SubKeys(a) []string {
subkeys := make([]string.len(rr))
for i, k := range rr {
subkeys[i] = k.SubKey
}
return subkeys
}
// Filter by time
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
now := time.Now()
results := Producers{}
for _, p := range pp {
cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
continue
}
results = append(results, p)
}
return results
}
// Node information
func (pp Producers) PeerInfo(a)[] *PeerInfo {
results := []*PeerInfo{}
for _, p := range pp {
results = append(results, p.peerInfo)
}
return results
}Copy the code
Now, you can see that RegistrationDB contains all node information in a map structure; Name db, in fact, at most count a cache just 2333333; The client queries nsqlookupd to discover the producer of a given topic.
Well, the first chapter is over for the time being, so stay tuned for the rest