The flow chart

Source code structure

Introduction to Main

func main() {
	envflag.Parse()   // parse flags from both the command line and environment variables
	netstorage.InitStorageNodes(*storageNodes)  // initialize the storage node list
	relabel.Init()  // load the relabeling config
	storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
	common.StartUnmarshalWorkers()  // start the unmarshal worker goroutines
	writeconcurrencylimiter.Init()  // limit the number of concurrent writes; defaults to CPU*4
	if len(*clusternativeListenAddr) > 0 {  // accept data forwarded from other vminsert nodes
		clusternativeServer = clusternativeserver.MustStart(*clusternativeListenAddr, func(c net.Conn) error {
			return clusternative.InsertHandler(c)
		})
	}
	if len(*graphiteListenAddr) > 0 {  // Graphite data ingestion over long-lived TCP and UDP connections
		graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error {
			var at auth.Token // TODO: properly initialize auth token
			return graphite.InsertHandler(&at, r)
		})
	}
	if len(*influxListenAddr) > 0 {  // InfluxDB data ingestion over long-lived TCP and UDP connections
		influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
			var at auth.Token // TODO: properly initialize auth token
			return influx.InsertHandlerForReader(&at, r)
		})
	}
	if len(*opentsdbListenAddr) > 0 {  // OpenTSDB data ingestion over long-lived TCP and UDP connections
		opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error {
			var at auth.Token // TODO: properly initialize auth token
			return opentsdb.InsertHandler(&at, r)
		}, opentsdbhttp.InsertHandler)
	}
	if len(*opentsdbHTTPListenAddr) > 0 {
		opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
	}

	go func() {
		httpserver.Serve(*httpListenAddr, requestHandler)
	}()

	sig := procutil.WaitForSigterm()
	logger.Infof("service received signal %s", sig)

	logger.Infof("gracefully shutting down http service at %q", *httpListenAddr)
	startTime := time.Now()
	if err := httpserver.Stop(*httpListenAddr); err != nil {
		logger.Fatalf("cannot stop http service: %s", err)
	}
	logger.Infof("successfully shut down http service in %.3f seconds", time.Since(startTime).Seconds())

	startTime = time.Now()
	netstorage.Stop()
	logger.Infof("successfully stopped netstorage in %.3f seconds", time.Since(startTime).Seconds())

	fs.MustStopDirRemover()

	logger.Infof("the vminsert has been stopped")
}

From the source of main we can see that main is divided into three parts:

  1. Initialization: for example, env and flag parsing, followed by netstorage initialization, then the UnmarshalWorkers and the write concurrency limiter. How each piece works is explained later.
  2. Server startup: the clusternative server that accepts data from other vminsert nodes, the graphite, influx and opentsdb receivers, and finally the HTTP server are all started.
  3. Exit handling: vminsert exits gracefully, mainly by clearing some status information and then flushing the data cached in vminsert to vmstorage.

Initialization

Env & Flag parsing

func Parse() {
	flag.Parse()
	if !*enable { // whether reading flags from environment variables is enabled
		return
	}

	// Remember explicitly set command-line flags.
	flagsSet := make(map[string]bool)
	flag.Visit(func(f *flag.Flag) {
		flagsSet[f.Name] = true
	})

	// Obtain the remaining flag values from environment vars.
	flag.VisitAll(func(f *flag.Flag) {    // look up the environment variable for every flag
		if flagsSet[f.Name] {     // if set on the command line, the command line wins
			// The flag is explicitly set via command-line.
			return
		}
		// Get flag value from environment var.
		fname := getEnvFlagName(f.Name)     // derive the env var name; if the env var is also absent, the default value stays
		if v, ok := os.LookupEnv(fname); ok {
			if err := f.Value.Set(v); err != nil {
				// Do not use lib/logger here, since it is uninitialized yet.
				log.Fatalf("cannot set flag %s to %q, which is read from environment variable %q: %s", f.Name, v, fname, err)
			}
		}
	})
}

envflag.Parse() parses the flag parameters. If reading from environment variables is enabled, every flag that was not specified on the command line is looked up in the environment before falling back to its default.
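The precedence (command line over environment over default) can be captured in a minimal, self-contained sketch. parseWithEnvFallback and the dot-to-underscore name mapping below are illustrative assumptions, not the actual getEnvFlagName implementation, which may also apply a configurable prefix:

package main

import (
	"flag"
	"log"
	"os"
	"strings"
)

var listenAddr = flag.String("httpListenAddr", ":8480", "TCP address for incoming HTTP connections")

func parseWithEnvFallback() {
	flag.Parse()
	explicit := make(map[string]bool)
	flag.Visit(func(f *flag.Flag) { explicit[f.Name] = true }) // flags set on the command line
	flag.VisitAll(func(f *flag.Flag) {
		if explicit[f.Name] {
			return // the command line always wins
		}
		envName := strings.ReplaceAll(f.Name, ".", "_") // hypothetical name mapping
		if v, ok := os.LookupEnv(envName); ok {
			if err := f.Value.Set(v); err != nil {
				log.Fatalf("cannot set flag %s from env %s=%q: %s", f.Name, envName, v, err)
			}
		}
	})
}

func main() {
	parseWithEnvFallback()
	log.Printf("listening on %s", *listenAddr)
}

With this scheme, `httpListenAddr=:9999 ./app` uses the env value, while `httpListenAddr=:9999 ./app -httpListenAddr=:8481` keeps :8481, and the flag default applies only when neither is present.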

NetStorage Init

func InitStorageNodes(addrs []string) {
	if len(addrs) > 255 {   // more than 255 nodes is an error
		logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255)
	}
	storageNodes = storageNodes[:0]  // reset the storageNode list
	for _, addr := range addrs { // create an sn for each address
		sn := &storageNode{
			dialer: netutil.NewTCPDialer("vminsert", addr),

			dialErrors:           metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
			handshakeErrors:      metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
			connectionErrors:     metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
			rowsPushed:           metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
			rowsSent:             metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
			rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
			rowsReroutedToHere:   metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
		}
		....
		storageNodes = append(storageNodes, sn)  // add the created sn to the list
	}
	// Start a run goroutine for each sn.
	for idx, sn := range storageNodes {
		storageNodesWG.Add(1)
		go func(sn *storageNode, idx int) {
			sn.run(storageNodesStopCh, idx)
			storageNodesWG.Done()
		}(sn, idx)
	}

	rerouteWorkerWG.Add(1)
	go func() {
		rerouteWorker(rerouteWorkerStopCh)
		rerouteWorkerWG.Done()
	}()
}

func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
	// Clamp the replication factor: at least 1, at most the length of the sn list.
	replicas := *replicationFactor
	if replicas <= 0 {
		replicas = 1
	}
	if replicas > len(storageNodes) {
		replicas = len(storageNodes)
	}
	ticker := time.NewTicker(200 * time.Millisecond)  // flush every 200ms
	defer ticker.Stop()
	var br bufRows
	brLastResetTime := fasttime.UnixTimestamp()
	var waitCh <-chan struct{}
	mustStop := false
	for !mustStop { // loop until the program stops
		sn.brLock.Lock()
		bufLen := len(sn.br.buf)  // check the buf length
		sn.brLock.Unlock()
		waitCh = nil
		if bufLen > 0 {
			// Do not sleep if sn.br.buf isn't empty.
			waitCh = closedCh  // buf is not empty: proceed to flush immediately
		}
		select {
		case <-stopCh:
			mustStop = true   // stop the program
			// Make sure the sn.buf is flushed last time before returning
			// in order to send the remaining bits of data.
		case <-ticker.C:  // flush every 200ms
		case <-waitCh:  // flush immediately
		}
		sn.brLock.Lock()
		sn.br, br = br, sn.br  // swap in an empty br under the lock
		sn.brLock.Unlock()
		currentTime := fasttime.UnixTimestamp()
		if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {  // buffer has been mostly under-filled for over 10s: reallocate it to shrink capacity
			// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
			br.buf = append(br.buf[:0:0], br.buf...)
			brLastResetTime = currentTime
		}
		sn.checkHealth()
		if len(br.buf) == 0 {
			// Nothing to send.
			continue
		}
		// Send br to replicas storageNodes starting from snIdx.
		for !sendBufToReplicasNonblocking(&br, snIdx, replicas) { // send the data to the sns
			t := timerpool.Get(200 * time.Millisecond) // reset the timer
			select {
			case <-stopCh:
				timerpool.Put(t)
				return
			case <-t.C:
				timerpool.Put(t)
				sn.checkHealth()
			}
		}
		br.reset()  // empty the buf
	}
}

func sendBufToReplicasNonblocking(br *bufRows, snIdx, replicas int) bool {
	usedStorageNodes := make(map[*storageNode]bool, replicas)  // marks nodes that already received the data, so each sn stores at most one copy
	for i := 0; i < replicas; i++ { // find an sn for each copy
		idx := snIdx + i // start from the i-th sn after this one
		attempts := 0
		for {
			attempts++
			if attempts > len(storageNodes) {  // all sns have been tried without success
				if i == 0 { // error if no copy was stored at all; warn if some copies succeeded
					// The data wasn't replicated at all.
					logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
						"re-trying to send the data soon", len(br.buf), br.rows)
					return false
				}
				// The data is partially replicated, so just emit a warning and return true.
				// We could retry sending the data again, but this may result in uncontrolled duplicate data.
				// So it is better returning true.
				rowsIncompletelyReplicatedTotal.Add(br.rows)
				logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
					"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
				return true
			}
			if idx >= len(storageNodes) {
				idx %= len(storageNodes)  // wrap around when idx runs past the end of the list
			}
			sn := storageNodes[idx]
			idx++
			if usedStorageNodes[sn] { // this sn already holds a copy; try the next one
				// The br has been already replicated to sn. Skip it.
				continue
			}
			if !sn.sendBufRowsNonblocking(br) { // cannot send data; try the next one
				// Cannot send data to sn. Go to the next sn.
				continue
			}
			// Successfully sent data to sn.
			usedStorageNodes[sn] = true  // mark the sn as used
			break
		}
	}
	return true
}

netstorage initialization parses the -storageNode parameter to obtain the storage node addresses, then starts a goroutine for each address running the run function. run loops, periodically swapping out the node's buffer and sending the buffered data via sendBufToReplicasNonblocking. sendBufToReplicasNonblocking implements the replica-placement strategy: it maintains a hash map of storage nodes, sets the entry of each node that has received a copy to true, and thus guarantees that one machine stores at most one copy of the data.
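The placement strategy reduces to the following sketch; pickReplicaNodes is a name invented for illustration, and the health checks, non-blocking send and failure logging of the real function are omitted:

// pickReplicaNodes walks the node list as a ring starting at startIdx and
// picks `replicas` distinct nodes, using a map to guarantee each node
// holds at most one copy.
func pickReplicaNodes(nodes []string, startIdx, replicas int) []string {
	if replicas > len(nodes) {
		replicas = len(nodes)
	}
	used := make(map[string]bool, replicas)
	picked := make([]string, 0, replicas)
	idx := startIdx
	for i := 0; i < replicas; i++ {
		for attempts := 0; attempts < len(nodes); attempts++ {
			sn := nodes[idx%len(nodes)]
			idx++
			if used[sn] { // this node already holds a copy; skip it
				continue
			}
			used[sn] = true
			picked = append(picked, sn)
			break
		}
	}
	return picked
}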

UnMarshalWorker Init

func StartUnmarshalWorkers() {
	if unmarshalWorkCh != nil {
		logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been already called without stopUnmarshalWorkers()")
	}
	gomaxprocs := cgroup.AvailableCPUs() // get the number of available CPUs
	unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
	unmarshalWorkersWG.Add(gomaxprocs)
	for i := 0; i < gomaxprocs; i++ { // start one worker goroutine per CPU
		go func() {
			defer unmarshalWorkersWG.Done()
			for uw := range unmarshalWorkCh {
				uw.Unmarshal()
			}
		}()
	}
}

func ScheduleUnmarshalWork(uw UnmarshalWork) {  // put the uw into the channel for scheduling
	unmarshalWorkCh <- uw
}

StartUnmarshalWorkers obtains the number of available CPUs from the cgroup settings and starts one unmarshal goroutine per CPU. The main job of vminsert is to convert the collected data into data that vmstorage can use. ScheduleUnmarshalWork is called after data has been read; it simply puts the uw into the scheduling channel, where it waits to be picked up and executed by one of the workers.
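For reference, the UnmarshalWork accepted by ScheduleUnmarshalWork is presumably a one-method interface along these lines, so a parser for any protocol can be fanned out across the worker goroutines:

// UnmarshalWork is a unit of unmarshal work (sketch of the presumed interface).
type UnmarshalWork interface {
	// Unmarshal must perform the CPU-bound unmarshal work.
	Unmarshal()
}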

The Server part

The logic of each receiving server is broadly similar; the main difference lies in how each protocol's data is parsed, while the function names largely match. This part takes the vminsert (clusternative) handler as an example.
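The internals of the various *server packages are not shown here; below is a hedged sketch of what MustStart presumably does for the TCP case. mustStartTCPServer is an illustrative stand-in, and the real servers also manage UDP listeners, connection tracking and graceful shutdown:

import (
	"log"
	"net"
)

// mustStartTCPServer accepts connections in a background goroutine and runs
// the supplied handler on each connection in its own goroutine.
func mustStartTCPServer(addr string, handler func(c net.Conn) error) net.Listener {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("cannot listen on %q: %s", addr, err)
	}
	go func() {
		for {
			c, err := ln.Accept()
			if err != nil {
				return // the listener was closed
			}
			go func(c net.Conn) {
				defer c.Close()
				if err := handler(c); err != nil {
					log.Printf("error serving %s: %s", c.RemoteAddr(), err)
				}
			}(c)
		}
	}()
	return ln
}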

func InsertHandler(c net.Conn) error {
	bc, err := handshake.VMInsertServer(c, 0)
	if err != nil {
		return fmt.Errorf("cannot perform vminsert handshake with client %q: %w", c.RemoteAddr(), err)
	}
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(bc, func(rows []storage.MetricRow) error {
			return insertRows(rows)   // insert the rows
		})
	})
}

func getUnmarshalWork() *unmarshalWork {
	v := unmarshalWorkPool.Get()
	if v == nil {
		return &unmarshalWork{}
	}
	return v.(*unmarshalWork)
}

func putUnmarshalWork(uw *unmarshalWork) {
	uw.reset()
	unmarshalWorkPool.Put(uw)
}

var unmarshalWorkPool sync.Pool

func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
	var wg sync.WaitGroup
	var (
		callbackErrLock sync.Mutex
		callbackErr     error
	)
	for {
		uw := getUnmarshalWork()  // get a uw from the sync.Pool
		uw.callback = func(rows []storage.MetricRow) {  // set the uw callback function
			if err := callback(rows); err != nil {
				processErrors.Inc()
				callbackErrLock.Lock()
				if callbackErr == nil {
					callbackErr = fmt.Errorf("error when processing native block: %w", err)
				}
				callbackErrLock.Unlock()
			}
		}
		uw.wg = &wg
		var err error
		uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc)   // read a block of data from the network connection
		if err != nil {
			wg.Wait()
			if err == io.EOF {
				// Remote end gracefully closed the connection.
				putUnmarshalWork(uw)    // return the uw to the sync.Pool for reuse
				return nil
			}
			return fmt.Errorf("cannot read packet size: %w", err)
		}
		blocksRead.Inc()
		wg.Add(1)
		common.ScheduleUnmarshalWork(uw)   // hand the uw over to the scheduling channel
	}
}

// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
	defer uw.wg.Done()
	if err := uw.unmarshal(); err != nil {
		parseErrors.Inc()
		logger.Errorf("error when unmarshaling clusternative block: %s", err)
		putUnmarshalWork(uw)
		return
	}
	mrs := uw.mrs
	for len(mrs) > maxRowsPerCallback {
		// Limit the number of rows passed to callback in order to reduce memory usage
		// when processing big packets of rows.
		uw.callback(mrs[:maxRowsPerCallback])
		mrs = mrs[maxRowsPerCallback:]
	}
	uw.callback(mrs)
	putUnmarshalWork(uw)
}

func insertRows(rows []storage.MetricRow) error {
	ctx := netstorage.GetInsertCtx()   // get an InsertCtx from the pool
	defer netstorage.PutInsertCtx(ctx)

	ctx.Reset() // This line is required for initializing ctx internals: it clears state left over from the previous use.
	hasRelabeling := relabel.HasRelabeling()
	var at auth.Token
	var rowsPerTenant *metrics.Counter
	var mn storage.MetricName
	for i := range rows {
		mr := &rows[i]
		if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {  // decode the MetricName information from the encoded data
			return fmt.Errorf("cannot unmarshal MetricNameRaw: %w", err)
		}
		if rowsPerTenant == nil || mn.AccountID != at.AccountID || mn.ProjectID != at.ProjectID { // refill the at when the tenant changes
			at.AccountID = mn.AccountID
			at.ProjectID = mn.ProjectID
			rowsPerTenant = rowsTenantInserted.Get(&at)
		}
		ctx.Labels = ctx.Labels[:0]
		ctx.AddLabelBytes(nil, mn.MetricGroup)
		for j := range mn.Tags {
			tag := &mn.Tags[j]
			ctx.AddLabelBytes(tag.Key, tag.Value)
		}
		if hasRelabeling {
			ctx.ApplyRelabeling()  // apply the relabeling rules loaded by relabel.Init()
		}
		if len(ctx.Labels) == 0 {
			// Skip metric without labels.
			continue
		}
		ctx.SortLabelsIfNeeded()
		if err := ctx.WriteDataPoint(&at, ctx.Labels, mr.Timestamp, mr.Value); err != nil {   // write the data point
			return err
		}
		rowsPerTenant.Inc()
	}
	rowsInserted.Add(len(rows))
	rowsPerInsert.Update(float64(len(rows)))
	return ctx.FlushBufs()  // push the remaining data to the storage nodes
}

func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
	ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels)  // encode the labels and the tenant info together into one []byte
	storageNodeIdx := ctx.GetStorageNodeIdx(at, labels)    // hash the label info to pick the target sn
	return ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
}

func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) int {
	if len(storageNodes) == 1 {
		// Fast path - only a single storage node.
		return 0
	}

	buf := ctx.labelsBuf[:0]
	buf = encoding.MarshalUint32(buf, at.AccountID)
	buf = encoding.MarshalUint32(buf, at.ProjectID)
	for i := range labels {
		label := &labels[i]
		buf = marshalBytesFast(buf, label.Name)
		buf = marshalBytesFast(buf, label.Value)
	}
	h := xxhash.Sum64(buf)  // Generates the Hash value
	ctx.labelsBuf = buf

	idx := int(jump.Hash(h, int32(len(storageNodes)))) // Generate index of corresponding SN
	return idx
}

func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
	br := &ctx.bufRowss[storageNodeIdx]
	sn := storageNodes[storageNodeIdx]
	bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)   // append the timestamp and value to the buf
	if len(bufNew) >= maxBufSizePerStorageNode {  // the buffered data has grown too big: send it to the sn right away
		// Send buf to storageNode, since it is too big.
		if err := br.pushTo(sn); err != nil {
			return err
		}
		br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)  // drop the sent data and keep only the latest row in the br
	} else {
		br.buf = bufNew  // below the limit: just keep the appended buffer
	}
	br.rows++
	return nil
}
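The jump.Hash call in GetStorageNodeIdx is presumably Google's jump consistent hash (Lamport & Veach, 2014); a sketch of the algorithm shows why resizing the vmstorage cluster remaps only about 1/N of the series:

// jumpHash maps a 64-bit key to a bucket in [0, numBuckets). When numBuckets
// grows from N to N+1, only ~1/(N+1) of the keys move, which keeps most time
// series pinned to their current vmstorage node after a resize.
func jumpHash(key uint64, numBuckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

Combined with xxhash over the tenant and sorted labels, this keeps each time series landing on the same storage node for as long as the node set stays unchanged.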