When we use goroutines to run functions concurrently, we can take advantage of sync.WaitGroup, which looks like this:

func testGoroutine() {
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // signal completion only after the work is finished
			fmt.Println("hello world")
		}()
	}
	wg.Wait()
}

Looking at the code above, consider what happens if a goroutine hangs because some RPC request is too slow: the caller stays stuck at wg.Wait() and the whole request fails.

The exception is when the framework you are using provides a timeout, or when the RPC request you issue can time out and abort on its own.

So how do we keep the code from hanging?

The simplest solution is to add a timeout!

There are actually many ways to implement a timeout:

  • Based on ctx, using context.WithTimeout()
  • Based on select
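For comparison, here is a minimal sketch of the first option, based on context.WithTimeout(). It assumes every task accepts a context and returns as soon as that context is done. The names slowCall and runWithContextTimeout and the three durations are hypothetical, chosen only for this illustration.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// Illustrative durations (assumptions, not taken from the article).
const (
	fastWork = 10 * time.Millisecond
	slowWork = 2 * time.Second
	deadline = 200 * time.Millisecond
)

// slowCall is a hypothetical stand-in for an RPC that takes d to complete.
// It returns ctx.Err() early if the context is done first.
func slowCall(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWithContextTimeout runs n calls concurrently and waits for all of them.
// It returns the first error recorded, which is context.DeadlineExceeded
// when the timeout fires before the calls finish.
func runWithContextTimeout(n int, work, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // release the context's timer

	var wg sync.WaitGroup
	errs := make(chan error, n) // buffered, so senders never block
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := slowCall(ctx, work); err != nil {
				errs <- err
			}
		}()
	}

	// Waiting directly is safe here only because every call honours ctx.
	wg.Wait()
	select {
	case err := <-errs:
		return err
	default:
		return nil
	}
}
```

With this approach the tasks themselves stop when the deadline passes, so wg.Wait() cannot hang, but it only works if every task actually checks the context.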

I’m going to implement the timeout with select, just to show you how the code works.

func testWithGoroutineTimeOut() {
	var wg sync.WaitGroup
	done := make(chan struct{})
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
		}()
	}
	// wg.Wait() also runs in its own goroutine so that it does not block here
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done: // finished normally
	case <-time.After(500 * time.Millisecond): // timed out
	}
}

As you can see, the code above already implements a timeout based on select.
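The same select pattern can be wrapped in a small reusable helper. The sketch below is only an illustration: waitTimeout, startSleepers and the three durations are hypothetical names that do not appear in the original code.

```go
package main

import (
	"sync"
	"time"
)

// Illustrative durations (assumptions, not taken from the article).
const (
	shortSleep = 10 * time.Millisecond
	longSleep  = 2 * time.Second
	waitLimit  = 200 * time.Millisecond
)

// waitTimeout waits for wg for at most d. It reports true if every
// goroutine finished in time and false if the timeout fired first.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(d)
	defer timer.Stop() // free the timer promptly when done wins

	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// startSleepers launches n goroutines that each sleep for d and returns
// the WaitGroup that tracks them.
func startSleepers(n int, d time.Duration) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(d)
		}()
	}
	return wg
}
```

Note that the timeout only ends the waiting. Goroutines that are still busy keep running in the background until their own work returns.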

But we expect more from this function, and two problems remain:

  • The goroutines have no error handling.
  • The number of goroutines started depends on the number of for loop iterations. If the loop runs 1,000,000 times, far too many goroutines are created.

You can write a goroutine pool to solve the problem of too many goroutines.

We can build one from sync.WaitGroup plus a buffered channel, which blocks once it is full, as follows:

package ezgopool

import "sync"

// goroutine pool
type GoroutinePool struct {
	c  chan struct{}
	wg *sync.WaitGroup
}

// NewGoroutinePool creates a pool of the given size; Add blocks when the channel is full
func NewGoroutinePool(maxSize int) *GoroutinePool {
	if maxSize <= 0 {
		panic("max size too small")
	}
	return &GoroutinePool{
		c:  make(chan struct{}, maxSize),
		wg: new(sync.WaitGroup),
	}
}

// add
func (g *GoroutinePool) Add(delta int) {
	g.wg.Add(delta)
	for i := 0; i < delta; i++ {
		g.c <- struct{}{}
	}

}

// done
func (g *GoroutinePool) Done() {
	<-g.c
	g.wg.Done()
}

// wait
func (g *GoroutinePool) Wait() {
	g.wg.Wait()
}


This is the whole implementation of the goroutine pool, and it is actually quite simple. My blog also documents another open source implementation of a Go goroutine pool, which can be found at juejin.cn/post/684490…

With that, our model of timeout + fast error return + goroutine pool is finally complete:
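To see the limit in action, here is a small usage sketch. It repeats the pool in condensed form so that it compiles on its own, and adds a hypothetical helper, peakConcurrency, that records how many tasks run at the same time. The 5 ms sleep stands in for real work.

```go
package main

import (
	"sync"
	"time"
)

// GoroutinePool is the pool from the article, condensed.
type GoroutinePool struct {
	c  chan struct{}
	wg *sync.WaitGroup
}

func NewGoroutinePool(maxSize int) *GoroutinePool {
	if maxSize <= 0 {
		panic("max size too small")
	}
	return &GoroutinePool{c: make(chan struct{}, maxSize), wg: new(sync.WaitGroup)}
}

func (g *GoroutinePool) Add(delta int) {
	g.wg.Add(delta)
	for i := 0; i < delta; i++ {
		g.c <- struct{}{}
	}
}

func (g *GoroutinePool) Done() {
	<-g.c
	g.wg.Done()
}

func (g *GoroutinePool) Wait() { g.wg.Wait() }

// peakConcurrency runs the given number of tasks through a pool of size
// maxSize and returns the highest number of tasks seen running at once.
func peakConcurrency(tasks, maxSize int) int {
	pool := NewGoroutinePool(maxSize)
	var mu sync.Mutex
	running, peak := 0, 0

	for i := 0; i < tasks; i++ {
		pool.Add(1) // blocks while maxSize tasks are still running
		go func() {
			defer pool.Done()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(5 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	pool.Wait()
	return peak
}
```

Calling peakConcurrency(20, 3) should never report more than 3. One caveat of this design: Add must be called before the goroutine is started, and calling Add with a delta larger than the free capacity blocks until enough running tasks call Done.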

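func testGoroutineWithTimeOut() (interface{}, error) {
	done := make(chan struct{})
	// buffered so that a failing goroutine can always send and exit,
	// even after this function has already returned
	errChan := make(chan error, 10)
	pool := ezgopool.NewGoroutinePool(10)
	for i := 0; i < 10; i++ {
		pool.Add(1)
		go func() {
			defer pool.Done()
			var err error // result of the real work, e.g. an RPC call (omitted here)
			if err != nil {
				errChan <- errors.New("error")
			}
		}()
	}

	go func() {
		pool.Wait()
		close(done)
	}()

	select {
	// fast return on error, suitable for read-only (GET) interfaces
	case err := <-errChan:
		return nil, err
	case <-done:
		return nil, nil
	case <-time.After(500 * time.Millisecond):
		return nil, errors.New("timeout")
	}
}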


Thank you.