Preface: This project records my (647) study notes and accumulated experience with the Go language. The series is fairly basic and is recommended for developers who want to get started with Go. The table of contents is as follows:
- Go language foundation (1): Introduction, environment configuration, HelloWorld
- Go language foundation (2): Basic common syntax
- Go language foundation (3): Object-oriented programming
- Go language foundation (4): High-quality fault tolerance
- Go language foundation (5): Concurrent programming
- Go language foundation (6): Testing, Reflection, Unsafe
- Go language foundation (7): Architecture & Common Tasks
- Go language foundation (8): Performance Tuning
This article covers the following:
1. Coroutine mechanism (Goroutine)
2. Shared memory concurrency (goroutine safety)
3. CSP concurrency (Channel)
4. Multiplexing and timeout control (select)
5. Channel closing and broadcast
6. Cancelling tasks
7. Context and cancelling associated tasks
8. Common concurrent tasks (in practice)
I. Coroutine mechanism
I’m sure you all know the concept of “thread” and “process”.
In Go, a “coroutine” (goroutine) can be understood as a more lightweight thread. By scheduling goroutines, Go can make the most of the system's kernel threads.
Using a table, let’s compare coroutines with threads.
- Thread vs. Goroutine:
| | Default stack size (when created) | Relationship to kernel space entities (KSE) |
|---|---|---|
| Thread | 1 MB | 1:1 |
| Goroutine | 2 KB | M:N |
The advantages of goroutines over threads are:
- Switching between threads involves system threads in the kernel (kernel entities), which is expensive.
- Multiple goroutines can run on the same system thread (kernel entity), so switching between them needs fewer switches of system threads (kernel entities). (This is the M:N relationship in the table above.)
Using goroutines:
Syntax: go + func
func TestGroutine(t *testing.T) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			fmt.Println(i) // Correct: i is passed by value, so the goroutines share nothing and do not race.
		}(i)
		// go func() {
		// 	fmt.Println(i) // Incorrect (before Go 1.22): all goroutines share the loop variable i and race on it.
		// }()
	}
	time.Sleep(time.Millisecond * 50)
}
II. Shared memory concurrency mechanism (goroutine safety)
When it comes to goroutine safety, the first thing that comes to mind is locking: shared data is protected with a lock.
The same is true in Go. Let’s look at an example.
- Concurrent goroutines without synchronization, which is unsafe:
// Demo: unsafe concurrent counter
func TestCounterThreadUnsafe(t *testing.T) {
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			counter++ // Data race: unsynchronized read-modify-write
		}()
	}
	time.Sleep(1 * time.Second)
	t.Logf("counter = %d", counter)
}
The results are as follows:
=== RUN TestCounterThreadUnsafe
--- PASS: TestCounterThreadUnsafe (1.00s)
share_mem_test.go:18: counter = 4765
The count is wrong: the goroutines increment counter concurrently without synchronization, so some updates overwrite each other and are lost.
- Solution 1: Lock with a mutex and sleep until the goroutines have finished (not recommended)
// Demo: mutex plus a fixed 1-second wait (not recommended)
func TestCounterThreadSafe(t *testing.T) {
	var mut sync.Mutex
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			mut.Lock()         // Lock before touching counter
			defer mut.Unlock() // Unlock when the function returns
			counter++
		}()
	}
	time.Sleep(1 * time.Second) // Wait one second for the goroutines to finish
	t.Logf("counter = %d", counter)
}
The results are as follows:
=== RUN TestCounterThreadSafe
--- PASS: TestCounterThreadSafe (1.01s)
share_mem_test.go:35: counter = 5000
The result is correct, but there is a problem: the test sleeps for a fixed second before reading the result, whether or not the goroutines have finished. Is there a better way? Let’s optimize it a little.
- Solution 2 (recommended): use a sync.WaitGroup to wait until every goroutine has finished.
// Demo: goroutine-safe counter
func TestCounterWaitGroup(t *testing.T) {
	var mut sync.Mutex    // Mutex
	var wg sync.WaitGroup // Wait group
	counter := 0
	for i := 0; i < 5000; i++ {
		wg.Add(1) // Register one task
		go func() {
			defer wg.Done()    // Mark the task as finished
			mut.Lock()         // Lock before touching counter
			defer mut.Unlock() // Unlock when the function returns
			counter++
		}()
	}
	wg.Wait() // Block until all tasks are done
	t.Logf("counter = %d", counter)
}
The running results are as follows:
=== RUN TestCounterWaitGroup
--- PASS: TestCounterWaitGroup (0.00s)
share_mem_test.go:55: counter = 5000
Here the Mutex keeps the counter safe, and the WaitGroup prevents the result from being printed before the goroutines finish. (✔️)
III. CSP concurrency mechanism
1. CSP
CSP (Communicating Sequential Processes) means communicating through pipes. In simple terms, in CSP the parties communicate through channels.
Channels in Go have a capacity limit and exist independently of the goroutines that use them.
2. Channel
There are two kinds of channel in Go: the unbuffered channel and the buffered channel.
- The first kind: unbuffered channel
The sender and the receiver must both be present at the channel for a value to be exchanged. If one side is absent, the other blocks at its end until both are present.
Creation syntax: make(chan [type])
retChannel := make(chan string) // Create an unbuffered channel of strings; both ends wait for each other
Send syntax: channel <-
channel <- object // Send object into the channel
Receive syntax: <-channel
object := <-channel // Receive a value from the channel
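The "both sides must be present" rule for unbuffered channels can be sketched as follows. The helper `handoff` and the 50 ms probe interval are assumptions made for illustration; the sketch only observes that the sender stays blocked until the receiver arrives.

```go
package main

import (
	"fmt"
	"time"
)

// handoff (hypothetical helper) sends one value over an unbuffered channel
// and reports whether the sender was still blocked before anyone received.
func handoff() (blockedBeforeReceive bool, got string) {
	ch := make(chan string) // unbuffered
	sent := make(chan struct{})
	go func() {
		ch <- "hello" // blocks until the receiver below is ready
		close(sent)
	}()
	select {
	case <-sent:
		blockedBeforeReceive = false
	case <-time.After(50 * time.Millisecond):
		blockedBeforeReceive = true // the send has not completed yet
	}
	got = <-ch // the receiver arrives, so the send can complete
	<-sent
	return blockedBeforeReceive, got
}

func main() {
	fmt.Println(handoff()) // true hello
}
```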
- The second kind: buffered channel
This is a slightly more advanced kind of channel (more loosely coupled).
A buffered channel is created with a capacity and does not require the sender and receiver to be present at the same time. The sender can keep sending without blocking until the buffer is full; a send on a full channel blocks. As soon as a receiver takes a value (that is, the channel has free capacity again), the sender can continue.
Creation syntax: make(chan [type], [capacity])
retChannel := make(chan string, 1) // Create a buffered channel of strings with capacity 1
Send syntax: channel <-
channel <- object // Send object into the channel
Receive syntax: <-channel
object := <-channel // Receive a value from the channel
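A small sketch of the buffered behaviour described above: sends succeed until the buffer is full, and succeed again once a value has been received. The helper `trySend` is a hypothetical name; it uses a select with a default branch so that a full buffer is reported instead of blocking.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a non-blocking send and reports
// whether the value was accepted.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // the buffer is full and no receiver is waiting
	}
}

func main() {
	ch := make(chan int, 2)         // capacity 2
	fmt.Println(trySend(ch, 1))     // true
	fmt.Println(trySend(ch, 2))     // true
	fmt.Println(trySend(ch, 3))     // false: buffer is full
	fmt.Println(len(ch), cap(ch))   // 2 2
	<-ch                            // free one slot
	fmt.Println(trySend(ch, 3))     // true
}
```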
Demo: simulate a network request, using a channel so that the current goroutine can do other work while the request is in flight.
// Simulate a network request
func serviceTask() string {
	fmt.Println("- start working on service task.")
	time.Sleep(time.Millisecond * 50)
	return "- service task is Done."
}
// Other tasks
func otherTask() {
	fmt.Println("start working on something else.")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("other task is Done.")
}
// CSP-style asynchronous call
func AsyncService() chan string {
	retChannel := make(chan string) // Unbuffered channel of strings: the send below blocks until the caller receives
	// retChannel := make(chan string, 1) // Buffered alternative: the send returns immediately
	go func() {
		ret := serviceTask()
		fmt.Println("returned result.")
		retChannel <- ret // Send the result into the channel
		fmt.Println("service exited.")
	}()
	return retChannel
}
func TestAsyncService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh) // Receive the result from the channel
	time.Sleep(time.Second * 1)
}
IV. Multiplexing and timeout control
Use the select keyword to implement “multiplexing” and “timeout control”.
- Multiplexing: when results may arrive on more than one channel, select can handle whichever one is ready.
Note: select looks like switch, but its cases are not evaluated in order. If channel1 and channel2 are both ready, select picks one of them at random rather than the first one listed, as switch would.
Demo:
select {
case ret := <-channel1:
	t.Log(ret)
case ret := <-channel2:
	t.Log(ret)
default:
	t.Error("No one returned.")
}
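The random choice among ready cases can be observed with a short experiment. In this sketch (the helper `pickCounts` and the iteration count are illustrative assumptions), both channels are ready on every round, and each case ends up being chosen some of the time.

```go
package main

import "fmt"

// pickCounts (hypothetical helper) runs select n times with both channels
// ready and counts how often each case was chosen.
func pickCounts(n int) (first, second int) {
	ch1 := make(chan int, 1)
	ch2 := make(chan int, 1)
	for i := 0; i < n; i++ {
		ch1 <- 1
		ch2 <- 2
		select {
		case <-ch1:
			first++
			<-ch2 // drain the other channel before the next round
		case <-ch2:
			second++
			<-ch1
		}
	}
	return first, second
}

func main() {
	first, second := pickCounts(1000)
	fmt.Println("channel1 chosen:", first, "channel2 chosen:", second)
}
```

The exact split varies from run to run, so no fixed output is shown; a switch statement in the same position would always take the first matching case.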
- Timeout control:
We can also add a timeout branch: if no channel has returned by the time the timeout fires, the code in that branch runs.
Demo:
select {
case ret := <-AsyncService(): // Normal return
	t.Log(ret)
case <-time.After(time.Millisecond * 100): // Timed out
	t.Error("time out")
}
V. Closing channels and broadcasting
The main points are as follows:
- Sending a message to a closed channel causes the program to panic.
- v, ok := <-channel: ok is a bool. ok == true means a value was received and the channel is in the open state (or still holds buffered values); ok == false means the channel is in the closed state and has been drained.
- All receivers on a channel return immediately from their blocking wait when the channel is closed, with ok set to false. (PS: this is a broadcast mechanism, usually used to signal multiple subscribers at once, for example an exit signal.)
Demo:
// Message producer
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		fmt.Println("channel close.")
		close(ch) // Close the channel
		wg.Done()
	}()
}
// Message receiver
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok { // Print messages until the channel is closed
				fmt.Println(data)
			} else {
				fmt.Println("Receiver close.")
				break // The channel is closed
			}
		}
		wg.Done()
	}()
}
func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg) // Start the producer
	wg.Add(1)
	dataReceiver(ch, &wg) // Start the consumer
	wg.Wait()
}
VI. Task cancellation
Building on the channel-close broadcast above, we can cancel tasks by closing a channel: every goroutine watching that channel is told to stop its current work.
Demo:
func isCancelled(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return true
	default:
		return false
	}
}
// Cancels only a single receiver
func cancel_1(cancelChan chan struct{}) {
	cancelChan <- struct{}{}
}
// Cancels all receivers
func cancel_2(cancelChan chan struct{}) {
	close(cancelChan)
}
func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{}, 0) // Create a channel to signal cancellation
	for i := 0; i < 5; i++ {             // Start 5 goroutines
		go func(i int, cancelCh chan struct{}) { // Each goroutine loops until it sees the cancellation
			for {
				if isCancelled(cancelCh) {
					break
				}
				time.Sleep(time.Millisecond * 5) // Simulate 5 ms of work
			}
			fmt.Println(i, "Cancelled") // Left the loop; log it
		}(i, cancelChan)
	}
	cancel_2(cancelChan) // Tell every goroutine to stop
	time.Sleep(time.Second * 1)
}
VII. Context and cancelling associated tasks
We just cancelled tasks by closing a channel, but that approach has limits. For example, when a task is cancelled, its associated subtasks should be cancelled as well.
To solve this, the context package was added to the Go standard library in Go 1.7; it ensures that associated tasks are cancelled together.
1. Context
A Context carries the information used to manage a task, including shared values, deadlines and timeouts, and cancellation signals.
The interface is as follows:
type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}
- Deadline returns the time at which the Context will be cancelled. Once a goroutine has this deadline, it can, for example, set timeouts on I/O operations.
- Done returns a channel. The channel is closed when the Context is cancelled or times out, so it acts as the signal that the Context is finished.
- After the Done channel is closed, Err reports why the Context was cancelled.
- Value lets goroutines share data, and retrieving a value is goroutine-safe. Be careful to synchronize when using the data itself, though: if the value is a map, for example, reads and writes of that map must be locked.
Key points:
- Root Context: created with context.Background().
- Child Context: created with context.WithCancel(parentContext).
- When a Context is cancelled, all Contexts derived from it are cancelled too.
- Receiving the cancellation notification: <-ctx.Done()
2. Cancelling associated tasks
Let’s adapt the earlier example and use a context to cancel all associated tasks.
- First, create a context:
ctx, cancel := context.WithCancel(context.Background()) // Create a child context
- Write an isCancelled helper that takes the context as a parameter.
func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}
- Start five goroutines, each looping until it sees the cancellation message, then call cancel.
for i := 0; i < 5; i++ { // Start 5 goroutines
	go func(i int, ctx context.Context) { // Each goroutine loops until it sees the cancellation
		for {
			if isCancelled(ctx) {
				break
			}
			time.Sleep(time.Millisecond * 5) // Simulate 5 ms of work
		}
		fmt.Println(i, "Cancelled") // Left the loop; log it
	}(i, ctx)
}
cancel() // Cancel ctx
The complete sample code is as follows:
func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}
func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background()) // Create a child context
	for i := 0; i < 5; i++ {                                // Start 5 goroutines
		go func(i int, ctx context.Context) { // Each goroutine loops until it sees the cancellation
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5) // Simulate 5 ms of work
			}
			fmt.Println(i, "Cancelled") // Left the loop; log it
		}(i, ctx)
	}
	cancel() // Cancel ctx
	time.Sleep(time.Second * 1)
}
VIII. Common concurrent tasks (in practice)
1. Execute only once (singleton pattern)
Scenario: with multiple goroutines running, guarantee that a piece of code executes only once.
type Singleton struct {
	data string
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton {
	once.Do(func() { // The function passed to Do runs only once, no matter how many goroutines call it
		fmt.Println("Create Obj")
		singleInstance = new(Singleton)
	})
	return singleInstance
}
func TestGetSingletonObj(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%p\n", obj) // Every goroutine prints the same address
			wg.Done()
		}()
	}
	wg.Wait()
}
2. Return as soon as any task completes
Using channels, we can return a result to the caller as soon as any one goroutine finishes its task.
func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}
func firstResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // Buffered channel. (With an unbuffered channel the goroutines would leak: the remaining nine would stay blocked on their send.)
	for i := 0; i < numOfRunner; i++ {   // Start 10 goroutines
		go func(i int) {
			ret := runTask(i) // Each goroutine performs the task
			ch <- ret
		}(i)
	}
	return <-ch // Return the first response in the channel (a channel is first in, first out)
}
func TestFirstResponse(t *testing.T) {
	t.Log(firstResponse()) // The result differs from run to run, depending on which goroutine finished first
}
3. Wait for all tasks to complete
We just covered “first response”; now let’s look at “all responses”. The idea is the same: receive every value sent on the channel, then return.
func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}
func allResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // Buffered channel
	for i := 0; i < numOfRunner; i++ {   // Start 10 goroutines
		go func(i int) {
			ret := runTask(i) // Each goroutine performs the task
			ch <- ret
		}(i)
	}
	finalRet := ""
	for j := 0; j < numOfRunner; j++ { // Receive once per runner
		finalRet += <-ch + "\n"
	}
	return finalRet // Return all responses, in the order they arrived (a channel is first in, first out)
}
func TestAllResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine()) // Print the current number of goroutines
	t.Log(allResponse())                     // The order differs from run to run, depending on which goroutines finished first
	t.Log("After:", runtime.NumGoroutine())  // Print the current number of goroutines
}
4. Object pool
We can use a buffered channel as a queue to build an object pool.
Demo:
type ReusableObj struct{}
type ObjPool struct {
	bufChan chan *ReusableObj // Buffers the reusable objects
}
// NewObjPool creates an object pool holding numOfObj objects
func NewObjPool(numOfObj int) *ObjPool {
	objPool := ObjPool{}
	objPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		objPool.bufChan <- &ReusableObj{}
	}
	return &objPool
}
// GetObj takes an object from the pool
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): // Timeout control
		return nil, errors.New("time out")
	}
}
// ReleaseObj returns an object to the pool
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return errors.New("overflow")
	}
}
func TestObjPool(t *testing.T) {
	pool := NewObjPool(10) // Create an object pool with capacity 10
	for i := 0; i < 10; i++ {
		if v, err := pool.GetObj(time.Second * 1); err != nil { // Get an object
			t.Error(err)
		} else {
			fmt.Printf("%T\n", v) // Got one; log its type
			if err := pool.ReleaseObj(v); err != nil { // Release the object
				t.Error(err)
			}
		}
	}
	fmt.Println("Done.")
}
5. sync.Pool object cache
We can cache objects with sync.Pool (creation, retrieval, and caching policy).
Object retrieval strategy:
- First, try to take the private object of the current Processor (P).
- Second, if there is no private object, try the shared pool of the current Processor.
- If the current Processor's shared pool is empty, try the shared pools of other Processors.
- If every Processor's shared pool is empty, return a new object created by the New function specified on the sync.Pool.
Lifecycle of objects cached in sync.Pool:
- A garbage collection (GC) may clear the objects cached in sync.Pool.
- Therefore a cached object is only guaranteed to be available until the next GC.
Basic use:
func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} { // Called when the pool has nothing to return
			fmt.Println("Create a new object.")
			return 100
		},
	}
	v := pool.Get().(int) // Get an object (the pool is empty, so New runs)
	fmt.Println(v)
	pool.Put(3) // Put an object back
	// runtime.GC() // Triggering a GC can clear the objects cached in sync.Pool
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}
Use with multiple goroutines:
func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}
	pool.Put(100)
	pool.Put(100)
	pool.Put(100)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // Start 10 goroutines
		wg.Add(1)
		go func(id int) {
			fmt.Println(pool.Get()) // Get an object: a cached 100 if one is left, otherwise a new 10
			wg.Done()
		}(i)
	}
	wg.Wait()
}
Advantages and problems of sync.Pool:
- Advantage: sync.Pool reduces the cost of creating complex objects and the cost of GC.
- Problem: objects in a sync.Pool can be reclaimed by GC, and concurrent use involves locking overhead. So weigh the trade-off in your program: which costs more, creating the object, or the locking needed to reuse it through sync.Pool?
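A common way to apply this trade-off is to pool temporary buffers that would otherwise be allocated on every call. The sketch below is one such use, not the article's own example: the names `bufPool` and `greet` are hypothetical, and the code is written so that it stays correct whether or not the pool has kept a buffer.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool (hypothetical name) hands out reusable *bytes.Buffer values.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// greet (hypothetical helper) builds a string using a pooled buffer.
func greet(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()            // a reused buffer may still hold old data
	defer bufPool.Put(buf) // hand the buffer back for later calls
	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String() // the string is copied out before the buffer is returned
}

func main() {
	fmt.Println(greet("gopher")) // hello, gopher
	fmt.Println(greet("647"))    // hello, 647
}
```

Because the pool may drop its contents at any time, the code never assumes that Get returns a previously used buffer; it only benefits when one happens to be available.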
Finally, this series is a summary of what I learned and practiced from the technical course taught by Cai Chao. Many thanks to him for sharing.
PS: The course is “Go Language from entry to actual combat”. I wish you all success in your studies and smooth work. Thank you very much!