Preface: This project is used to record my (647) study and accumulation in Go language direction. This series is fairly basic and is recommended for developers who want to get started with Go. The directory is as follows: Go language foundation (1) — Introduction, environment configuration, HelloWorld Go language foundation (2) — Basic common syntax Go language foundation (3) — Object-oriented programming Go language foundation (4) — high-quality fault tolerance Go language foundation (5) — Concurrent programming Go language foundation (6) — Testing, Reflection, Unsafe Go Basics (7) — Architecture & Common Tasks Go Basics (8) — Performance Tuning


This paper will introduce the following contents: 1. Coroutine mechanism (Groutine) 2. Shared memory concurrency (coroutine security) 3.CSP concurrency (Channel) 4. Multipath selection and timeout control (SELECT) 5. Channel closure and broadcast (channel) 6. Canceling tasks 7. Canceling Context and associated tasks 8. Common concurrent tasks (actual combat)

1. Coroutine mechanism

I’m sure you all know the concept of “thread” and “process”.

In Go, “coroutines” can be understood as more lightweight threads. The efficiency of system Kernel can be maximized by scheduling “coroutines”.

Using a table, let’s compare coroutines with threads.

  • Thread vs. Groutine:
\ Default stack size (when created) Kernel Space Entity (KSE)
Thread the Thread 1M 1:1.
Coroutines Groutine 2K M : N

The advantages of coroutines vs. threads are:

  • Switching between threads involves system threads in the kernel (kernel entity), which will cause a large cost.
  • While multiple coroutines in the same system thread (kernel entity) to reduce the number of switching system threads (kernel entity). (As shown in the picture above)

Use of coroutines:

Syntax: go + func

func TestGroutine(t *testing.T) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			fmt.Println(i) // Correct case, value pass. There is no competition between coroutines.
		}(i)

		// go func() {
		// fmt.println (I) // Error case, shared variable. The coroutines are competing
		/ /} ()
	}
	time.Sleep(time.Millisecond * 50)}Copy the code

Ii. Shared memory concurrency mechanism (coroutine security)

When it comes to coroutine security, the first thing that comes to mind is locking. The coroutine is secured by locking.

The same is true in Go. Let’s look at an example.

  • Coroutine concurrency, resulting in coroutine insecurity:
// Coroutines are unsafe demo
func TestCounterThreadUnsafe(t *testing.T) {
	counter := 0
	for i := 0; i < 5000; i++ {
		go func(a) {
			counter++
		}()
	}
	time.Sleep(1 * time.Second)
	t.Logf("counter = %d", counter)
}
Copy the code

The results are as follows:

=== RUN   TestCounterThreadUnsafe
--- PASS: TestCounterThreadUnsafe (1.00s)
    share_mem_test.go:18: counter = 4765
Copy the code

At this point, you will find that the calculation was wrong because of the concurrency that caused the missing value.

  • Solution 1: Common locking and delay waiting for coroutine execution to complete (not recommended)
// Coroutines wait for demo (stop 1 second, not recommended)
func TestCounterThreadSafe(t *testing.T) {
	var mut sync.Mutex
	counter := 0
	for i := 0; i < 5000; i++ {
		go func(a) {
			defer func(a) {
				mut.Unlock() // After the function call is complete: unlock to ensure the security of the coroutine
			}()
			mut.Lock() // Before the function is called: lock to keep the coroutine safe
			counter++
		}()
	}
	time.Sleep(1 * time.Second) // Wait one second for the coroutine to complete
	t.Logf("counter = %d", counter)
}
Copy the code

The results are as follows:

=== RUN   TestCounterThreadSafe
--- PASS: TestCounterThreadSafe (1.01s)
    share_mem_test.go:35: counter = 5000
Copy the code

It’s correct, but there’s a problem. Because there’s a one-second delay waiting for the coroutine to run before calling the result. So is there a better way to handle it? Now let’s optimize it a little bit.

  • Solution 2:Recommended!!!Using synchronous wait queues (WaitGroup) ensure sequential execution.
// Coroutine security Demo
func TestCounterWaitGroup(t *testing.T) {
	var mut sync.Mutex    / / the mutex
	var wg sync.WaitGroup // Wait for the queue
	counter := 0
	for i := 0; i < 5000; i++ {
		wg.Add(1) // Add a task
		go func(a) {
			defer func(a) {
				mut.Unlock() // After the function call is complete: unlock to ensure the security of the coroutine
			}()
			mut.Lock() // Before the function is called: lock to keep the coroutine safe
			counter++
			wg.Done() // Finish the task
		}()
	}
	wg.Wait() // Wait for all tasks to complete
	t.Logf("counter = %d", counter)
}
Copy the code

The running results are as follows:

=== RUN   TestCounterWaitGroup
--- PASS: TestCounterWaitGroup (0.00s)
    share_mem_test.go:55: counter = 5000
Copy the code

In this case, the Mutex and WaitGroup not only guarantee the security of the coroutine, but also prevent the result from being printed ahead of time. (✔ ️)


CSP concurrency mechanism

1. CSP

3. CSP (Communicating sequential processes) : Communicating in a pipeline. In simple terms, CSPS communicate through channels.

Channels in Go have capacity constraints and are independent of processing Groutine (coroutines).

2. Channel

There are two common channels in Go, corresponding to Channel and Buffer Channel respectively.

  • The first type: Channel (unbuffered)

First, the sender and receiver must be on both sides of a Channel to interact. If one party is absent, the other party blocks at one end and does not interact until both sides are present.

Make (chan [type])

retChannel := make(chan string) // Create an unbuffered channel and specify that the data in the channel is a string, double-ended wait
Copy the code

Input syntax: channel <-

channel <- object / / input channel
Copy the code

Get syntax: < -channel

object <- channel / / channel output
Copy the code
  • Second type: Buffer Channel

This is a slightly more advanced way of channeling (more loosely coupled).

First, set a capacity for the Channel and do not require the sender and receiver to be on both ends at the same time. The sender then keeps sending messages to the Channel as buffers. A Channel is blocked until its capacity is full. In this case, as long as the receiver receives the message (that is, the Channel has free capacity), the sender will continue to send the message.

Make (chan [type], Int)

retChannel := make(chan string.1) // Create a buffered channel and specify the data in the channel as string
Copy the code

Input syntax: channel <-

channel <- object / / input channel
Copy the code

Get syntax: < -channel

object <- channel / / channel output
Copy the code

Demo: Simulating the method call process of a network request, controlling the current coroutine to perform other tasks in the waiting process of network request through Channel.

// Simulate a network request
func serviceTask(a) string {
	fmt.Println("- start working on service task.")
	time.Sleep(time.Millisecond * 50)
	return "- service task is Done."
}

// Other tasks
func otherTask(a) {
	fmt.Println("start working on something else.")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("other task is Done.")}// CSP asynchronous pipe
func AsyncService(a) chan string {
	retChannel := make(chan string) // Unbuffered channel, create and specify the data in the channel as string, double-ended wait
	RetChannel := make(chan string, 1
	go func(a) {
		ret := serviceTask()
		fmt.Println("returned result.")
		retChannel <- ret / / input channel
		fmt.Println("service exited.")
	}()
	return retChannel
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh) / / channel output
	time.Sleep(time.Second * 1)}Copy the code

4. Multi-path selection and timeout control

Use the select keyword to complete the “multiple selection” and “timeout control”.

  • Multiplexing: when returnedchannelWhen there may be more than one, select can be used to handle multiple response events.

Note: This is similar to switch, but note that it is not sequential. That is, if channel1 and channel2 meet the requirements at the same time, channel1 or Channel2 can be used instead of the same sequence as switch.

The Demo:

	select {
	case ret := <-channel1: 
		t.Log(ret)
	case ret:= <- channel2:
		t.Log(ret)
	case default:
		t.Error("No one returned.")}Copy the code
  • Timeout control:

At the same time, we can also set a branch of a timeout wait, when the channel timeout has not returned, we can execute the corresponding code.

The Demo:

	select {
	case ret := <-AsyncService(): // Return normal
		t.Log(ret)
	case <-time.After(time.Millisecond * 100) :// Wait for timeout
		t.Error("time out")}Copy the code

V. Closure and broadcasting of channel

The main points are as follows:

  1. To haveclosethechannelSending a message will cause the programpanic.
  2. v, ok <- channel. Among them,okforboolIf a value,ok==trueSaid,channelIn aopenState. ifok==falseSaid,channelIn acloseState.
  3. allchannelThe receiver inchannelWhen closed, it is immediately returned from the blocking wait, andokA value offalse. (PS: broadcast mechanism, usually used to send signals simultaneously to multiple subscribers. For example, exit signal.

The Demo:

// Message producer
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func(a) {
		for i := 0; i < 10; i++ {
			ch <- i
		}

		fmt.Println("channel close.")
		close(ch) / / close the channel

		wg.Done()
	}()
}

// Message receiver
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func(a) {
		for {
			if data, ok := <-ch; ok { // Prints messages until the channel is closed.
				fmt.Println(data)
			} else {
				fmt.Println("Receiver close.")
				break / / channel is close
			}
		}
		wg.Done()
	}()
}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg) // Start the producer
	wg.Add(1)
	dataReceiver(ch, &wg) // Unlock the consumer
	wg.Wait()
}
Copy the code

6. Mission cancellation

By using the close channel (broadcast mechanism) above, we can extend this by telling all channels to cancel the current task through a close channel.

The Demo is as follows:

func isCancelled(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return true
	default:
		return false}}// Only a single channel can be cancelled
func cancel_1(cancelChan chan struct{}) {
	cancelChan <- struct{}{}
}

// Cancel all channels
func cancel_2(cancelChan chan struct{}) {
	close(cancelChan)
}

func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{}, 0) // create a channal to control event cancellation
	for i := 0; i < 5; i++ {             // Start 5 coroutines
		go func(i int, chanclCh chan struct{}) { // Every coroutine has an infinite loop waiting for a cancellation message
			for {
				if isCancelled(cancelChan) {
					break
				}
				time.Sleep(time.Millisecond * 5) // The simulation delay is 5 ms
			}
			fmt.Println(i, "Cancelled") // The system exits the infinite loop and prints logs
		}(i, cancelChan)
	}
	cancel_2(cancelChan) // Tell all channels to close.
	time.Sleep(time.Second * 1)}Copy the code

Context and associated tasks are cancelled

We just cancelled the task through the close channel, but there were some problems. For example, when a task is cancelled, its associated subtasks should also be cancelled immediately.

To solve this problem, after Go 1.9.0, Golang added context to ensure that associated tasks were cancelled.

1. Context

Context is the context used to manage tasks, including the passing of shared values, timeouts, and cancellation notifications.

The structure is as follows:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}}Copy the code
  1. DeadlineWill return a timeout,GoroutineAfter obtaining the timeout period, you can set the timeout period for certain I/O operations.
  2. DoneMethod returns a channel (channel), whenContextWhen revoked or expired, the channel is closed, that is, it is a representationContextWhether the signal is turned off.
  3. whenDoneAfter the channel is closed, the Err method indicates why the Context was removed.
  4. ValueYou can makeGoroutineShare some data, of course get the data is coroutine safe. But be careful to synchronize when using this data, such as returning onemapAnd thismapRead and write are locked.

Key points:

  • Root Context: Passcontext.Background()To create.
  • Subcontext: Passcontext.WithCancel(parentContext)To create.
  • When the current Context is canceled, all the other contexts based on it are canceled.
  • Receiving cancellation notifications:<-ctx.Done

2. The associated task is cancelled

Let’s tweak the example a little bit and use the context to cancel all associated tasks.

  • First, create onecontext:
ctx, cancel := context.WithCancel(context.Background()) // Create a child context
Copy the code
  • Write a cancel method that takescontextAs a parameter.
func isCancelled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false}}Copy the code
  • Open five coroutine infinite loops, each of which has an infinite loop waiting for a cancellation message. Call againcancelMethods.
for i := 0; i < 5; i++ {                                // Start 5 coroutines
        go func(i int, ctx context.Context) { // Every coroutine has an infinite loop waiting for a cancellation message
            for {
                if isCancelled(ctx) {
                    break
                }
                time.Sleep(time.Millisecond * 5) // The simulation delay is 5 ms
            }
            fmt.Println(i, "Cancelled") // The system exits the infinite loop and prints logs
        }(i, ctx)
    }
    cancel() / / cancel the CTX
Copy the code

The complete sample code is as follows:

func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false}}func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background()) // Create a child context
	for i := 0; i < 5; i++ {                                // Start 5 coroutines
		go func(i int, ctx context.Context) { // Every coroutine has an infinite loop waiting for a cancellation message
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5) // The simulation delay is 5 ms
			}
			fmt.Println(i, "Cancelled") // The system exits the infinite loop and prints logs
		}(i, ctx)
	}
	cancel() / / cancel the CTX
	time.Sleep(time.Second * 1)}Copy the code

Viii. Common Concurrent tasks (actual combat)

1. Execute only once (singleton mode)

Scenario: In the case of multiple coroutines, certain code is guaranteed to be executed only once.

type Singleton struct {
	data string
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj(a) *Singleton {
	once.Do(func(a) {
		fmt.Println("Create Obj")
		singleInstance = new(Singleton)
	})
	return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(a) {
			obj := GetSingletonObj()
			fmt.Printf("%p\n", obj)
			wg.Done()
		}()
	}
	wg.Wait()
}
Copy the code

2. Just complete any task

Using the channel channel communication mechanism, we can send a message to the object whenever any coroutine completes its task.

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func firstResponse(a) string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // Create bufferChannel. (If using channels causes coroutines to leak, the remaining nine channels will remain blocked in the system.)
	for i := 0; i < numOfRunner; i++ { // open 10 coroutines
		go func(i int) {
			ret := runTask(i) // Each coroutine performs the task
			ch <- ret
		}(i)
	}
	return <-ch // Return the first Response in the channel. (Because a channel is a fifO channel)
}

func TestFirstResponse(t *testing.T) {
	t.Log(firstResponse()) // Find that each run returns a different result, according to the order in which the coroutine completed the task.
}
Copy the code

3. All missions completed

So just now we introduced first response, so let’s see what we can do with all response. The idea is the same, just receive all the data returned by the channel, and then return.

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func allResponse(a) string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // Create bufferChannel.
	for i := 0; i < numOfRunner; i++ {   // open 10 coroutines
		go func(i int) {
			ret := runTask(i) // Each coroutine performs the task
			ch <- ret
		}(i)
	}
	finalRet := ""
	for j := 0; j < numOfRunner; j++ {
		finalRet += <-ch + "\n"
	}
	return finalRet // Return all responses in the channel. (Because a channel is a fifO channel)
}

func TestAllResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine()) // Print the current number of coroutines
	t.Log(allResponse())                     // Find that each run returns a different result, according to the order in which the coroutine completed the task.
	t.Log("After:", runtime.NumGoroutine()) // Print the current number of coroutines
}
Copy the code

4. The object pool

We can use the pipeline properties of buffer channels to create a pool of objects.

The Demo:

type ReusableObj struct{}type ObjPool struct {
	bufChan chan *ReusableObj // Buffers reusable objects
}

// Produces an object pool with a specified number of objects
func NewObjPool(numOfObj int) *ObjPool {
	ObjPool := ObjPool{}
	ObjPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		ObjPool.bufChan <- &ReusableObj{}
	}
	return &ObjPool
}

// Get the object from the object pool
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): // Timeout control
		return nil, errors.New("time out")}}// Release objects from the object pool
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return errors.New("overflow")}}func TestObjPool(t *testing.T) {
	pool := NewObjPool(10) // Create an object pool of 10 capacity
	for i := 0; i < 10; i++ {
		if v, err := pool.GetObj(time.Second * 1); err ! =nil { / / get obj
			t.Error(err)
		} else {
			fmt.Printf("%T\n", v)                      // Obtain success, allow log.
			iferr := pool.ReleaseObj(v); err ! =nil { / / release obj
				t.Error(err)
			}
		}
	}
	fmt.Println("Done.")}Copy the code

5. Sync. pool Object cache

We can cache objects (create, fetch, cache policies) through sync.pool.

Object acquisition strategy:
  1. First, try to get it from a private object.

  2. Second, if the private object does not exist, try to fetch it from the shared pool of the current Process.

  3. If the current shared pool of the Process is empty, try to fetch it from the shared pool of another Process.

  4. If all shared pools of the Process are empty, “New” a New object is returned from the New method specified in sync.pool.

Sync. pool Cache object lifecycle:
  • Each GC (garbage collection) clears the sync.pool cache object.

  • Therefore, the object cache is valid until the next GC.

Basic use:

func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func(a) interface{} { // Create a new object
			fmt.Println("Create a new object.")
			return 100
		},
	}

	v := pool.Get().(int) // Get the object
	fmt.Println(v)
	pool.Put(3) // Put the object back
	// Runtime.gc () // GC is triggered to clear objects cached in sync.pool
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}
Copy the code

Use under multiple coroutines:

func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func(a) interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}

	pool.Put(100)
	pool.Put(100)
	pool.Put(100)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {// Create 10 coroutines
		wg.Add(1) 
		go func(id int) {
			fmt.Println(pool.Get()) // Get the object
			wg.Done() 
		}(i)
	}
	wg.Wait()
}
Copy the code
Advantages and problems of sync.pool:
  • Advantages: Sync.pool reduces complex object creation and GC costs.

  • Problem: Sync. pool is reclaimed by GC and locks need to be considered in concurrent use. Therefore, make trade-offs in the program. Is it expensive to create an object? Or is it expensive to reuse caches with sync.pool lock?


Finally, this series is summed up and completed in practice under the technical sharing of Teacher CAI Chao. Thank you for your technical sharing.

PS: Attached, sharing link: “Go Language from entry to actual combat” wish you all success in your study and work smoothly. Thank you very much!