
1 Basic concepts needed for concurrent programming

::: Tip Basic concepts

  • What is serial?
  • What is parallelism?
  • What is concurrency?
  • What is a program?
  • What is a process?
  • What is a thread?
  • What is a coroutine?

:::

1.1 What is Serial?

In a computer, a CPU can execute only one instruction at a time, and the next instruction must wait until the previous one has finished. Serial execution means executing in order: like a bank with only one window and three customers, everyone has to queue, and it is only your turn after the person in front of you finishes and leaves.

1.2 What is parallelism?

In a computer, multiple instructions executing on multiple CPUs at the same time is called parallelism. Parallel execution means simultaneous execution: like a bank with three windows and three customers, each person can simply walk up to an empty window and be served immediately.

1.3 What is concurrency?

In a computer, only one instruction can be executed on a CPU at a time, but the CPU quickly switches between multiple instructions; this is called concurrency. Concurrency is pseudo-parallelism: like a bank with only one window and three customers, the people further back can leave something to hold their place in line, go have breakfast or run an errand, and come back when it is almost their turn.

1.4 The difference between concurrency and parallelism

  • Multithreaded programs running on a single core are concurrent
  • Multithreaded programs running on multiple cores are parallel

1.5 What is a Program?

A program is a binary file that has been compiled and stored on disk; it occupies disk space but consumes no system resources until it runs.

1.6 What is a process?

A process is the basic unit of system resource allocation and scheduling. For example:

  • Starting Notepad creates a Notepad process in the system
  • Starting Notepad again creates a second, separate Notepad process

1.7 What is a thread?

A thread is an instance of execution within a process. It is the smallest unit of program execution and a basic unit, smaller than a process, that can run independently. For example:

  • When you start the Xunlei (Thunder) download program, the system creates a Xunlei process, which by default contains one main thread that executes the default business logic
  • When you use Xunlei to download multiple tasks, you will see the tasks downloading at the same time; to perform the downloads simultaneously, Xunlei creates multiple threads and runs the different download tasks in different threads

1.8 Summary of Processes and threads

  • A process is one execution of a program in the operating system and is the basic unit the system uses for resource allocation and scheduling
  • A thread is an execution instance of a process. It is the smallest unit of program execution. It is a basic unit smaller than a process that can run independently.
  • A process can create and destroy multiple threads, and multiple threads within the same process can execute concurrently
  • A program has at least one process, and a process has at least one thread

1.9 What is a coroutine?

  • A coroutine is a lightweight user-mode thread, also known as a microthread (coroutine in English)
  • The biggest advantage of coroutines over traditional system-level processes and threads is that they are "lightweight": tens of thousands can easily be created without exhausting system resources, whereas threads and processes rarely exceed 10,000. This is why coroutines are called "lightweight threads"
  • There can be any number of coroutines in a thread, but only one can be running at any one time, and multiple coroutines share the computer resources allocated by the thread
  • In coroutines, calling a task is like calling a function, consuming few system resources, but achieving the same concurrency effect as a process or thread

2 Goroutine Quick Start

::: Danger Goroutine features

  • The big advantage of coroutines over traditional system-level threads and processes is that they are “lightweight” and can easily be created in the millions without depleting system resources, and threads and processes are usually no more than 10,000. This is why coroutines are also called lightweight threads.
  • Golang natively supports concurrent programming
  • Lightweight thread
  • Non-preemptive multitasking in which the coroutine voluntarily relinquishes control
  • Multitasking at the compiler/interpreter/virtual machine level
  • Multiple coroutines may run on one or more threads

:::

A Go thread can carry multiple goroutines. You can think of goroutines as lightweight threads managed and scheduled by the Go runtime.

2.1 Using goroutines

Using goroutines in Go is very simple: you create a goroutine for a function by prepending the go keyword to the function call.

A goroutine must correspond to a function, and multiple Goroutines can be created to perform the same function

2.2 Starting a Single Goroutine

Starting a goroutine is as simple as adding the go keyword in front of the function being called (named or anonymous).

- goroutine -- the Go implementation of coroutines
- go + function name: starts a goroutine that executes the function

For example, the following

package main

import (
	"fmt"
	"time"
)
// Define a function helloGoroutine
func helloGoroutine() {
	fmt.Println("helloGoroutine")
}

func main() {
	// Start a goroutine that executes the function
	go helloGoroutine()
	fmt.Println("Execute main")
	// Sleep 2 seconds so the program does not exit before the goroutine runs
	time.Sleep(2 * time.Second)
}

Output result:

Execute main
helloGoroutine

2.3 Starting Multiple Goroutines

Start multiple goroutines with a named function

package main

import (
	"fmt"
	"time"
)
// Define a function helloGoroutine
func helloGoroutine(x int) {
	fmt.Println("helloGoroutine ", x)
}
func main() {
	for i := 1; i < 10; i++ {
		// Start a goroutine that executes the function
		go helloGoroutine(i)
	}
	// Sleep 2 seconds so the program does not exit before the goroutines run
	time.Sleep(2 * time.Second)
}

The order of the output changes between runs:

helloGoroutine  2
helloGoroutine  3
helloGoroutine  4
helloGoroutine  1
helloGoroutine  7
helloGoroutine  6
helloGoroutine  5
helloGoroutine  8
helloGoroutine  9

Use sync.WaitGroup to synchronize goroutines

package main

import (
	"fmt"
	"sync"
)
var wg sync.WaitGroup

// Define a function hello
func hello(i int) {
	defer wg.Done() // When goroutine is finished, register -1
	fmt.Println("Hello Goroutine!", i)
}
func main() {

	for i := 0; i < 10; i++ {
		wg.Add(1) // Start a goroutine and register +1
		// Start a coroutine execution function
		go hello(i)
	}
	wg.Wait() // Wait for all registered goroutines to finish
}

sync.WaitGroup provides three methods: Add(delta int) increases the counter, Done() decrements it by one, and Wait() blocks until the counter reaches zero.

3 GPM model

GPM is a scheduling system implemented at the Go runtime level by the Go language itself; it is distinct from the operating system's scheduling of OS threads.

::: tip What the GMP model means

  • The M structure is the Machine, an OS thread managed by the operating system; goroutines run on M. M is a large structure that maintains a small-object memory cache (mcache), the goroutine currently being executed, a random number generator, and much more.
  • The P structure is the Processor, and its main purpose is to execute goroutines. It maintains a goroutine queue, the runqueue, and is the key piece that takes Go from N:1 scheduling to M:N scheduling.
  • G is the core structure behind a goroutine. It contains the stack, the instruction pointer, and other information important for scheduling, such as the channel it is blocked on.

:::

Note: the number of Processors is set at startup to the value of the environment variable GOMAXPROCS, or at runtime through the scheduling function runtime.GOMAXPROCS(). A fixed number of Processors means that at most GOMAXPROCS threads are running Go code at any one time.

3.1 Scenario Analysis

We use triangles, rectangles, and circles to represent the Machine (M), Processor (P), and Goroutine (G) respectively.

3.2 Under normal conditions

In the simplest case, all goroutines run on the same M system thread, and each M system thread maintains one Processor. At any given time, only one goroutine runs on a Processor while the other goroutines wait in its runqueue. When a goroutine has used up its time slice, it yields the context and goes back to the runqueue. In a multi-core scenario, each M system thread holds one Processor in order to run goroutines.

If both Ms run on the same CPU, this is concurrency; if the two Ms run on different CPUs, this is parallelism. In the normal case the scheduler follows this process: when a goroutine's time slice ends, P is assigned to the next G. But what happens when a thread blocks?

3.3 Thread Blocking

When the running goroutine (G0) blocks, for example on a system call, another system thread (M1) is created; the current thread M0 gives up its Processor (P), and P moves to the new thread so the remaining goroutines can keep running.

3.4 When a runqueue runs out of work

When the runqueue of one Processor is empty and it has no goroutine left to schedule, it steals half of the goroutines from another Processor's runqueue.

First, a G object is created and saved to P's local queue or to the global queue, and P wakes up an M while continuing its own execution. The M looks for a free P, and if it finds one, binds to it and takes the G object to execute. M then runs a scheduling loop (take a G object -> execute it -> clean up -> find the next goroutine to execute).

Context switching can occur at any time while M is executing. When a switch happens, the execution site must be saved so that it can be restored the next time the goroutine is scheduled. In the Go scheduler, M's stack is kept on the G object, so saving the registers M needs (SP, PC, etc.) into the G object is enough to protect the site. Once the register data is saved, the context switch can take place, preserving the scene before the interruption. If task G has not finished, M puts it back into P's task queue to wait for the next round of scheduling. When G is scheduled again, M restores the site from the registers saved on G (such as vdsoSP and vdsoPC) and continues from where it was interrupted.

3.5 A Simple GMP Example

package main

import (
	"fmt"
	"runtime"
	"sync"
)
var wg sync.WaitGroup

func a() {
	defer wg.Done() // when the goroutine finishes, decrement the counter by 1
	for i := 0; i < 10; i++ {
		fmt.Println("A=", i)
	}
}

func b() {
	defer wg.Done() // when the goroutine finishes, decrement the counter by 1
	for i := 0; i < 10; i++ {
		fmt.Println("B=", i)
	}
}

func main() {
	 // Get the number of logical cpus on the local machine
	cpu := runtime.NumCPU()
	// Sets the maximum number of cpus that can be executed simultaneously
	runtime.GOMAXPROCS(cpu - 1)
	// Two goroutines will be started, so add 2 to the counter
	wg.Add(2)
	// Start two goroutines execute a() and b() respectively
	go a()
	go b()
	wg.Wait() // Wait for all registered goroutines to finish

}

The running results are as follows:

B= 0
B= 1
B= 2
A= 0
A= 1
A= 2
A= 3
A= 4
A= 5
A= 6
A= 7
A= 8
A= 9
B= 3
B= 4
B= 5
B= 6
B= 7
B= 8
B= 9

4 Case Analysis

Using goroutines, calculate the factorial of each number from 1 to 20, store each result in a map, and finally print the map.

package main

import (
	"fmt"
	"sync"
)

// 1. The map is global; a mutex protects it because the built-in map
//    is not safe for concurrent writes
var (
	myMap = make(map[int]int, 20)
	lock  sync.Mutex
	wg    sync.WaitGroup
)

// test computes n! and puts the result into myMap
func test(n int) {
	defer wg.Done()
	res := 1
	for i := 1; i <= n; i++ {
		res *= i
	}
	// Put res into myMap under the lock
	lock.Lock()
	myMap[n] = res
	lock.Unlock()
}

func main() {
	// Start multiple goroutines to complete the task
	for i := 1; i <= 20; i++ {
		wg.Add(1)
		go test(i)
	}
	// Wait for all goroutines to finish instead of sleeping
	wg.Wait()
	for i, v := range myMap {
		fmt.Printf("Factorial: %d! = %d\n", i, v)
	}
}

5 channel

5.1 Why is channel needed

It doesn’t make sense to simply execute functions concurrently. Functions need to exchange data to make sense of executing functions concurrently.

Although shared memory can be used to exchange data, different goroutines accessing the shared memory are prone to data races. To guarantee correctness, the memory must be protected with a mutex, which inevitably hurts performance.
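A minimal sketch of the channel approach, assuming we just want to hand one computed value from a goroutine back to main: the receive itself synchronizes the two goroutines, so no explicit lock is needed.

package main

import "fmt"

// sum adds up the slice and sends the result back through a channel
// instead of writing to shared memory
func sum(nums []int, result chan<- int) {
	total := 0
	for _, n := range nums {
		total += n
	}
	result <- total // hand the value to the receiver
}

func main() {
	result := make(chan int)
	go sum([]int{1, 2, 3, 4, 5}, result)
	// The receive blocks until the goroutine sends, so no mutex is needed
	fmt.Println(<-result) // 15
}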

5.2 The concurrency model is CSP

::: tip Channel introduction
A channel is a conduit through which goroutines communicate: data is sent into one end and received from the other. The concurrency model of the Go language is CSP (Communicating Sequential Processes), which advocates sharing memory by communicating rather than communicating by sharing memory.
:::

5.3 Channel Type Definition

A channel is a special type in the Go language. Like a conveyor belt or queue, a channel always follows a first in, first out (FIFO) rule, which guarantees the order in which data is sent and received. Each channel is a conduit of a concrete type, so the element type must be specified when declaring a channel. A channel is a reference type. The format for declaring a channel type is as follows:

var variableName chan elementType

For example:

var ch1 chan int   // Declare a channel for passing integers
var ch2 chan bool  // Declare a channel for passing booleans
var ch3 chan []int // Declare a channel to pass int slices

5.4 Creating a Channel

The zero value of a channel is nil. A nil channel is not useful, so a channel must be initialized with make, just like maps and slices. The format for creating a channel is shown below; the buffer size is optional.

make(chan elementType, [bufferSize])
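For instance, a few illustrative declarations (the variable names are arbitrary) showing the difference the optional buffer size makes:

ch1 := make(chan int)         // unbuffered: a send blocks until another goroutine receives
ch2 := make(chan int, 5)      // buffered: up to 5 values can be queued before a send blocks
ch3 := make(chan []string, 1) // the element type can be any type, including slices and structs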

5.5 Channel operations

A channel has three operations: send, receive, and close.

Both send and receive use the <- symbol.

5.5.1 Define an example channel

ch := make(chan int, 10)

5.5.2 send

Sends a value to the channel

ch <- 10 // send 10 to channel ch

5.5.3 receive

Receives values from a channel.

x := <-ch // Receive a value from channel ch and assign it to variable x
<-ch      // Receive a value from channel ch and discard the result

5.5.4 Close

We close a channel by calling the built-in close function. Note that you only need to close a channel when you want to notify the receiving goroutines that all data has been sent. A channel can be reclaimed by the garbage collector, so closing it is not like closing a file: closing a file after use is required, but closing a channel is not.

close(ch)          

5.5.5 Characteristics of the Closed Channel

  • Sending a value to a closed channel causes a panic.
  • Receiving from a closed channel keeps returning values until the channel is drained.
  • A receive from a closed, empty channel yields the zero value of the element type.
  • Closing an already closed channel causes a panic (these rules are demonstrated in the sketch below).

5.6 Unidirectional Channel

The Go language's type system provides single-direction channel types, which, as the name implies, can only be used to send or only to receive data. Channels created with make are bidirectional by default; one-way channel types are mainly used to restrict what a function may do with a channel passed to it.

5.6.1 Declaration format of unidirectional Channels

When passing a channel variable to a function, we can specify it as a one-way channel variable to limit the operations that can be performed on that channel, such as only writing to or reading from that channel. The declaration of the one-way channel variable is very simple. The channel type that can only be sent is chan<- and the channel type that can only be received is <-chan, in the following format:

var channelInstance chan<- elementType // send-only channel
var channelInstance <-chan elementType // receive-only channel
  • Element type: The element type contained in the channel.
  • Channel instance: declared channel variable.
var ch1 chan int       // ch1 is a normal, bidirectional channel
var ch2 chan<- float64 // ch2 is a one-way channel used only to send float64 values
var ch3 <-chan int     // ch3 is a one-way channel used only to receive int values

ch4 := make(chan int, 10)       // define and initialize a normal channel
send := make(chan<- int, 10)    // define and initialize a send-only channel
receive := make(<-chan int, 10) // define and initialize a receive-only channel

5.6.2 Common Error Examples

package main

func main() {
	// Create a channel; bidirectional by default (buffered so the send below does not block)
	ch := make(chan int, 1)
	// Define a one-way, send-only channel
	var writech chan<- int = ch
	// Receiving from a send-only channel is a compile error:
	// <-writech
	// Define a one-way, receive-only channel
	var readch <-chan int = ch
	// Sending to a receive-only channel is a compile error:
	// readch <- 555
	// The following two lines compile normally
	writech <- 666
	<-readch
	// A one-way channel cannot be converted back to a bidirectional one,
	// so the following line is also a compile error:
	// var ch2 chan int = writech
}

5.6.3 Producer-consumer

package main

import "fmt"

// indicates that messages can only be sent to a channel, but not received
func producer(out chan<- int) {
	// Loop to send I squared to channel out
	for i := 0; i <= 10; i++ {
		out <- i * i
	}
	// Close the channel
	close(out)
}

// <-chan indicates that messages can only be received from the channel, not sent into it
func consumer(in <-chan int) {
	// Loop to read data for channel IN
	for num := range in {
		fmt.Println("num = ", num)
	}
}

func main() {
	// Create a two-way channel ch
	ch := make(chan int, 10)
	// Producer: produce numbers and write them to the channel
	go producer(ch) // a channel is a reference type, so producer and main share ch
	// The consumer reads the number from the channel and prints it
	consumer(ch)
}

The following output is displayed:

num =  0
num =  1
num =  4
num =  9
num =  16
num =  25
num =  36
num =  49
num =  64
num =  81
num =  100

5.7 Simple Example for Channel

Example 1

package main

import "fmt"

func main() {

	ch1 := make(chan int, 10) // define an int channel with a buffer
	ch1 <- 10                 // send 10 to ch1
	fmt.Println(ch1)          // prints the channel value (an address such as 0xc000082000)

	x := <-ch1 // the variable x receives the value from channel ch1
	fmt.Println("The value of variable x taken out of channel ch1 is", x) // the value printed is 10

	close(ch1) // close the channel
}

Example 2

  • Start a goroutine that generates 100 numbers and sends them to ch1
  • Start a goroutine that takes values from ch1, squares them, and puts the results into ch2
  • In main, print the values in ch2
package main

import (
	"fmt"
	"sync"
)

var (
	ch1 = make(chan int, 100) // declare two channels, each with a buffer of 100
	ch2 = make(chan int, 100)
	wg  sync.WaitGroup // used to wait for a group of goroutines to finish
)

// Generate 100 numbers and send it to ch1
func write() {
	defer wg.Done()
	for i := 0; i < 100; i++ {
		ch1 <- i
	}
	close(ch1) // close ch1 once everything is sent; otherwise the reader would block forever
}

// Take the value from ch1 and square it to ch2
func read() {
	defer wg.Done() // when the goroutine finishes, decrement the counter by 1
	for {
		x, ok := <-ch1
		if !ok { // the channel is closed and drained, exit the for loop
			break
		}
		val := x * x // otherwise compute the square and put it into ch2
		ch2 <- val
	}
	close(ch2) // close ch2 so the range loop in main can finish
}

func main() {
	wg.Add(2) // Start two Goroutine registrations +2
	go write() 	// Start a coroutine execution function write()
	go read()	// Start a coroutine execution function read()
	wg.Wait() // Wait for all registered goroutines to finish
	// Use for range to read values in channel CH2
	for x := range ch2 {
		fmt.Printf("Value read as %d\n", x) // Print the output}}Copy the code

5.8 Worker Pool (Goroutine Pool)

A worker pool provides a fixed set of goroutines, each of which loops, blocking while it waits for a task from the task pool; when external users keep putting tasks into the task pool, the goroutines in the pool process those tasks concurrently.

5.8.1 Example 1

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup
func worker(id int, jobs <-chan int, results chan<- int) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("Worker :%d start job:%d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("Worker :%d job:%d\n", id, j)
		results <- j * 2}}func main(a) {
	jobs := make(chan int.100)
	results := make(chan int.100)
	// Open 3 Goroutines
	wg.Add(3)
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// 5 tasks
	for j := 1; j <= 5; j++ {
		jobs <- j
	}
	close(jobs)
	// wait for all workers to finish (results remain in the buffered channel)
	wg.Wait()
}

5.8.2 Example 2

Use goroutines and channels to implement a program that computes the digit sum of random int64 numbers.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var wg sync.WaitGroup

// randNumber computes the digit sum of a 64-bit number
func randNumber(x int64) int64 {
	var sum int64 = 0
	for x > 0 {
		a := x % 10
		x = x / 10
		sum += a
	}
	return sum
}

// Generate int64 random number into channel CH1
func createRand(ch1 chan<- int64) {
	for {
		int63 := rand.Int63()
		ch1 <- int63
		time.Sleep(time.Millisecond) // brief pause between generated numbers
	}
}

// Read data from channel ch1, compute the digit sum, and store it in ch2
func readRand(ch1 <-chan int64, ch2 chan<- int64) {
	for {
		value := <-ch1
		number := randNumber(value)
		ch2 <- number
		fmt.Println(value, number)
	}
}
func main() {
	var jobChan = make(chan int64.100)
	var resultChan = make(chan int64.100)
	wg.Add(25)
	go createRand(jobChan)

	for i := 0; i < 24; i++ {
		go readRand(jobChan, resultChan)
	}
	// print the digit sums of the random numbers
	for value := range resultChan {
		fmt.Println(value)
	}
	wg.Wait()

}

5.9 Select Multiplexing

::: TIP Select multiplexing instructions

  • 1. It solves the problem that receiving from one channel blocks the program immediately, so events arriving on a second channel cannot be received
  • 2. It is somewhat similar to a switch statement: there are several cases and a final default choice
  • 3. Each case represents one communication operation (a send or receive on some channel) and contains a block of statements; if several cases are ready, one of them is picked
  • 4. default runs when no case is ready, so it can be used to poll channels
  • 5. A receive expression may be just the receive itself, or it may include a short variable declaration
  • 6. select waits until some case can proceed; after that case runs, the other communications are not executed
  • 7. A select with no cases, written select{}, waits forever
  • 8. Send and receive operations on a nil channel block forever
  • 9. A case that operates on a nil channel inside a select is never selected

:::

5.9.1 Specific Format

select {
case <-ch1:
    ...
case data := <-ch2:
    ...
case ch3 <- data:
    ...
default:
    // default action
}

5.9.2 Select example

package main

import (
	"fmt"
)

func main() {
	// Define a buffered channel of size 1
	ch := make(chan int, 1)
	for i := 0; i < 10; i++ {
		// select multiplexing
		// 1. First iteration: the receive case is not ready, so the send case runs and 0 goes into the channel
		// 2. Next iteration: the receive case prints the value in the channel, and the send case is not ready
		// 3. The two cases keep alternating, so only the even numbers are printed
		select {
		case x := <-ch:
			fmt.Println(x)
		case ch <- i:
		default:
			fmt.Println("Executed when none of the cases is ready")
		}
	}
}

The output

0
2
4
6
8
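Another common use of select, not shown above, is a receive with a timeout. The sketch below is an assumed example built on the standard time.After helper; the channel, value, and durations are arbitrary.

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int)

	go func() {
		time.Sleep(2 * time.Second) // simulate slow work
		ch <- 42
	}()

	// Wait for a value, but give up after one second
	select {
	case v := <-ch:
		fmt.Println("received", v)
	case <-time.After(time.Second):
		fmt.Println("timeout, no value received")
	}
}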

5.10 Channel summary

  • 1. Channels are used for message passing between goroutines
  • 2. Each channel has an associated element type; a nil channel cannot be used, just as a nil map cannot store key-value pairs
  • 3. Data is transferred through a channel with the <- arrow, which points in the direction the data flows
  • 4. Blocking:
  • 4.1 Sending data: ch <- data blocks until another goroutine receives from the channel
  • 4.2 Reading data: data := <-ch blocks until another goroutine sends to the channel
  • 5. A channel is itself synchronized, meaning only one goroutine can operate on it at any one moment
  • 6. Channels are the connection between goroutines, so channel sends and receives generally happen in different goroutines
  • 7. Common channel exceptions: sending to a nil channel blocks forever; receiving from a nil channel blocks forever; sending to a closed channel panics; receiving from a closed channel returns the buffered values and then zero values; closing a nil channel or an already closed channel panics

6 sync.Once

sync.Once guarantees that a function is executed only once. To achieve this, two things are needed:

::: tip sync.Once introduction

  • 1) a counter that records how many times the function has been executed;
  • 2) thread safety, guaranteeing that even when multiple goroutines call it concurrently (using a lock), the function is still executed only once.

:::

6.1 sync.Once source code

import (
   "sync/atomic"
)

// Once is an object that will perform exactly one action.
type Once struct {
   m    Mutex
   done uint32
}
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
   if atomic.LoadUint32(&o.done) == 1 {
      return
   }
   // Slow-path.
   o.m.Lock()
   defer o.m.Unlock()
   if o.done == 0 {
      defer atomic.StoreUint32(&o.done, 1)
      f()
   }
}

6.2 Do method

The Do method is fairly simple, but there is a lesson in it: a naive implementation would lock first and then check how many times the function has been executed, whereas here an atomic load is used on the fast path to improve performance.

Some flag bits can be represented with atomic operations to avoid locking and improve performance. The Do method works as follows (a typical use is sketched after the list):

  • First, atomically load the done flag; if the function has already been executed, return immediately
  • Otherwise acquire the lock (and defer the unlock)
  • Check done again; if it is still 0, execute the function
  • Atomically store 1 into done, deferred until the function returns
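A typical (hypothetical) use is lazy initialization of a shared resource; the config map and loadConfig function below are made up for illustration.

package main

import (
	"fmt"
	"sync"
)

var (
	config map[string]string // hypothetical shared resource built exactly once
	once   sync.Once
)

func loadConfig() map[string]string {
	// No matter how many goroutines call loadConfig concurrently,
	// the initialization function below runs only once
	once.Do(func() {
		fmt.Println("initializing config")
		config = map[string]string{"env": "dev"}
	})
	return config
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = loadConfig()
		}()
	}
	wg.Wait()
	fmt.Println(config["env"]) // dev; "initializing config" was printed once
}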

6.3 Example

package main

import (
	"fmt"
	"sync"
	"time"
)

var once sync.Once
var onceBody = func() {
	fmt.Println("Only once")
}

func main() {

	for i := 0; i < 10; i++ {
		go func(i int) {
			once.Do(onceBody)
			fmt.Println("i=", i)
		}(i)
	}
	time.Sleep(time.Second) // sleep 1s so the goroutines get to run; do not make the sleep too short
}

The output:

Only once
i= 0
i= 1
i= 2
i= 4
i= 5
i= 6
i= 3
i= 7
i= 8
i= 9

As you can see from the output, although the for loop calls once.Do() on every iteration, the function onceBody() is executed only once.

7 sync.Map

The thread-safe map in Go is sync.Map. For access from a single goroutine a plain map is fine, but for concurrent access from multiple goroutines we need the goroutine-safe sync.Map, because the native map will crash on concurrent reads and writes.

sync.Map aims for better performance and stability. Its implementation is optimized for read-heavy workloads, so write performance is relatively mediocre. The source code is interpreted below.
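As a quick illustration of the difference, the sketch below (an assumed example, not from the original) stores values from 100 goroutines; with a plain map[int]int the same writes would eventually trigger "fatal error: concurrent map writes", while sync.Map handles them safely.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map // safe for concurrent use without extra locking
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			m.Store(n, n*n) // concurrent writes are safe on sync.Map
		}(i)
	}
	wg.Wait()

	v, _ := m.Load(10)
	fmt.Println(v) // 100
}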

7.1 Overall structure of sync.Map

7.2 sync.Map structure description

type Map struct {
    mu Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry
    misses int
}

::: tip sync.map Property description

  • mumapInternally held locks to synchronize operations between coroutines.
  • readContain part ofmapThe coroutine is secure (with or without locking).readBecause it’s an atomic variable, it’s inherently coroutine safe.readStored in theentryCan be in nomuUpdate concurrently, but the update needs to be deleted before it is updatedentryCopied to thedirtyIn, and can be held inmuIn the case of recovery.
  • dirtyIt also preserves part of itmapInformation required for operationmuTo ensure thatdirtyCan be quickly upgraded toread mapIt also includesread mapAll undeleted entries in.
  • The deletedentryNot stored in thedirtyIn the.clean mapDeleted fromentryMust be recoverable and stored to thedirtyIn the.
  • missed The record is deadreadThe number of times.
  • entry It holds the value of a pointer to the data, but with two special valuesnil&expunged.nilSaid inread“Has been deleted, butdirtyIs still there, so you can update the value directly,expungedRepresents the data inditry“Has been deleted, so update this value firstentryCopied to thedirty.

:::

7.3 Common sync.Map operation functions

  • Store: save a key and its value
  • LoadOrStore: load or store; returns the existing value if the key is present, otherwise stores and returns the given value (see the example code)
  • Load: get the value corresponding to a key
  • Range: iterate over all keys and values
  • Delete: delete a key and its value

7.4 Interpretation of operation function source code

Among the operation functions, the three core ones are write (Store), read (Load), and delete (Delete):

The write function:

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    // Check to see if the element already exists. If so, update the value directly via entry in read.
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
    // tryStore uses atomic CAS to resolve conflicts. If data is set to expung, tryStore does not write data and returns false
        return
    }
    /* Slow path, performed with the lock held:
       1. Present in read: if the entry is expunged, set it back to nil and copy the entry
          into dirty before writing the value; otherwise write the value directly.
       2. Present only in dirty: write the value directly.
       3. Not present in dirty: if dirty is empty, copy read into dirty first and then write
          the new value into dirty. The copy calls dirtyLocked(); entries that are nil in read
          are marked expunged during the copy and are not copied into dirty. */
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            m.dirty[key] = e
        }
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } else {
        if !read.amended {
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

The read function:

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 1. Try the lock-free read map first
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        // 2. Not found in read and dirty contains extra keys: lock and check dirty
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // missLocked increments misses; if misses > len(dirty),
            // dirty is promoted to read and the old dirty is cleared
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

The delete function:

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    // Check whether the key exists in read
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock() // lock
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key] // double check
        if !ok && read.amended {
            delete(m.dirty, key) // not in read: delete directly from dirty
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete() // if present in read, the pointer is set to nil (lazy delete)
    }
}

7.5 sync.Map example

package main
 
import (
    "fmt"
    "sync"
)
 
func main() {
    var m sync.Map
 
    //Store
    m.Store(1."a")
    m.Store(2."b")
 
    //LoadOrStore
    // If the key does not exist, store the key and value, and return false and the input value
    v,ok := m.LoadOrStore("1"."aaa")
    fmt.Println(ok,v) //false aaa
 
    // If the key already exists, return true and the value corresponding to the key without modifying the original value
    v,ok = m.LoadOrStore(1."aaa")
    fmt.Println(ok,v) //false aaa
 
    //Load
    v,ok = m.Load(1)
    if ok{
        fmt.Println("it's an existing key,value is ",v)
    } else {
        fmt.Println("it's an unknown key")
    }

    //Range
    // Iterate over sync.map and ask for a func as a parameter
    f := func(k, v interface{}) bool {
        // The input and output types of this function are fixed and cannot be changed
        // You can write your own code inside the function body, calling k,v in map
 
            fmt.Println(k,v)
            return true
        }
    m.Range(f)
 
    //Delete
    m.Delete(1)
    fmt.Println(m.Load(1)) // <nil> false
}

8 Atomic operation

Atomic operations are operations that cannot be interrupted in progress. That is, while an atomic operation on a value is being performed, the CPU will never perform another operation on that value. This is true whether these other operations are atomic or not. To achieve this rigor, atomic operations are represented and performed only by a single CPU instruction. Only in this way can we guarantee the absolute safety of atomic operation in concurrent environment.

The atomic operations provided by the Go language are non-invasive. They are provided as functions in the standard library package sync/atomic, and we can call these functions to perform atomic operations on values of several simple types.

8.1 Types of atomic operations

The supported types are int32, int64, uint32, uint64, uintptr, and unsafe.Pointer.

8.2 What are the atomic operations

There are five kinds of operations: add/subtract (Add), compare-and-swap (CompareAndSwap), swap (Swap), load (Load), and store (Store). See the sync/atomic package API for details; a short sketch follows.
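A brief sketch of the five kinds of operations on an int64, using functions from sync/atomic (the values are arbitrary):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var n int64

	atomic.AddInt64(&n, 10)           // Add: n becomes 10
	fmt.Println(atomic.LoadInt64(&n)) // Load: 10

	swapped := atomic.CompareAndSwapInt64(&n, 10, 20) // CAS: succeeds because n == 10
	fmt.Println(swapped, atomic.LoadInt64(&n))        // true 20

	old := atomic.SwapInt64(&n, 30) // Swap: sets 30 and returns the previous value
	fmt.Println(old)                // 20

	atomic.StoreInt64(&n, 0)          // Store: unconditionally set the value
	fmt.Println(atomic.LoadInt64(&n)) // 0
}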

8.3 Atomic Operation Examples

The following example is used to compare the performance of mutex and atomic operations

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Counter interface {
	Inc()
	Load() int64
}

// Normal version
type CommonCounter struct {
	counter int64
}

func (c CommonCounter) Inc() {
	c.counter++
}

func (c CommonCounter) Load() int64 {
	return c.counter
}

// Mutex version
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// Atomic operation version
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c.Inc()
			wg.Done()
		}()
	}
	wg.Wait()
	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main() {
	c1 := CommonCounter{} // Non-concurrent security
	test(c1)
	c2 := MutexCounter{} // Use mutex for concurrency security
	test(&c2)
	c3 := AtomicCounter{} // concurrency-safe and more efficient than a mutex
	test(&c3)
}

The output:

0 2.0009ms
1000 1.0007ms
1000 0s