Preamble: Over the past few days I've been writing a tool script to analyze a large number of online log files. It should have been boring work, but in keeping with the principle of taking things as far as they can go, it kept me thinking about how to optimize it. This post walks through the optimization step by step, starting from the very beginning of development, and ends with a Java-like worker thread pool implemented in Golang. I learned a lot along the way.

One, the naive goroutine stage

1. Task background. Briefly, the tool works like this: the volume of online logs is very large, so it first reads the contents of the log files, then parses the log entries one by one, matches the entries we want and writes them to a file, and finally extracts the data from that file for calculation. The log entry format (simulated data) is as follows:

```
{"id": xx, "time": "2017-11-19", "key1": "value1", "key2": "value2", ...}
```
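To make the "match the desired log items" step concrete, here is a minimal sketch that decodes one entry in the format above and checks a single field. The function name `matchEntry` and the equality rule are hypothetical; the article does not say which entries the real tool selects.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// matchEntry decodes one JSON log line and reports whether the string field
// `key` equals `want`. Hypothetical matching rule, for illustration only.
func matchEntry(line []byte, key, want string) (bool, error) {
	var entry map[string]interface{}
	if err := json.Unmarshal(line, &entry); err != nil {
		return false, err
	}
	v, ok := entry[key].(string) // non-string or missing fields never match
	return ok && v == want, nil
}

func main() {
	line := []byte(`{"id":1,"time":"2017-11-19","key1":"value1","key2":"value2"}`)
	ok, err := matchEntry(line, "key1", "value1")
	fmt.Println(ok, err)
}
```

Decoding into a `map[string]interface{}` keeps the sketch independent of the exact set of keys; a real tool with a fixed schema would more likely decode into a struct.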

Let's look at the code (only the core code; file handling and error handling are mostly left out), and then I'll share some thoughts on this stage.

```go
import (
	"bufio"
	"io"
	"log"
	"os"
	"sync"
)

var writeChan = make(chan []byte) // data to be written to the output file
var waitGroup sync.WaitGroup      // used to control synchronization

func main() {
	// Initialize the writer that drains writeChan
	InitWriter(outLogFileWriter)
	// Iterate over the log files; every log entry gets its own goroutine
	for _, file := range logDir {
		if file.IsDir() {
			continue
		}
		HandlerFile(arg.dir + "/" + file.Name()) // process each file
	}
}

// InitWriter starts a goroutine that writes everything sent on writeChan.
func InitWriter(outLogFileWriter *bufio.Writer) {
	go func() {
		for data := range writeChan {
			if _, err := outLogFileWriter.Write(data); err != nil {
				log.Println(err)
			}
		}
	}()
}

// HandlerFile reads a log file line by line and starts one goroutine per entry.
func HandlerFile(fileName string) {
	file, err := os.Open(fileName)
	if err != nil {
		log.Println(err)
		return
	}
	defer file.Close()
	br := bufio.NewReader(file)
	for {
		data, err := br.ReadBytes('\n')
		if err != nil {
			if err != io.EOF {
				log.Println(err)
			}
			break
		}
		go Handler(data) // one G per entry; Handler writes its result to writeChan
	}
}
```

Analysis: a blog post shouldn't be padded out with code, so only the important parts are shown above, and the comments already explain them. So what's wrong with this code? Suppose we have a very large file and every entry is handled by its own G (goroutine). When we run it, we find it is very slow. Why? First, we have no control over the number of Gs. Second, because the log file is huge, a huge number of Gs exist at the same time. All of them have to write to the same channel, so they block heavily. For these reasons, this approach doesn't work.
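A minimal sketch of the problem, assuming the same shape as the code above (one goroutine per entry, one unbuffered channel, one reader). The function name `spawnPerEntry` and the message format are made up for illustration. Before the reader starts, every sender is alive and parked on its send, so the goroutine count grows with the number of entries.

```go
package main

import (
	"fmt"
	"runtime"
)

// spawnPerEntry starts one goroutine per "entry", each sending on a single
// unbuffered channel (like writeChan). It returns the goroutine count observed
// before anything is received, and how many results were eventually received.
func spawnPerEntry(entries int) (peak int, received int) {
	out := make(chan []byte) // unbuffered: each send waits for the one reader
	for i := 0; i < entries; i++ {
		go func(n int) {
			out <- []byte(fmt.Sprintf("entry %d\n", n))
		}(i)
	}
	// Nobody has received yet, so every sender still exists.
	peak = runtime.NumGoroutine()
	for i := 0; i < entries; i++ {
		<-out // a single consumer drains the senders one at a time
		received++
	}
	return peak, received
}

func main() {
	peak, got := spawnPerEntry(10000)
	fmt.Printf("goroutines alive before draining: %d, results received: %d\n", peak, got)
}
```

The peak scales linearly with the input: nothing in this design caps how many goroutines are waiting on the channel at once.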

Two, adding a buffered task queue

1. The task queue. As noted above, we can't control the number of tasks, so here I add a task queue to queue up tasks and control how many there are. The code:

```go
// Job holds the data to be processed and the function that processes it
// (adapt as needed).
type Job struct {
	Data []byte
	Proc func([]byte)
}

// JobQueue is a buffered task queue; its capacity comes from arg.maxqueue.
var JobQueue chan Job = make(chan Job, arg.maxqueue)

// Handler takes jobs off the queue and runs each job's processing function.
func Handler() {
	for job := range JobQueue {
		job.Proc(job.Data)
	}
}
```

(2) A Job carries both the processing function and its argument, so we can put the processing function into the Job and let the handler run it. At this point I was rather pleased with myself and happily ran it. Well, it wasn't much faster (how much depends on your processing function, i.e. the Job's Proc). What? After calming down and analyzing it, I realized how naive I'd been. I had only wrapped the tasks and put them in a buffered queue; since far more jobs were being created than could be processed, the buffer merely postponed the blocking a little.
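A small sketch of why the buffer alone doesn't help, assuming a queue that nobody is reading fast enough. `fillUntilBlocked` is a hypothetical helper: it uses `select` with `default` to detect the moment a plain send would block. With no consumer, exactly `capacity` jobs get in; after that, producers block just as they did with the unbuffered channel.

```go
package main

import "fmt"

// Job pairs the data with the function that processes it, as in the article.
type Job struct {
	Data []byte
	Proc func([]byte)
}

// fillUntilBlocked sends jobs into a buffered queue with no consumer and
// returns how many were accepted before a send would have blocked.
func fillUntilBlocked(capacity, attempts int) int {
	queue := make(chan Job, capacity)
	accepted := 0
	for i := 0; i < attempts; i++ {
		job := Job{Data: []byte(fmt.Sprint(i)), Proc: func(b []byte) {}}
		select {
		case queue <- job:
			accepted++
		default:
			// A plain `queue <- job` would block here until someone receives.
			return accepted
		}
	}
	return accepted
}

func main() {
	fmt.Println(fillUntilBlocked(4, 100)) // prints 4
}
```

The buffer size only decides how long producers can run ahead; throughput is still bounded by how fast jobs are taken off the queue.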

Three, the Job/Worker model

By this point I had a rough idea of how to optimize. It reminded me of thread pools in Java: create a pool with multiple workers (the number of workers is configurable), where each worker takes a task from the queue and, after processing it, goes back for the next one. To make it more general, I changed the parameter types to interface{}. Now let's look at the code. This part is the key, so I've included all of it.

```go
// Job holds the data to be processed and the function that processes it.
type Job struct {
	Data interface{}
	Proc func(interface{})
}

// JobQueue is the buffered queue that jobs are submitted to.
var JobQueue chan Job = make(chan Job, arg.maxqueue)

// Worker executes jobs taken from the job queue.
type Worker struct {
	WorkerPool chan chan Job // pool of idle workers' job channels
	JobChannel chan Job      // this worker's own job channel
	Quit       chan bool     // shutdown signal
}

// NewWorker creates a worker; pass in the worker pool it registers with.
func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		Quit:       make(chan bool),
	}
}

// Start runs the worker loop:
// (1) register the worker's JobChannel with the WorkerPool,
// (2) wait for a new job on the JobChannel,
// (3) watch for a shutdown request.
func (worker Worker) Start() {
	go func() {
		for {
			worker.WorkerPool <- worker.JobChannel // mark this worker as idle
			select {
			case job := <-worker.JobChannel:
				job.Proc(job.Data)
			case quit := <-worker.Quit:
				// Exit directly after receiving the shutdown signal
				if quit {
					return
				}
			}
		}
	}()
}

// Stop shuts the worker down by sending it a shutdown signal.
func (worker Worker) Stop() {
	go func() {
		worker.Quit <- true
	}()
}

// Dispatcher manages the workers: the maximum number of workers and the WorkerPool.
type Dispatcher struct {
	MaxWorker  int
	WorkerPool chan chan Job
}

// NewDispatcher creates a dispatcher with room for maxWorker idle workers.
func NewDispatcher(maxWorker int) *Dispatcher {
	workerPool := make(chan chan Job, maxWorker)
	return &Dispatcher{
		WorkerPool: workerPool,
		MaxWorker:  maxWorker,
	}
}

// Run starts MaxWorker workers and then begins dispatching jobs.
func (dispatcher *Dispatcher) Run() {
	for i := 0; i < dispatcher.MaxWorker; i++ {
		worker := NewWorker(dispatcher.WorkerPool)
		worker.Start()
	}
	go dispatcher.dispatch()
}

// dispatch hands each job from JobQueue to an available worker.
func (dispatcher *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				jobChannel := <-dispatcher.WorkerPool // get an available worker
				jobChannel <- job                     // send the job to that worker
			}(job)
		}
	}
}
```

The code is commented throughout, so I won't repeat the explanation. Start the model with `dispatcher := NewDispatcher(MaxWorker)` followed by `dispatcher.Run()`. Note that the processing function should be written for your own business logic, packaged together with its data as a Job, and sent to the JobQueue. Then I ran my script: tens of gigabytes of files went through three rounds of processing functions (three rounds were needed, each working on the results of the previous one), which took three to four minutes, with low CPU usage. For steps that take a lot of time, you can use the pprof tool to find where the slowness is.
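Putting the pieces together, here is a condensed, self-contained sketch of starting the pool and submitting work as described above. It is an assumption-laden simplification, not the article's exact code: the Quit/Stop logic is omitted, `jobQueueSize = 100` stands in for `arg.maxqueue`, the dispatcher ranges over `JobQueue` instead of using a single-case `select`, and the `sync.WaitGroup` used to wait for completion is an addition for the demo. `processAll` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const jobQueueSize = 100 // assumption: stands in for arg.maxqueue

type Job struct {
	Data interface{}
	Proc func(interface{})
}

var JobQueue = make(chan Job, jobQueueSize)

type Dispatcher struct {
	MaxWorker  int
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorker int) *Dispatcher {
	return &Dispatcher{MaxWorker: maxWorker, WorkerPool: make(chan chan Job, maxWorker)}
}

// Run starts the workers and the dispatch loop (no shutdown support here).
func (d *Dispatcher) Run() {
	for i := 0; i < d.MaxWorker; i++ {
		jobChannel := make(chan Job)
		go func() {
			for {
				d.WorkerPool <- jobChannel // register as idle
				job := <-jobChannel
				job.Proc(job.Data)
			}
		}()
	}
	go func() {
		for job := range JobQueue {
			go func(job Job) {
				worker := <-d.WorkerPool // wait for an idle worker
				worker <- job
			}(job)
		}
	}()
}

// processAll submits n jobs and waits until every one has been processed.
func processAll(maxWorker, n int) int64 {
	dispatcher := NewDispatcher(maxWorker)
	dispatcher.Run()

	var wg sync.WaitGroup
	var processed int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		JobQueue <- Job{
			Data: i,
			Proc: func(v interface{}) {
				defer wg.Done()
				_ = v // a real handler would parse and match the log entry here
				atomic.AddInt64(&processed, 1)
			},
		}
	}
	wg.Wait()
	return processed
}

func main() {
	fmt.Println(processAll(4, 1000)) // prints 1000
}
```

Because each Job carries its own Proc, the same pool can run any of the processing rounds; only the function packaged into the Job changes.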

Four, Summary

I had just studied Golang's concurrency principles when this task came along, so I started exploring and optimizing from scratch. After writing the whole tool, I understood Golang concurrency more deeply and became more familiar with locking and file operations. I gained a lot, so I encourage you, when learning something new, not only to understand the principles but also to get hands-on, because that is what makes it stick. This model still has some shortcomings, which I will optimize further in the future. Along the way I also referred to some very good blog posts, and I'd like to thank their authors here.