WeChat official account: Wu Qinqiang’s late night canteen

Introduction

I came across an article written in 2015. To be honest, it was the title that drew me in, and after reading it several times I found the article itself excellent. I won’t translate it directly. The project requirement is simple: the client sends requests, and the server receives them and processes the data. That is essentially all there is to it.

I changed the original business code slightly, but the core module is unaffected. In the first version, every time a request is received, a goroutine is started to process it so that the handler can respond quickly. This is a very ordinary approach.

The code is as follows:



package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

type Payload struct {
	// It doesn't matter what you send
}

func (p *Payload) UpdateToS3() error {
	// Store logic and simulate operation time
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Upload successful")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Service filtering
	// Request body to parse......
	var p Payload
	go p.UpdateToS3()
	w.Write([]byte("Operation successful"))
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}

What’s the problem with that? Under normal load, nothing. But in a high-concurrency scenario, if you don’t limit the number of goroutines, CPU and memory usage soar until the program crashes.

If the operation writes to a database such as MySQL, the database server’s disk I/O, network bandwidth, CPU load, and memory consumption will all climb very high, and it will eventually crash too. So whenever something in a program is unbounded, it is usually a red flag.
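
To make that growth concrete, here is a minimal sketch of my own (it is not from the original article). spawnPerRequest is a hypothetical helper that mimics the first version by starting one goroutine per simulated request, and the release channel is an assumption standing in for a slow upload that has not finished yet. It uses runtime.NumGoroutine to count how many goroutines are alive at the same moment.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// spawnPerRequest simulates n requests arriving at once. Each one gets its
// own goroutine, as in the first version. It returns how many goroutines
// were added while all of the simulated requests were still in flight.
func spawnPerRequest(n int) int {
	before := runtime.NumGoroutine()
	release := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-release // stands in for a slow upload that is still running
		}()
	}
	inFlight := runtime.NumGoroutine() - before
	close(release) // let every simulated upload finish
	wg.Wait()
	return inFlight
}

func main() {
	for _, n := range []int{10, 1000, 10000} {
		fmt.Printf("%d requests -> %d extra goroutines\n", n, spawnPerRequest(n))
	}
}
```

The number of live goroutines follows the number of in-flight requests one for one, so nothing in the program caps it.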

Intermediate version

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const MaxQueue = 400

var Queue chan Payload

func init() {
	Queue = make(chan Payload, MaxQueue)
}

type Payload struct {
	// It doesn't matter what you send
}

func (p *Payload) UpdateToS3() error {
	// Store logic and simulate operation time
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Upload successful")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Service filtering
	// Request body to parse......
	var p Payload
	//go p.UpdateToS3()
	Queue <- p
	w.Write([]byte("Operation successful"))
}

// StartProcessor receives tasks from Queue and handles them one at a time.
func StartProcessor() {
	for {
		select {
		case payload := <-Queue:
			payload.UpdateToS3()
		}
	}
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	// Start a separate goroutine to receive and process tasks
	go StartProcessor()
	log.Fatal(http.ListenAndServe(":8099", nil))
}

This version uses a buffered channel as a queue, which stops goroutines from being created without limit, but it still doesn’t solve the problem.

Processing is synchronous: the single processor goroutine handles only one task at a time, while under high concurrency requests arrive faster than they can be processed. Once the channel is full, every subsequent request blocks on the send and waits. You will then see response times climb sharply until the service stops responding altogether.
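
The blocking behaviour of a full buffered channel can be seen in a small sketch of my own. tryEnqueue is a hypothetical helper, not part of the original code: it uses a non-blocking send (select with a default case) only so that the program can report that the buffer is full instead of hanging. The handler above uses a plain send, which would simply block at that point.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the value fit
// into the channel's buffer. A plain `queue <- v` would block instead of
// returning false when the buffer is full and nobody is receiving.
func tryEnqueue(queue chan int, v int) bool {
	select {
	case queue <- v:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan int, 2) // tiny buffer standing in for MaxQueue
	for i := 1; i <= 3; i++ {
		fmt.Printf("job %d accepted: %v\n", i, tryEnqueue(queue, i))
	}
	// Prints:
	// job 1 accepted: true
	// job 2 accepted: true
	// job 3 accepted: false
}
```

With a single consumer that needs 500 ms per task, the real queue fills up quickly under load, and every handler after that waits on the send.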

Final version

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const (
	MaxWorker = 100 // Set arbitrary values
	MaxQueue  = 200 // Set arbitrary values
)

// A buffer channel for sending work requests
var JobQueue chan Job

func init() {
	JobQueue = make(chan Job, MaxQueue)
}

type Payload struct{}

type Job struct {
	PayLoad Payload
}

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start runs the worker loop in its own goroutine. The loop listens on the
// quit channel and stops when asked to.
func (w Worker) Start() {
	go func() {
		for {
			// Register the current worker to the worker queue
			w.WorkerPool <- w.JobChannel
			select {
			case job := <-w.JobChannel:
				// Where the real business is
				// Simulate the operation time
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("Uploaded successfully :%v\n", job)
			case <-w.quit:
				return
			}
		}
	}()
}

// stop signals the worker to leave its loop.
func (w Worker) stop() {
	go func() {
		w.quit <- true
	}()
}

// Initialize the operation

type Dispatcher struct {
	// Register the worker Channel pool of dispatcher
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
	// Start running n workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Try to get an available worker job channel and block until one is available
				jobChannel := <-d.WorkerPool
				// Distribute the task to the worker Job Channel
				jobChannel <- job
			}(job)
		}
	}
}

// Receive the request and screen the task into the JobQueue.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("Operation successful"))
}

func main() {
	// Create the workers through the dispatcher and listen for jobs from JobQueue
	d := NewDispatcher(MaxWorker)
	d.Run()
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}

The final version uses two levels of channels. The first level is the chan Job that user request data is put into; it acts as the queue of tasks waiting to be processed.

The second level holds the job channels of the workers that are ready to process tasks; its type is chan chan Job. The scheduler puts each pending task into the channel of an idle worker, and each worker keeps processing whatever arrives on its own channel. Together this forms a worker pool. I’ve drawn a rough diagram to help you understand.

First, when a request is received, we create a Job and put it into the task queue for the worker pool to process.

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("Operation successful"))
}

After the scheduler has initialized the worker pool, dispatch takes over: whenever it receives a job from JobQueue, it tries to obtain an available worker and sends the job to that worker’s job channel. Note that this is not done synchronously. Each time a job is received, a goroutine (G) is started to do the handoff. As a result the dispatch loop keeps draining JobQueue, so in theory writes to JobQueue do not block.

func (d *Dispatcher) Run() {
	// Start running n workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Try to get an available worker job channel and block until one is available
				jobChannel := <-d.WorkerPool
				// Distribute the task to the worker Job Channel
				jobChannel <- job
			}(job)
		}
	}
}

Here, too, the number of goroutines is not bounded, but the situation is different from the first version. Each of these goroutines only sits blocked on a channel read for a very short time; as soon as an idle worker becomes available, the task is handed over and the goroutine exits. Its whole life cycle is much shorter than in the first version.
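
The handoff through the two channel levels can also be tried in isolation. The following is a simplified sketch of my own, not the article’s code: runPool, pool, private and quit are hypothetical names, jobs are plain strings, and the dispatcher loop hands jobs over synchronously instead of starting a goroutine per job. It assumes at least one worker whenever there are jobs to process.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runPool pushes jobs through a two-level channel setup. Idle workers
// register their private channel in pool, and the dispatcher loop sends each
// job to whichever private channel it receives next. It returns the number
// of jobs processed. workers must be at least 1 if jobs is non-empty.
func runPool(workers int, jobs []string) int64 {
	pool := make(chan chan string, workers) // private channels of idle workers
	quit := make(chan struct{})
	var processed int64
	var pending sync.WaitGroup // counts jobs that are not finished yet
	pending.Add(len(jobs))

	for i := 0; i < workers; i++ {
		go func() {
			private := make(chan string)
			for {
				// Register as idle. This never blocks here because each
				// worker has at most one registration outstanding.
				pool <- private
				select {
				case <-private:
					// Real work on the job would go here.
					atomic.AddInt64(&processed, 1)
					pending.Done()
				case <-quit:
					return
				}
			}
		}()
	}

	// Dispatcher: wait for an idle worker, then hand the job to it.
	for _, job := range jobs {
		idle := <-pool
		idle <- job
	}

	pending.Wait()
	close(quit) // tell all workers to leave their loops
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println("processed:", runPool(3, []string{"a", "b", "c", "d", "e"}))
	// Prints: processed: 5
}
```

No matter how many jobs arrive, at most three of them are being processed at any moment, which is the property the worker pool is meant to provide.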

Finally, I highly recommend reading the original article. The original address is: Marcio.io /2015/07/ Han…