“This is the 10th day of my participation in the November Gwen Challenge. See details of the event: The Last Gwen Challenge 2021”.
Basic concepts needed for concurrent programming
::: Tip Basic concepts
- What is serial?
- What is parallelism?
- What is concurrency?
- What is a program?
- What is a process?
- What is a thread?
- What is a coroutine?
: : :
1.1 What is Serial?
In a computer, only one instruction can be executed on a CPU at a time, and subsequent instructions must wait until the previous instruction has been executed. Serial is to execute in order, just like the bank has only one window, there are three people to do things, so you have to queue up, only the front of the people finish to leave, you can turn
1.2 What is parallelism?
In a computer, multiple instructions are executed on multiple cpus at the same time, called parallelism. Parallelism is simultaneous execution, just like a bank has three Windows, three people to do things, just need to go to the empty window to do things immediately.
1.3 What is concurrency?
In a computer, only one instruction can be executed on a CPU at a time, but the CPU will quickly poll multiple instructions for execution, which is called concurrency. Concurrency is pseudo-parallelism, just like the bank has only one window, there are three people to do things, so when it is not the turn of the person behind, the person behind can use slippers to queue up first, go to have breakfast, buy something, feel almost to come back to do things
1.4 The difference between concurrency and parallelism
- Multithreaded programs running on a single core are concurrent
- Multithreaded programs running on multiple cores are parallel
1.5 What is a Program?
A program is a binary file that is compiled and stored on disk, occupying disk space but not system resources
1.6 What is a process?
A process is a basic unit of system resource allocation and scheduling. The following is an example
- Start notepad and create a Notepad process in your system
- Starting Notepad again creates a Notepad process in the system
1.7 What is a thread?
A thread is an instance of execution in a process. It is the smallest unit of program execution. It is a basic unit smaller than a process that can run independently. For example, the following
- Start this thunderbolt thing
The program
, the system will create oneThunderbolt process
And there is one by defaultThe main thread
, used to execute the default Xunlei business logic - When we use thunderbolt download
Multiple tasks
You will find that multiple tasks are presentAt the same time download
In order to be able toAt the same time to perform
Download operation, thunderbolt will create multiple threads, will be different download tasks in different threads to execute
1.8 Summary of Processes and threads
- Process is a program in the operating system of a process of execution, is the system for resource allocation and scheduling of the basic unit
- A thread is an execution instance of a process. It is the smallest unit of program execution. It is a basic unit smaller than a process that can run independently.
- A process can create multiple threads of nuclear destruction, and multiple threads in the same process can execute concurrently
- A program has at least one process, and a process has at least one thread
1.9 What is a coroutine?
- Coroutine is a user – mode lightweight thread, also known as microthread, English name Coroutine
- The biggest advantage of coroutines over traditional system-level processes and threads is that they are “lightweight “. Tens of thousands can be easily created without depleting system resources. Threads and processes rarely exceed 10,000. This is why coroutines are called “lightweight threads”
- There can be any number of coroutines in a thread, but only one can be running at any one time, and multiple coroutines share the computer resources allocated by the thread
- In coroutines, calling a task is like calling a function, consuming few system resources, but achieving the same concurrency effect as a process or thread
Goroutine Quick start
::: Danger Goroutine features
- The big advantage of coroutines over traditional system-level threads and processes is that they are “lightweight” and can easily be created in the millions without depleting system resources, and threads and processes are usually no more than 10,000. This is why coroutines are also called lightweight threads.
- Golang natively supports concurrent programming
- Lightweight thread
- Non-preemptive multitasking in which the coroutine voluntarily relinquishes control
- Multitasking at the compiler/interpreter/virtual machine level
- Multiple coroutines may run on one or more threads
: : :
A Go thread can have multiple coroutines on it. You can think of coroutines as lightweight threads optimized by the compiler
2.1 use goroutine
The use of goroutine in the Go language is very simple. You can create a Goroutine for a function by preempting the Go keyword when calling the function.
A goroutine must correspond to a function, and multiple Goroutines can be created to perform the same function
2.2 Starting a Single Goroutine
The way to start Goroutine is as simple as adding the go keyword in front of the functions called (normal and anonymous)
- Goroutine --Go implementation of coroutines -go+ function name: Starts a coroutine execution functionCopy the code
For example, the following
package main
import (
"fmt"
"time"
)
// Define a function helloGoroutine
func helloGoroutine(a) {
fmt.Println("helloGoroutine ")}func main(a) {
// Start a coroutine execution function
go helloGoroutine()
fmt.Println("Execute main")
// To avoid the program exiting immediately after concurrent execution, sleep 2 seconds
time.Sleep(2)}Copy the code
Output result:
Execute the main function helloGoroutineCopy the code
2.3 Starting Multiple Goroutines
Start multiple Goroutine + named functions
package main
import (
"fmt"
"time"
)
// Define a function helloGoroutine
func helloGoroutine(x int) {
fmt.Println("helloGoroutine ", x)
}
func main(a) {
for i := 1; i < 10; i++ {
// Start a coroutine execution function
go helloGoroutine(i)
}
// To avoid the program exiting immediately after concurrent execution, sleep 2 seconds
time.Sleep(2)}Copy the code
The order of the results of the execution changes
helloGoroutine 2
helloGoroutine 3
helloGoroutine 4
helloGoroutine 1
helloGoroutine 7
helloGoroutine 6
helloGoroutine 5
helloGoroutine 8
helloGoroutine 9
Copy the code
Use sync.waitGroup to synchronize goroutine
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
// Define a function hello
func hello(i int) {
defer wg.Done() // When goroutine is finished, register -1
fmt.Println("Hello Goroutine!", i)
}
func main(a) {
for i := 0; i < 10; i++ {
wg.Add(1) // Start a goroutine and register +1
// Start a coroutine execution function
go hello(i)
}
wg.Wait() // Wait for all registered goroutines to finish
}
Copy the code
Sync. WaitGroup instructions
3 GPM model
GPM is the implementation of Go language runtime level and a scheduling system implemented by Go language itself. Different from operating system scheduling OS threads.
Learn more ::: tip Explains what the GMP model means
- M structure is Machine, system thread, it is managed by the operating system, goroutine is running on M; M is a large structure that maintains a small object memory cache (McAche), a goroutine currently executed, a random number generator, and much more
- The P structure is the Processor, and its main purpose is to execute a Goroutine. It maintains a Goroutine queue, a runqueue. Let’s go from N:1 scheduling to the important part of M:N scheduling.
- G is the core structure of a Goroutine implementation. It contains the stack, instruction Pointers, and other information important to scheduling a Goroutine, such as the channel it blocks.
: : :
Note: The number of processors is set to the value of the environment variable GOMAXPROCS at startup or through the run time scheduling function GOMAXPROCS(). A fixed number of processors means that only GOMAXPROCS threads are running go code at any one time
3.1 Scenario Analysis
We use triangles, rectangles, and circles to represent the Machine Processor and Goroutine respectively.
3.2 Under normal conditions
All goroutines run in the same M system thread, and each M system thread maintains one Processor. At any given time, only one Goroutine exists in one Processor, and all other goroutines wait in a Runqueue. After a Goroutine has run its time slice, it cedes context and returns to the RunQueue. In a multi-core Processor scenario, each M system thread holds one Processor in order to run goroutines.
If both MS are running on the same CPU, this is concurrency; If two Ms are running on different cpus, this is parallel. The scheduler normally follows this process, assigning P to the next G when the goroutine’s time slice ends, but the thread blocks.
3.3 Thread Blocking
When the running Goroutine (G0) blocks, such as a system call, another system thread (M1) is created, the current M0 thread abandons its Processor (P), and P runs in the new thread.
3.4 RunQueue The execution is complete
When the runqueue of one of the processors is empty and there is no goroutine to dispatch, it steals half of the Goroutine from the other context.
First, a G object is created, and the G object is saved to the P local queue or global queue. P now wakes up an M. P continues its execution. M looks for a free P and moves the G object to itself if it does. Next M executes a scheduling loop (call G object -> execute -> clean up thread – continue to find a new Goroutine to execute).
Context switching can occur at any time during M execution. When a switchover occurs, you need to protect the execution site so that the execution site can be recovered when the execution is scheduled next time. The stack of THE Go scheduler M is stored on the G object. You only need to save the registers (SP, PC, etc.) required by M to the G object to achieve on-site protection. When the register data is secured, it is ready to do context switching, saving the scene before interrupting. If the task G is not finished, M can throw the task back to the task queue of P and wait for the next scheduled execution. When it is scheduled to execute again, M performs on-site recovery by accessing THE vdsoSP and vdsoPC registers of G (continuing execution from last interruption).
3.5 Simple EXAMPLES of GMP
package main
import (
"fmt"
"runtime"
"sync"
)
var wg sync.WaitGroup
func a(a) {
defer wg.Done() // When goroutine is finished, register -1
for i:=0; i<10; i++ { fmt.Println("A=",i)
}
}
func b(a) {
defer wg.Done() // When goroutine is finished, register -1
for i:=0; i<10; i++ { fmt.Println("B=",i)
}
}
func main(a) {
// Get the number of logical cpus on the local machine
cpu := runtime.NumCPU()
// Sets the maximum number of cpus that can be executed simultaneously
runtime.GOMAXPROCS(cpu- 1)
// Start one goroutine and register +1
wg.Add(2)
// Start two goroutines execute a() and b() respectively
go a()
go b()
wg.Wait() // Wait for all registered goroutines to finish
}
Copy the code
The running results are as follows:
B= 0
B= 1
B= 2
A= 0
A= 1
A= 2
A= 3
A= 4
A= 5
A= 6
A= 7
A= 8
A= 9
B= 3
B= 4
B= 5
B= 6
B= 7
B= 8
B= 9
Copy the code
4 Case Analysis
Calculate the factorial of each number from 1 to 20, and put the factorial of each number into the map, and finally display it, using Goroutine
package main
import (
"fmt"
)
//1. Map should be global
var(
myMap = make(map[int]int.20))// the test function evaluates n! , and put the results into myMap
func test(n int){
res :=1
for i := 1; i <= n; i++{
res *= i
}
// Put res into myMap
myMap[n] = res
}
func main(a) {
// Open multiple coroutines to complete the task
for i:=1; i<=20; i++{go test(i)
}
//time.Sleep(time.Second)
for i,v:= range myMap{
fmt.Printf("Factorial: %d! =%d\n",i,v)
}
}
Copy the code
5 channel
5.1 Why is channel needed
It doesn’t make sense to simply execute functions concurrently. Functions need to exchange data to make sense of executing functions concurrently.
Although shared memory can be used for data exchange, there are different Goroutines within the share that are prone to race problems. In order to ensure the correctness of data exchange, mutex must be used to lock the memory, which inevitably causes performance problems.
5.2 The concurrency model is CSP
::: Tip Channel Introduction A channel is a channel through which goroutine communicates. Data is sent from one end to the other and received over the channel. The concurrency model of Go language is CSP (Communicating parallel Processes), which advocates communication through shared memory rather than through shared memory. : : :
5.3 Channel Type Definition
A channel is a special type in the Go language. A channel, like a conveyor belt or queue, always follows a First In First Out rule, ensuring the order In which data is sent and received. Each channel is a conduit of a concrete type, which requires the element type to be specified when declaring a channel. A channel is a type, a reference type. The format for declaring channel types is as follows
varvariablechanThe element typeCopy the code
Take a chestnut
var ch1 chan int // Declare a channel for passing integers
var ch2 chan bool // Declare a channel for passing booleans
var ch3 chan []int // Declare a channel to pass int slices
Copy the code
5.4 Creating a Channel
The zero value of the channel is nil. Nil channels make no sense, so channels must be defined in a similar way to maps and slicing. Create a channel in the following format: The buffer size of a channel is optional.
make(chanElement type, [buffer size])Copy the code
5.5 the channel operations
A channel has three operations: send, receive, and close.
Both send and receive use the <- symbol.
5.5.1 Define an example channel
ch := make(chan int.10)
Copy the code
5.5.2 send
Sends a value to the channel
ch <- 10 // send 10 to channel CH
Copy the code
5.5.3 receive
Receives values from a channel.
x := <- ch // Receive a value from channel CH and assign it to variable x
<-ch // Receive the value from channel CH, ignoring the result
Copy the code
5.5.4 closed
We close the channel by calling the built-in close function. One thing to note about closing a channel is that you only need to close a channel if you notify the receiver goroutine that all data has been sent. Channels can be reclaimed by the garbage collection mechanism, which is not the same as closing files. Closing files after an operation is required, but closing channels is not required.
close(ch)
Copy the code
5.5.5 Characteristics of the Closed Channel
- Sending values to a closed channel can cause panic.
- Receiving on a closed channel fetches values until the channel is empty.
- A receive operation on a closed channel with no value results in a zero value of the corresponding type.
- Closing a closed channel can cause panic.
5.6 Unidirectional Channel
The Go language’s type system provides single-directional channel types, which, as the name implies, can only be used to send or receive data. A channel must support both reading and writing, or it cannot be used at all.
5.6.1 Declaration format of unidirectional Channels
When passing a channel variable to a function, we can specify it as a one-way channel variable to limit the operations that can be performed on that channel, such as only writing to or reading from that channel. The declaration of the one-way channel variable is very simple. The channel type that can only be sent is chan<- and the channel type that can only be received is <-chan, in the following format:
varChannel instancechan<- Element type// Only channels can be sent
varChannel instance <-chanThe element type// Can only receive channels
Copy the code
- Element type: The element type contained in the channel.
- Channel instance: declared channel variable.
var ch1 chan int //ch1 is a normal channel, not one-way
var ch2 chan <- float64 // CH2 is a one-way channel and is only used to write data in Float64
var ch3 <- chan int //ch3 is a one-way channel that only reads int data
ch4 := make(chan int.10) // Define and initialize a normal channel
send := make(chan<- int.10) // Define and initialize a just send channel
receive := make( <-chan int ,10) // Define and initialize a mere receive channel
Copy the code
5.6.2 Common Error Examples
package main
func main(a) {
// Create a channel, two-way
ch := make(chan int)
// Define a one-way write-only channel
var writech chan <- int = ch
// An error will be reported if it is written as follows
<-writech
// Define a one-way, read-only channel
var readch <- chan int = ch
// The following is a problem
readch<- 555.
// All the following will compile normally
writech <- Awesome!
<- readch
// One-way cannot be converted to bidirectional
var ch2 chan int = writech
}
Copy the code
5.6.3 Producer-consumer
package main
import "fmt"
// indicates that messages can only be sent to a channel, but not received
func producer(out chan<- int) {
// Loop to send I squared to channel out
for i := 0; i <= 10; i++ {
out <- i * i
}
// Close the channel
close(out)
}
// indicates that messages can only be sent to a channel
func consumer(in <-chan int) {
// Loop to read data for channel IN
for num := range in {
fmt.Println("num = ", num)
}
}
func main(a) {
// Create a two-way channel ch
ch := make(chan int.10)
// Producer, produce number, write channel
go producer(ch) //channel passes parameters and references
// The consumer reads the number from the channel and prints it
consumer(ch)
}
Copy the code
The following output is displayed:
num = 0
num = 1
num = 4
num = 9
num = 16
num = 25
num = 36
num = 49
num = 64
num = 81
num = 100
Copy the code
5.7 Simple Example for Channel
Example a
package main
import "fmt"
func main(a) {
ch1:=make(chan int.10) // Define an int channel with a buffer
ch1<- 10 // send 10 to ch1
fmt.Println(ch1) // 0xc000082000
x:=<-ch1 // The variable x receives the value from channel ch1
fmt.Println("The value of variable x taken out of channel CH1 is,x) // The variable x fetched from channel ch1 is 10
close(ch1) // Close the channel
}
Copy the code
Example 2
- Start a
goroutine
, generates 100 numbers and sends it to CH1- Start a
goroutine
Take the value from CH1, square it and put it in ch2- In main, print the values in ch2
package main
import (
"fmt"
"sync"
)
var (
ch1 = make(chan int.100) // Declare two pipes with buffers of 100
ch2 = make(chan int.100)
wg sync.WaitGroup // Wait for the end of a set of coroutines called goroutines
)
// Generate 100 numbers and send it to ch1
func write(a) {
defer wg.Done()
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1) Goroutine XX [chan receive]:
}
// Take the value from ch1 and square it to ch2
func read(a) {
defer wg.Done() // When goroutine is finished, register -1
for {
x, ok := <-ch1
if! ok {// Check whether the channel is closed, closing the exit for loop
break
}
val := x * x // If you do not exit the for loop, calculate its square value and place it in ch2
ch2 <- val
}
close(ch2) Goroutine XX [chan receive]:
}
func main(a) {
wg.Add(2) // Start two Goroutine registrations +2
go write() // Start a coroutine execution function write()
go read() // Start a coroutine execution function read()
wg.Wait() // Wait for all registered goroutines to finish
// Use for range to read values in channel CH2
for x := range ch2 {
fmt.Printf("Value read as %d\n", x) // Print the output}}Copy the code
5.8 Worker Pool (Goroutine Pool)
Provide a pool of goroutines, with each goroutine loop blocked waiting for tasks to be executed from the pool; If external users continue to throw tasks into the task pool, multiple Goroutines in the goroutine pool will concurrently process those tasks. Learn more about the Goroutine pool
5.8.1, for example, a
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func worker(id int, jobs <-chan int, results chan<- int) {
defer wg.Done()
for j := range jobs {
fmt.Printf("Worker :%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("Worker :%d job:%d\n", id, j)
results <- j * 2}}func main(a) {
jobs := make(chan int.100)
results := make(chan int.100)
// Open 3 Goroutines
wg.Add(3)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 5 tasks
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// Output the result
wg.Wait()
}
Copy the code
5.8.1 example 2
Use Goroutine and channel to implement a calculation of int64 random number bits and procedures.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var wg sync.WaitGroup
// Computes the sum of bits of a random 64-bit number
func randNumber(x int64) int64 {
var sum int64 = 0
for x > 0 {
a := x % 10
x = x / 10
sum += a
}
return sum
}
// Generate int64 random number into channel CH1
func createRand(ch1 chan<- int64) {
for {
int63 := rand.Int63()
ch1 <- int63
time.Sleep(1)}}// Read data from channel CH1, then calculate the sum of the digits and store it in CH2
func readRand(ch1 <-chan int64, ch2 chan<- int64) {
for {
value := <-ch1
number := randNumber(value)
ch2 <- number
fmt.Println(value, number)
}
}
func main(a) {
var jobChan = make(chan int64.100)
var resultChan = make(chan int64.100)
wg.Add(25)
go createRand(jobChan)
for i := 0; i < 24; i++ {
go readRand(jobChan, resultChan)
}
// The sum of bits in the random spanning tree
for value := range resultChan {
fmt.Println(value)
}
wg.Wait()
}
Copy the code
5.9 Select Multiplexing
::: TIP Select multiplexing instructions
- 1, solve if a
channel
The program will block immediately and cannot receive the second eventchannel
In the event - 2, and
switch
The statements are slightly similar, and there are severalcase
And the lastdefault
Choices, - 3. Every one
case
Represents a communication operation (in achannel
And will contain a block of statements, multiple statementscase
I’ll pick the one that works - 4,
default
Is executed by default, so it can be used as pollingchannel
To use - 5. A receive expression may contain only the receive expression itself, or it may contain a short variable declaration
- 6,
select
Will wait forcase
There is something that can be executedcase
After execution, no other communication will be executed - 7. Nothing
case
theselect
Will wait forever, writingselect{}
- 8. For one
nil
thechannel
Send and receive operations block forever - 9, in
select
In-statement operationnil
thechannel
Will never beselect
to
: : :
5.9.1 Specific Format
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default: Default action}Copy the code
5.9.2 the select, for example,
package main
import (
"fmt"
)
func main(a) {
// Define a buffer channel of size 1
ch := make(chan int.1)
for i := 0; i < 10; i++ {
//select multiplexing
//1. The first case blocks, and the second case executes 0 to send to channel
//2. The first case prints the value in the channel, and the second case blocks
//3. The first execution blocks, and the second execution 2 is sent to channel cross execution
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
default:
fmt.Println("Operation performed when none of the case conditions are met")}}}Copy the code
The output
0
2
4
6
8
Copy the code
5.10 the channel to summarize
- 1
channel
Is used forgoroutine
message-passing - 2 channel
channel
, each with an associated data type,nil chan
Unavailable, similar tonil map
Cannot store key-value pairs directly - 3 Using channels to transfer data:
<-
According to the arrow method for data transfer - Four blocks:
- 4.1 Sending Data:
chan <- data
, blocked until the other onegoroutine
Read data to unblock - 4.2 Reading Data:
data <- chan
, blocked until the other onegoroutine
Write out data to unblock - 5 itself
channel
It’s synchronous, which means there can only be one at a timegoroutine
To operate - Six channels are
goroutine
Between the connection, so the channel send and receive must be at differentgoroutine
In the. - 7
channel
The common exceptions are summarized as follows:
6 sync.Once
Sync. Once indicates that the function is executed only Once. To do this, you need two things :: tip sync.once Introduction
- 1) counter, counting The Times of function execution;
- 2) Thread safety, guarantee in multiple
goroutine
In the case of a lock, the function is still executed only once.
: : :
6.1 sync. Once the source code
import (
"sync/atomic"
)
// Once is an object that will perform exactly one action.
type Once struct {
m Mutex
done uint32
}
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func(a)) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
Copy the code
6.2 Do method
The Do method is fairly simple, but there are lessons to be learned. If I write in general, I’ll just lock it first, and then compare the number of times the function is executed. And here you can use atoms to improve performance.
Some flag bits can be represented by atomic operations to avoid locking and improve performance. The Do method has the following characteristics
- First of all, the load function is executed the number of times, if it has been executed, return
- lock
- Executive function
- The atomic store function is executed 1 times
- unlock
For example, a 6.3
package main
import (
"fmt"
"sync"
"time"
)
var once sync.Once
var onceBody = func(a) {
fmt.Println("Only once")}func main(a) {
for i := 0; i < 10; i++ {
go func(i int) {
once.Do(onceBody)
fmt.Println("i=",i)
}(i)
}
time.Sleep(time.Second) // Sleep 1s is used to execute the go process. Note that sleep time is not too short} -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the output -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - Only once I =0
i= 1
i= 2
i= 4
i= 5
i= 6
i= 3
i= 7
i= 8
i= 9
Copy the code
As you can see from the output, although the for loop is called every timeonce.Do()
Methods, but functionsonceBody()
It only gets executed once
7 sync.Map
The thread-safe Map in GO is sync.map. In the case of single coroutine access, we can use map, but in the case of concurrent access of multiple coroutines, we need to use the coroutine safe sync. map, and the native map will panic seriously during concurrent read and write.
Sync.map aims for better performance and stability. The implementation is mainly aimed at reading more than writing, so the write performance is relatively mediocre. Sync.map source code interpretation
7.1 Overall structure of Sync. Map
7.2 Sync. Map structure Description
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
Copy the code
::: tip sync.map Property description
mu
是map
Internally held locks to synchronize operations between coroutines.read
Contain part ofmap
The coroutine is secure (with or without locking).read
Because it’s an atomic variable, it’s inherently coroutine safe.read
Stored in theentry
Can be in nomu
Update concurrently, but the update needs to be deleted before it is updatedentry
Copied to thedirty
In, and can be held inmu
In the case of recovery.dirty
It also preserves part of itmap
Information required for operationmu
To ensure thatdirty
Can be quickly upgraded toread map
It also includesread map
All undeleted entries in.- The deleted
entry
Not stored in thedirty
In the.clean map
Deleted fromentry
Must be recoverable and stored to thedirty
In the. missed
The record is deadread
The number of times.entry
It holds the value of a pointer to the data, but with two special valuesnil&expunged
.nil
Said inread
“Has been deleted, butdirty
Is still there, so you can update the value directly,expunged
Represents the data inditry
“Has been deleted, so update this value firstentry
Copied to thedirty
.
: : :
7.3 Sync. Map Common Operation Functions
Store
savekey,value
LoadOrStore
take&
Save – see the codeLoad
takekey
The correspondingvalue
Range
Go through all of themkey
.value
Delete
deletekey
, and itsvalue
7.4 Interpretation of operation function source code
As for the operation functions, there are only three core functions of write, read and delete:
Write a function
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
// Check to see if the element already exists. If so, update the value directly via entry in read.
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
// tryStore uses atomic CAS to resolve conflicts. If data is set to expung, tryStore does not write data and returns false
return
}
P ==expunged = nil, copy entry to dirty, write value to nil; Otherwise, write the value directly without substitution. 3. Not present in dirty: If dirty is empty, copy "read" into dirty and write the new value to dirty. DirtyLocked () is called when copied, and elements that are nil in read are updated to expunged when copied to dirty, and are not copied to dirty. * * /
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if! read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended:true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
Copy the code
Read function
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 1
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if! ok && read.amended {// 2, lock read dirty
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if! ok && read.amended { e, ok = m.dirty[key]// Call missLocked, incrementing misses, and if misses>len(dirty), then raise dirty to read and clear the original dirty
m.missLocked()
}
m.mu.Unlock()
}
if! ok {return nil.false
}
return e.load()
}
Copy the code
delete
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
// Check if read exists
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if! ok && read.amended { m.mu.Lock()/ / lock
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key] / / double check
if! ok && read.amended {delete(m.dirty, key) // Delete data in dirty if not directly
}
m.mu.Unlock()
}
if ok {
e.delete(a)// If present, pointer in read is set to nil and dirty data is deleted}}Copy the code
7.5 sync. The example of the Map
package main
import (
"fmt"
"sync"
)
func main(a) {
var m sync.Map
//Store
m.Store(1."a")
m.Store(2."b")
//LoadOrStore
// If the key does not exist, store the key and value, and return false and the input value
v,ok := m.LoadOrStore("1"."aaa")
fmt.Println(ok,v) //false aaa
// If the key already exists, return true and the value corresponding to the key without modifying the original value
v,ok = m.LoadOrStore(1."aaa")
fmt.Println(ok,v) //false aaa
//Load
v,ok = m.Load(1)
if ok{
fmt.Println("it's an existing key,value is ",v)
} else {
fmt.Println("it's an unknown key")}//Range
// Iterate over sync.map and ask for a func as a parameter
f := func(k, v interface{}) bool {
// The input and output types of this function are fixed and cannot be changed
// You can write your own code inside the function body, calling k,v in map
fmt.Println(k,v)
return true
}
m.Range(f)
//Delete
m.Delete(1)
fmt.Println(m.Load(1))}Copy the code
8 Atomic operation
Atomic operations are operations that cannot be interrupted in progress. That is, while an atomic operation on a value is being performed, the CPU will never perform another operation on that value. This is true whether these other operations are atomic or not. To achieve this rigor, atomic operations are represented and performed only by a single CPU instruction. Only in this way can we guarantee the absolute safety of atomic operation in concurrent environment.
The atomic operations provided by the Go language are non-invasive. They are represented by a number of functions in the library code package Sync/Atomic. We can call these functions to perform atomic operations on several simple types of values.
8.1 Types of atomic operations
The value can be int32, INT64, uint32, uint64, uintptr, and unsafe.Pointer
8.2 What are the atomic operations
There are 5 types, namely: Add or subtract Add, CompareAndSwap CompareAndSwap, Swap Swap, Load, Store. Sync/Atomic package API details
8.3 Atomic Operation Examples
The following example is used to compare the performance of mutex and atomic operations
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
/ / normal version
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc(a) {
c.counter++
}
func (c CommonCounter) Load(a) int64 {
return c.counter
}
// Mutex version
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc(a) {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load(a) int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// Atomic operation version
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc(a) {
atomic.AddInt64(&a.counter, 1)}func (a *AtomicCounter) Load(a) int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(a) {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main(a) {
c1 := CommonCounter{} // Non-concurrent security
test(c1)
c2 := MutexCounter{} // Use mutex for concurrency security
test(&c2)
c3 := AtomicCounter{} // Concurrency is secure and more efficient than mutexTest (& c3)} -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the output -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -0 2.0009ms
1000 1.0007ms
1000 0s
Copy the code