The flow chart
Source code structure
Introduction to Main
func main() {
    envflag.Parse() // Parse flags from both the command line and environment variables
    netstorage.InitStorageNodes(*storageNodes) // Initialize the storage node connections
    relabel.Init() // Initialize relabeling configs
    storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
    common.StartUnmarshalWorkers() // Start the unmarshal worker goroutines
    writeconcurrencylimiter.Init() // Limit the number of concurrent writes; defaults to CPU*4
    if len(*clusternativeListenAddr) > 0 { // Accept data forwarded by other vminsert nodes
        clusternativeServer = clusternativeserver.MustStart(*clusternativeListenAddr, func(c net.Conn) error {
            return clusternative.InsertHandler(c)
        })
    }
    if len(*graphiteListenAddr) > 0 { // Graphite data ingestion over long-lived TCP and UDP connections
        graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error {
            var at auth.Token // TODO: properly initialize auth token
            return graphite.InsertHandler(&at, r)
        })
    }
    if len(*influxListenAddr) > 0 { // InfluxDB data ingestion over long-lived TCP and UDP connections
        influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
            var at auth.Token // TODO: properly initialize auth token
            return influx.InsertHandlerForReader(&at, r)
        })
    }
    if len(*opentsdbListenAddr) > 0 { // OpenTSDB data ingestion over long-lived TCP and UDP connections
        opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error {
            var at auth.Token // TODO: properly initialize auth token
            return opentsdb.InsertHandler(&at, r)
        }, opentsdbhttp.InsertHandler)
    }
    if len(*opentsdbHTTPListenAddr) > 0 {
        opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
    }
    go func() {
        httpserver.Serve(*httpListenAddr, requestHandler)
    }()
    sig := procutil.WaitForSigterm()
    logger.Infof("service received signal %s", sig)
    logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
    startTime := time.Now()
    if err := httpserver.Stop(*httpListenAddr); err != nil {
        logger.Fatalf("cannot stop http service: %s", err)
    }
    logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())
    startTime = time.Now()
    netstorage.Stop()
    logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds())
    fs.MustStopDirRemover()
    logger.Infof("the vminsert has been stopped")
}
From the source of main we can see that it is divided into three parts:

- Initialization: env and flag parsing first, then netstorage, and finally the UnmarshalWorkers and the write concurrency limiter. How each piece works is explained later; a minimal sketch of the concurrency limiter follows this list.
- Starting the servers: the data-receiving servers for vminsert (clusternative), graphite, influx and opentsdb are started, along with the HTTP server.
- Exit handling: vminsert exits gracefully, mainly by clearing some status information and then flushing the data cached in vminsert into vmstorage.
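Before going further, here is a minimal, self-contained sketch of what a channel-based write concurrency limiter amounts to, in the spirit of writeconcurrencylimiter with its default limit of CPU*4. The names sem and do are illustrative rather than the real package API, and the real limiter additionally handles wait timeouts.

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// At most cap(sem) writes run at once; the rest block until a slot frees up.
var sem = make(chan struct{}, runtime.NumCPU()*4)

func do(f func() error) error {
    sem <- struct{}{}        // acquire a slot (blocks when the limit is reached)
    defer func() { <-sem }() // release the slot
    return f()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            _ = do(func() error {
                fmt.Println("insert", i) // stand-in for a real insert
                return nil
            })
        }(i)
    }
    wg.Wait()
}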
Initialization
Env & Flag parsing
func Parse() {
    flag.Parse()
    if !*enable { // Whether reading flags from environment variables is enabled
        return
    }
    // Remember explicitly set command-line flags.
    flagsSet := make(map[string]bool)
    flag.Visit(func(f *flag.Flag) {
        flagsSet[f.Name] = true
    })
    // Obtain the remaining flag values from environment vars.
    flag.VisitAll(func(f *flag.Flag) { // For each flag, look for a matching environment variable
        if flagsSet[f.Name] { // The command line wins if the flag was set there
            // The flag is explicitly set via command-line.
            return
        }
        // Get flag value from environment var.
        fname := getEnvFlagName(f.Name) // Use the environment variable's value; if it is absent too, the default stays
        if v, ok := os.LookupEnv(fname); ok {
            if err := f.Value.Set(v); err != nil {
                // Do not use lib/logger here, since it is uninitialized yet.
                log.Fatalf("cannot set flag %s to %q, which is read from environment variable %q: %s", f.Name, v, fname, err)
            }
        }
    })
}
envflag.Parse() sets the flag values. If reading from environment variables is enabled, every flag that was not specified on the command line is looked up in the environment, and flags absent there as well keep their default values.
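As a minimal sketch of the same precedence rule (command line beats environment variable beats default), the following standalone program uses only the standard flag package. The flag name is illustrative, and unlike the real envflag package there is no getEnvFlagName mapping here: the environment variable is simply assumed to share the flag's name.

package main

import (
    "flag"
    "fmt"
    "log"
    "os"
)

var httpListenAddr = flag.String("httpListenAddr", ":8480", "TCP address to listen to")

func main() {
    flag.Parse()

    // Remember flags explicitly set on the command line.
    flagsSet := make(map[string]bool)
    flag.Visit(func(f *flag.Flag) { flagsSet[f.Name] = true })

    // For the remaining flags, fall back to an environment variable of the
    // same name; if that is absent too, the compiled-in default stays.
    flag.VisitAll(func(f *flag.Flag) {
        if flagsSet[f.Name] {
            return // command line wins
        }
        if v, ok := os.LookupEnv(f.Name); ok {
            if err := f.Value.Set(v); err != nil {
                log.Fatalf("cannot set flag %s to %q: %s", f.Name, v, err)
            }
        }
    })

    fmt.Println("httpListenAddr =", *httpListenAddr)
}

Running `httpListenAddr=:9480 ./app` prints :9480, while `./app -httpListenAddr=:7480` prints :7480 regardless of the environment.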
NetStorage Init
func InitStorageNodes(addrs []string) {
    if len(addrs) > 255 { // More than 255 nodes is an error
        logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255)
    }
    storageNodes = storageNodes[:0] // Reset the storage node list
    for _, addr := range addrs { // Create an sn for each addr
        sn := &storageNode{
            dialer: netutil.NewTCPDialer("vminsert", addr),
            dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
            handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
            connectionErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
            rowsPushed: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
            rowsSent: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
            rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
            rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
        }
        ...
        storageNodes = append(storageNodes, sn) // Add the created sn to the list
    }
    // Set the per-node insert buffer size (elided), then start a goroutine for each sn.
    for idx, sn := range storageNodes {
        storageNodesWG.Add(1)
        go func(sn *storageNode, idx int) {
            sn.run(storageNodesStopCh, idx)
            storageNodesWG.Done()
        }(sn, idx)
    }
    rerouteWorkerWG.Add(1)
    go func() {
        rerouteWorker(rerouteWorkerStopCh)
        rerouteWorkerWG.Done()
    }()
}

func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
    // Clamp the replication factor: at least 1, at most the number of storage nodes.
    replicas := *replicationFactor
    if replicas <= 0 {
        replicas = 1
    }
    if replicas > len(storageNodes) {
        replicas = len(storageNodes)
    }
    ticker := time.NewTicker(200 * time.Millisecond) // Flush every 200ms
    defer ticker.Stop()
    var br bufRows
    brLastResetTime := fasttime.UnixTimestamp()
    var waitCh <-chan struct{}
    mustStop := false
    for !mustStop { // Loop until the program stops
        sn.brLock.Lock()
        bufLen := len(sn.br.buf) // Check the buffer length
        sn.brLock.Unlock()
        waitCh = nil
        if bufLen > 0 {
            // Do not sleep if sn.br.buf isn't empty.
            waitCh = closedCh // A closed channel makes the select below fire immediately
        }
        select {
        case <-stopCh:
            mustStop = true // Stop the program
            // Make sure the sn.buf is flushed last time before returning
            // in order to send the remaining bits of data.
        case <-ticker.C: // Flush every 200ms
        case <-waitCh: // Flush immediately, since the buffer is non-empty
        }
        sn.brLock.Lock()
        sn.br, br = br, sn.br // Swap in an empty buffer under the lock
        sn.brLock.Unlock()
        currentTime := fasttime.UnixTimestamp()
        if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 { // Shrink the buffer if most of its capacity is unused
            // Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
            br.buf = append(br.buf[:0:0], br.buf...)
            brLastResetTime = currentTime
        }
        sn.checkHealth()
        if len(br.buf) == 0 {
            // Nothing to send.
            continue
        }
        // Send br to replicas storageNodes starting from snIdx.
        for !sendBufToReplicasNonblocking(&br, snIdx, replicas) { // Retry until the send succeeds
            t := timerpool.Get(200 * time.Millisecond) // Back off before retrying
            select {
            case <-stopCh:
                timerpool.Put(t)
                return
            case <-t.C:
                timerpool.Put(t)
                sn.checkHealth()
            }
        }
        br.reset() // Empty the buffer
    }
}

func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
    usedStorageNodes := make(map[*storageNode]bool, replicas) // Track which sns already hold a copy, so each sn stores at most one copy
    for i := 0; i < replicas; i++ { // Find an sn for each copy
        idx := snIdx + i // Start from the i-th sn after this one
        attempts := 0
        for {
            attempts++
            if attempts > len(storageNodes) { // Every sn has been tried without success
                if i == 0 { // Report failure if not a single copy was stored; only warn if some copies were
                    // The data wasn't replicated at all.
                    logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
                        "re-trying to send the data soon", len(br.buf), br.rows)
                    return false
                }
                // The data is partially replicated, so just emit a warning and return true.
                // We could retry sending the data again, but this may result in uncontrolled duplicate data.
                // So it is better returning true.
                rowsIncompletelyReplicatedTotal.Add(br.rows)
                logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
                    "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
                return true
            }
            if idx >= len(storageNodes) {
                idx %= len(storageNodes) // Wrap around past the end of the list
            }
            sn := storageNodes[idx]
            idx++
            if usedStorageNodes[sn] { // This sn already holds a copy; try the next one
                // The br has been already replicated to sn. Skip it.
                continue
            }
            if !sn.sendBufRowsNonblocking(br) {
                // Cannot send data to sn. Go to the next sn.
                continue
            }
            // Successfully sent data to sn.
            usedStorageNodes[sn] = true
            break
        }
    }
    return true
}
Netstorage initialization parses the storageNode flag to obtain the storage node addresses, creates a storageNode for each address, and starts a goroutine running its run function. run loops until shutdown, draining the node's buffer (every 200ms, or immediately when the buffer is non-empty) and sending the data via sendBufToReplicasNonblocking. That function implements the replica-placement policy: it keeps a map of storage nodes that have already received a copy, walks the node list starting from the current node's index, and marks each node after a successful send, so a given node stores at most one copy of the data.
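The placement rule is easier to see stripped of the buffering and metrics. Below is a self-contained sketch of the same loop under simplified assumptions: nodes are plain strings, and the hypothetical healthy stub stands in for sn.sendBufRowsNonblocking succeeding.

package main

import "fmt"

// pickReplicaNodes sketches the rule in sendBufToReplicasNonblocking:
// starting at snIdx, walk the node list, skip nodes that already hold a
// copy or are unreachable, and wrap around with modulo.
func pickReplicaNodes(nodes []string, snIdx, replicas int, healthy func(string) bool) []string {
    used := make(map[string]bool, replicas)
    var picked []string
    idx := snIdx
    for i := 0; i < replicas; i++ {
        attempts := 0
        for {
            attempts++
            if attempts > len(nodes) {
                return picked // ran out of nodes; data is at best partially replicated
            }
            if idx >= len(nodes) {
                idx %= len(nodes)
            }
            n := nodes[idx]
            idx++
            if used[n] || !healthy(n) {
                continue // already holds a copy, or unreachable
            }
            used[n] = true
            picked = append(picked, n)
            break
        }
    }
    return picked
}

func main() {
    nodes := []string{"sn-0", "sn-1", "sn-2", "sn-3"}
    down := map[string]bool{"sn-2": true}
    healthy := func(n string) bool { return !down[n] }
    // Buffer owned by sn-1, replicationFactor=2: copies land on sn-1 and sn-3.
    fmt.Println(pickReplicaNodes(nodes, 1, 2, healthy))
}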
UnmarshalWorker Init
func StartUnmarshalWorkers() {
    if unmarshalWorkCh != nil {
        logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been already called without stopUnmarshalWorkers()")
    }
    gomaxprocs := cgroup.AvailableCPUs() // Get the number of available CPUs
    unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
    unmarshalWorkersWG.Add(gomaxprocs)
    for i := 0; i < gomaxprocs; i++ { // Start one worker goroutine per CPU
        go func() {
            defer unmarshalWorkersWG.Done()
            for uw := range unmarshalWorkCh {
                uw.Unmarshal()
            }
        }()
    }
}

func ScheduleUnmarshalWork(uw UnmarshalWork) { // Put the uw into the channel for scheduling
    unmarshalWorkCh <- uw
}
StartUnmarshalWorkers reads the number of available CPUs from the cgroup limits and starts one unmarshal goroutine per CPU. These workers do the core job of vminsert: converting the collected data into the format vmstorage can use. ScheduleUnmarshalWork is called after data has been read; it simply places the uw into the scheduling channel, where it waits to be picked up and executed by one of the workers.
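The worker-pool pattern itself is small enough to show whole. This sketch mirrors StartUnmarshalWorkers and ScheduleUnmarshalWork, with runtime.NumCPU standing in for cgroup.AvailableCPUs (which additionally respects cgroup CPU limits); the packet type is a made-up stand-in for a real protocol parser.

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// UnmarshalWork mirrors the interface scheduled via ScheduleUnmarshalWork:
// anything that knows how to unmarshal itself.
type UnmarshalWork interface {
    Unmarshal()
}

var (
    workCh chan UnmarshalWork
    wg     sync.WaitGroup
)

// startWorkers launches one goroutine per CPU, each draining workCh.
func startWorkers() {
    n := runtime.NumCPU()
    workCh = make(chan UnmarshalWork, n)
    wg.Add(n)
    for i := 0; i < n; i++ {
        go func() {
            defer wg.Done()
            for uw := range workCh {
                uw.Unmarshal()
            }
        }()
    }
}

// packet is a hypothetical piece of raw input.
type packet string

func (p packet) Unmarshal() { fmt.Println("unmarshaled", string(p)) }

func main() {
    startWorkers()
    for _, p := range []packet{"a", "b", "c"} {
        workCh <- p // the ScheduleUnmarshalWork equivalent
    }
    close(workCh) // the stopUnmarshalWorkers equivalent: workers drain and exit
    wg.Wait()
}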
The Server part
The logic of this part is much the same for every protocol; the differences lie in how each data format is parsed, while the function names involved are roughly the same. The clusternative handler (vminsert-to-vminsert forwarding) is used as the example here.
func InsertHandler(c net.Conn) error {
    bc, err := handshake.VMInsertServer(c, 0)
    if err != nil {
        return fmt.Errorf("cannot perform vminsert handshake with client %q: %w", c.RemoteAddr(), err)
    }
    return writeconcurrencylimiter.Do(func() error {
        return parser.ParseStream(bc, func(rows []storage.MetricRow) error {
            return insertRows(rows) // Insert the parsed rows
        })
    })
}

func getUnmarshalWork() *unmarshalWork {
    v := unmarshalWorkPool.Get()
    if v == nil {
        return &unmarshalWork{}
    }
    return v.(*unmarshalWork)
}

func putUnmarshalWork(uw *unmarshalWork) {
    uw.reset()
    unmarshalWorkPool.Put(uw)
}

var unmarshalWorkPool sync.Pool

func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
    var wg sync.WaitGroup
    var (
        callbackErrLock sync.Mutex
        callbackErr     error
    )
    for {
        uw := getUnmarshalWork() // Get a uw from the sync.Pool
        uw.callback = func(rows []storage.MetricRow) { // Set the uw callback
            if err := callback(rows); err != nil {
                processErrors.Inc()
                callbackErrLock.Lock()
                if callbackErr == nil {
                    callbackErr = fmt.Errorf("error when processing native block: %w", err)
                }
                callbackErrLock.Unlock()
            }
        }
        uw.wg = &wg
        var err error
        uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc) // Read a block of data from the connection
        if err != nil {
            wg.Wait()
            if err == io.EOF {
                // Remote end gracefully closed the connection.
                putUnmarshalWork(uw) // Return the uw to the sync.Pool for reuse
                return nil
            }
            return fmt.Errorf("cannot read packet size: %w", err)
        }
        blocksRead.Inc()
        wg.Add(1)
        common.ScheduleUnmarshalWork(uw) // Hand the uw over to the unmarshal workers
    }
}

// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
    defer uw.wg.Done()
    if err := uw.unmarshal(); err != nil {
        parseErrors.Inc()
        logger.Errorf("error when unmarshaling clusternative block: %s", err)
        putUnmarshalWork(uw)
        return
    }
    mrs := uw.mrs
    for len(mrs) > maxRowsPerCallback {
        // Limit the number of rows passed to callback in order to reduce memory usage
        // when processing big packets of rows.
        uw.callback(mrs[:maxRowsPerCallback])
        mrs = mrs[maxRowsPerCallback:]
    }
    uw.callback(mrs)
    putUnmarshalWork(uw)
}

func insertRows(rows []storage.MetricRow) error {
    ctx := netstorage.GetInsertCtx() // Get an InsertCtx from the pool
    defer netstorage.PutInsertCtx(ctx)
    ctx.Reset() // This line is required for initializing ctx internals; it clears the previous state
    hasRelabeling := relabel.HasRelabeling()
    var at auth.Token
    var rowsPerTenant *metrics.Counter
    var mn storage.MetricName
    for i := range rows {
        mr := &rows[i]
        if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { // Decode the MetricName from the raw encoded bytes
            return fmt.Errorf("cannot unmarshal MetricNameRaw: %w", err)
        }
        if rowsPerTenant == nil || mn.AccountID != at.AccountID || mn.ProjectID != at.ProjectID { // Refresh the tenant counter when the tenant changes
            at.AccountID = mn.AccountID
            at.ProjectID = mn.ProjectID
            rowsPerTenant = rowsTenantInserted.Get(&at)
        }
        ctx.Labels = ctx.Labels[:0]
        ctx.AddLabelBytes(nil, mn.MetricGroup)
        for j := range mn.Tags {
            tag := &mn.Tags[j]
            ctx.AddLabelBytes(tag.Key, tag.Value)
        }
        if hasRelabeling {
            ctx.ApplyRelabeling() // Apply the configured relabeling rules to ctx.Labels
        }
        if len(ctx.Labels) == 0 {
            // Skip metric without labels.
            continue
        }
        ctx.SortLabelsIfNeeded()
        if err := ctx.WriteDataPoint(&at, ctx.Labels, mr.Timestamp, mr.Value); err != nil { // Write the data point
            return err
        }
        rowsPerTenant.Inc()
    }
    rowsInserted.Add(len(rows))
    rowsPerInsert.Update(float64(len(rows)))
    return ctx.FlushBufs() // Push the remaining buffered data to the storage nodes
}

func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
    ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels) // Encode the labels and tenant info into one byte slice
    storageNodeIdx := ctx.GetStorageNodeIdx(at, labels) // Hash the labels to pick the storage node
    return ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
}

func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
    if len(storageNodes) == 1 {
        // Fast path - only a single storage node.
        return 0
    }
    buf := ctx.labelsBuf[:0]
    buf = encoding.MarshalUint32(buf, at.AccountID)
    buf = encoding.MarshalUint32(buf, at.ProjectID)
    for i := range labels {
        label := &labels[i]
        buf = marshalBytesFast(buf, label.Name)
        buf = marshalBytesFast(buf, label.Value)
    }
    h := xxhash.Sum64(buf) // Hash the tenant and labels
    ctx.labelsBuf = buf
    idx := int(jump.Hash(h, int32(len(storageNodes)))) // Map the hash to a storage node index via jump consistent hashing
    return idx
}

func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
    br := &ctx.bufRowss[storageNodeIdx]
    sn := storageNodes[storageNodeIdx]
    bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) // Append the encoded timestamp and value to the buffer
    if len(bufNew) >= maxBufSizePerStorageNode { // If the buffer grew too big, push it to the sn right away
        // Send buf to storageNode, since it is too big.
        if err := br.pushTo(sn); err != nil {
            return err
        }
        br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value) // Start a fresh buffer containing only the latest row
    } else {
        br.buf = bufNew // Under the limit: just keep the grown buffer
    }
    br.rows++
    return nil
}
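GetStorageNodeIdx relies on jump consistent hashing: it maps a 64-bit hash to one of N buckets so that adding a node only moves roughly 1/N of the series to a new node. Below is a self-contained sketch under simplified assumptions: FNV-1a stands in for the xxhash used by vminsert, and a plain string map with sorted keys stands in for the marshalled label buffer (labels must be hashed in a deterministic order, which is why vminsert calls ctx.SortLabelsIfNeeded first).

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// jumpHash is Google's jump consistent hash, the algorithm behind jump.Hash.
func jumpHash(key uint64, numBuckets int32) int32 {
    var b, j int64 = -1, 0
    for j < int64(numBuckets) {
        b = j
        key = key*2862933555777941757 + 1
        j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
    }
    return int32(b)
}

// nodeIdxForSeries sketches GetStorageNodeIdx: hash the tenant plus the
// labels in a deterministic order, then jump-hash into the node list.
func nodeIdxForSeries(accountID, projectID uint32, labels map[string]string, numNodes int) int {
    h := fnv.New64a()
    fmt.Fprintf(h, "%d/%d;", accountID, projectID)
    keys := make([]string, 0, len(labels))
    for k := range labels {
        keys = append(keys, k)
    }
    sort.Strings(keys) // deterministic order, mirroring SortLabelsIfNeeded
    for _, k := range keys {
        fmt.Fprintf(h, "%s=%s;", k, labels[k])
    }
    return int(jumpHash(h.Sum64(), int32(numNodes)))
}

func main() {
    labels := map[string]string{"__name__": "http_requests_total", "job": "api"}
    fmt.Println("5 nodes:", nodeIdxForSeries(0, 0, labels, 5))
    fmt.Println("6 nodes:", nodeIdxForSeries(0, 0, labels, 6)) // most series keep their node when one is added
}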