WeChat official account: Wu Qinqiang's late night canteen
Introduction
I came across an article written in 2015. To be honest, the title is what drew me in, but after reading it several times I found it genuinely well done. I won't translate the article directly. The project's requirement is simple: the client sends requests, and the server receives them and processes the data. That's essentially it.
I changed the original business code slightly, but it doesn't affect the core module. In the first version, every time a request is received, a goroutine is started to handle it, for a quick response. A very common approach.
The code looks like this:
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

type Payload struct {
	// It doesn't matter what you send
}

func (p *Payload) UpdateToS3() error {
	// Storage logic; simulate the operation time
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Upload successful")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Business filtering
	// Request body parsing......
	var p Payload
	go p.UpdateToS3()
	w.Write([]byte("Operation successful"))
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}
What's the problem with that? Under normal circumstances, nothing. But in a high-concurrency scenario, if you don't control the number of goroutines, CPU usage and memory usage skyrocket until the program crashes.
If these operations hit a database, say MySQL, then the database server's disk I/O, network bandwidth, CPU load, and memory consumption all spike until it falls over too. Once something in a program is out of control, it is often a red flag.
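As an aside (this is not where the original article goes next): one common way to keep the number of in-flight goroutines under control is a buffered channel used as a counting semaphore. A minimal sketch under that assumption, reusing Payload from the code above; the names sem and cappedPayloadHandler are illustrative, not the article's:

// A sketch, not the article's code: cap concurrent uploads with a
// buffered channel used as a counting semaphore.
var sem = make(chan struct{}, 100) // at most 100 uploads in flight

func cappedPayloadHandler(w http.ResponseWriter, r *http.Request) {
	var p Payload
	sem <- struct{}{} // acquire a slot; blocks once 100 are in flight
	go func() {
		defer func() { <-sem }() // release the slot when the upload ends
		p.UpdateToS3()
	}()
	w.Write([]byte("Operation successful"))
}

This caps resource usage but makes the handler block under load, which is one reason the article moves to a queue instead.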
Improved version
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const MaxQueue = 400

var Queue chan Payload

func init() {
	Queue = make(chan Payload, MaxQueue)
}

type Payload struct {
	// It doesn't matter what you send
}

func (p *Payload) UpdateToS3() error {
	// Storage logic; simulate the operation time
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Upload successful")
	return nil
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	// Business filtering
	// Request body parsing......
	var p Payload
	//go p.UpdateToS3()
	Queue <- p
	w.Write([]byte("Operation successful"))
}

// Handle the tasks
func StartProcessor() {
	for {
		select {
		case payload := <-Queue:
			payload.UpdateToS3()
		}
	}
}

func main() {
	http.HandleFunc("/payload", payloadHandler)
	// Start a separate goroutine to receive and process tasks
	go StartProcessor()
	log.Fatal(http.ListenAndServe(":8099", nil))
}
This version uses a buffered channel, which reins in the unlimited goroutines, but it still doesn't solve the problem.
Processing requests is now a synchronous operation that handles only one task at a time, yet under high concurrency requests arrive faster than they can be processed. Once the channel is full, every subsequent request blocks on the send. Response times then climb dramatically, and eventually the service stops responding altogether.
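A side note of my own, not a step in the article: if blocking the handler is unacceptable, the send into the queue can be made non-blocking with select/default, shedding load instead of hanging. A sketch reusing Queue and Payload from this version; payloadHandlerNonBlocking is an illustrative name:

// A sketch of a non-blocking enqueue: reject when the queue is full.
func payloadHandlerNonBlocking(w http.ResponseWriter, r *http.Request) {
	var p Payload
	select {
	case Queue <- p:
		w.Write([]byte("Operation successful"))
	default:
		// Queue is full: fail fast instead of blocking the request
		http.Error(w, "Server busy", http.StatusServiceUnavailable)
	}
}

This fixes the hanging, but throughput is still limited by the single consumer, which is what the final version addresses.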
Final version
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

const (
	MaxWorker = 100 // arbitrary value
	MaxQueue  = 200 // arbitrary value
)

// A buffered channel for queuing incoming work requests
var JobQueue chan Job

func init() {
	JobQueue = make(chan Job, MaxQueue)
}

type Payload struct{}

type Job struct {
	PayLoad Payload
}

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start runs the worker loop; it listens on the quit channel
// and stops the loop when asked to.
func (w Worker) Start() {
	go func() {
		for {
			// Register the current worker's job channel into the worker pool
			w.WorkerPool <- w.JobChannel
			select {
			case job := <-w.JobChannel:
				// The real business logic goes here
				// Simulate the operation time
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("Uploaded successfully: %v\n", job)
			case <-w.quit:
				return
			}
		}
	}()
}

func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

// Dispatcher and its initialization
type Dispatcher struct {
	// The pool of worker channels registered with the dispatcher
	WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
	// Start n workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Try to obtain an available worker's job channel; block until one is free
				jobChannel := <-d.WorkerPool
				// Hand the task to that worker's job channel
				jobChannel <- job
			}(job)
		}
	}
}

// Receive the request and push the job into JobQueue.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("Operation successful"))
}

func main() {
	// Create the dispatcher, whose workers listen for jobs on JobQueue
	d := NewDispatcher(MaxWorker)
	d.Run()
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}
Finally, two levels of channels are used. The first level is the task queue, chan Job: user request data is put into it as pending tasks. The second level is a cache of workers ready to take on work, of type chan chan Job: every idle worker registers its own job channel there. The scheduler takes a pending task and hands it to an idle worker's job channel, and each worker keeps draining its own channel. Together this forms a worker pool. I've drawn a rough diagram to help you understand.
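If the chan chan Job idea is hard to picture, here is a minimal, self-contained sketch of just the two-level handoff (all names here are illustrative, not the article's):

package main

import "fmt"

func main() {
	type Job struct{ ID int }

	pool := make(chan chan Job, 1) // level two: idle workers' job channels
	jobs := make(chan Job, 4)      // level one: pending tasks
	done := make(chan struct{})

	// One worker: register its own channel in the pool, then wait for a job.
	go func() {
		mine := make(chan Job)
		for {
			pool <- mine  // "I'm idle"
			job := <-mine // the dispatcher hands me a job
			fmt.Println("worker got job", job.ID)
			done <- struct{}{}
		}
	}()

	// Dispatcher: pair each pending task with an idle worker.
	go func() {
		for job := range jobs {
			idle := <-pool // take an idle worker's job channel
			idle <- job    // hand the task over
		}
	}()

	jobs <- Job{ID: 1}
	jobs <- Job{ID: 2}
	<-done
	<-done
}

This is exactly what Worker.Start and Dispatcher.dispatch above do, just at the scale of one worker and two jobs.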
First, when we receive a request, we create a Job and put it into the task queue, waiting for the worker pool to process it.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	work := Job{PayLoad: Payload{}}
	JobQueue <- work
	_, _ = w.Write([]byte("Operation successful"))
}
After the scheduler initializes the worker pool, in dispatch, once we receive a Job from JobQueue we try to obtain an available worker and hand the Job over to that worker's job channel. Note that this step is not synchronous: a new goroutine is started for every Job received. This ensures that reads from JobQueue keep pace, so that, in theory, writes to JobQueue never block.
func (d *Dispatcher) Run() {
	// Start n workers
	for i := 0; i < MaxWorker; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				// Try to obtain an available worker's job channel; block until one is free
				jobChannel := <-d.WorkerPool
				// Hand the task to that worker's job channel
				jobChannel <- job
			}(job)
		}
	}
}
Note that this goroutine is different from the uncontrolled ones in the first version: it only spends a very short time blocked reading from the worker pool channel, and as soon as an idle worker appears it hands the task over and exits. Its whole life cycle is much shorter than the upload operation in the first version.
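One design note of my own: the chan chan Job indirection mirrors the original article, but for many workloads the same bounded concurrency can be had more simply by letting n workers read the shared queue directly. A sketch assuming the same JobQueue and imports as the final version; StartWorkers is a name I made up:

// A simpler variant (my sketch, not the article's design):
// n workers consume JobQueue directly, with no dispatcher.
func StartWorkers(n int) {
	for i := 0; i < n; i++ {
		go func() {
			for job := range JobQueue {
				// Same simulated business logic as Worker.Start
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("Uploaded successfully: %v\n", job)
			}
		}()
	}
}

The chan chan Job version keeps each worker's identity visible to the dispatcher, which matters if you later need per-worker state or graceful draining; otherwise the direct version is usually enough.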
Finally, it is highly recommended to read the original article. The original address is: marcio.io/2015/07/han…