The main attraction of the Go language is its built-in concurrency support. Go's concurrency model is based on CSP (Communicating Sequential Processes), proposed by C. A. R. Hoare in 1978. CSP has a precise mathematical model and was put into practice on the T9000 general-purpose computer, which Hoare helped design. From Newsqueak through Alef and Limbo to today's Go, Rob Pike, who has worked with CSP for more than 20 years, has been chiefly interested in its potential as the basis of a general-purpose programming language. CSP theory, the core of Go's concurrent programming, has a single core concept: synchronous communication. The previous section covered synchronous communication; this section briefly introduces the concurrency patterns commonly used in Go.

First, let us be clear that concurrency is not parallelism. Concurrency is about program design: a concurrent program can run sequentially on a single processor, and its parts execute at the same time only on a real multicore CPU. Parallelism is about program execution. It usually means a large number of simple, repeated operations running simultaneously, such as the massively parallel image-processing operations on a GPU. To make concurrent programs easier to write, Go has from the outset focused on providing a simple, safe and efficient abstraction at the language level, so that programmers can concentrate on decomposing problems and composing solutions instead of being distracted by the tedium of thread management, signalling and mutual exclusion.
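The distinction can be made concrete with a small sketch. The helper name sumSquares and its maxProcs parameter are illustrative assumptions, not part of the original text; the program has the same concurrent design whether the runtime is allowed one processor or several, and only the degree of parallelism changes.

```go
package main

import (
	"fmt"
	"runtime"
)

// sumSquares starts n goroutines that each send one square to a channel and
// returns the total. maxProcs caps how many CPUs may execute Go code at the
// same time: with maxProcs == 1 the goroutines are interleaved on a single
// processor (concurrent but not parallel).
func sumSquares(n, maxProcs int) int {
	prev := runtime.GOMAXPROCS(maxProcs) // returns the previous setting
	defer runtime.GOMAXPROCS(prev)

	results := make(chan int)
	for i := 1; i <= n; i++ {
		go func(v int) { results <- v * v }(i)
	}

	total := 0
	for i := 0; i < n; i++ {
		total += <-results
	}
	return total
}

func main() {
	fmt.Println(sumSquares(10, 1))                // one processor: still correct
	fmt.Println(sumSquares(10, runtime.NumCPU())) // may also run in parallel
}
```

Both calls compute the same total; the design of the program does not depend on how many processors happen to be available.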

In concurrent programming, access to shared resources must be controlled precisely. Most current languages solve this hard problem with locks and other thread-synchronization schemes. Go takes a different approach: shared values are passed over channels (in practice, independent threads of execution rarely share resources actively). At any given moment, ideally only one goroutine owns the resource, so data races are eliminated at the design level. To promote this way of thinking, Go condenses its concurrent programming philosophy into a slogan:

Do not communicate by sharing memory; instead, share memory by communicating.

This is a higher-level philosophy of concurrent programming (passing values over channels is the approach Go recommends). Simple concurrency problems such as reference counting can be handled perfectly well with atomic operations or mutexes, but controlling access through channels lets you write cleaner and more correct programs.
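As a minimal sketch of this slogan, consider a counter whose value is owned by exactly one goroutine; every other goroutine reaches it only by sending messages. The type counter and the helpers newCounter and countTo are hypothetical names introduced for illustration, and the owning goroutine is deliberately never stopped in this short example.

```go
package main

import (
	"fmt"
	"sync"
)

// counter owns its value: only the goroutine started by newCounter ever
// touches n, so no lock is needed around it.
type counter struct {
	inc  chan int      // requests to add a delta
	read chan chan int // requests to read the current value
}

func newCounter() *counter {
	c := &counter{inc: make(chan int), read: make(chan chan int)}
	go func() {
		n := 0 // private to this goroutine
		for {
			select {
			case d := <-c.inc:
				n += d
			case reply := <-c.read:
				reply <- n
			}
		}
	}()
	return c
}

func (c *counter) Add(d int) { c.inc <- d }

func (c *counter) Value() int {
	reply := make(chan int)
	c.read <- reply
	return <-reply
}

// countTo increments a fresh counter once from each of `workers` goroutines
// and returns the final value.
func countTo(workers int) int {
	c := newCounter()
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Add(1)
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countTo(100)) // 100
}
```

For a plain counter a mutex or an atomic would be shorter; the point of the sketch is only that ownership, rather than locking, is what rules out the data race.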

Concurrent version of Hello World

We warm up with a simple concurrent program: it prints “Hello world.” in a new goroutine, and main waits for that background goroutine to finish its output before exiting.

The core concept of concurrent programming is synchronous communication, but synchronization can be achieved in many ways. Let us start with the familiar mutex, sync.Mutex. According to the documentation, unlocking a sync.Mutex that is not locked is a run-time error, so the following code is not guaranteed to work:

func main() {
	var mu sync.Mutex

	go func() {
		fmt.Println("Hello world.")
		mu.Lock()
	}()

	mu.Unlock()
}

Because mu.Lock() and mu.Unlock() are not in the same goroutine, the sequential-consistency guarantee that holds within a single goroutine does not apply. No other synchronization event orders the two calls either, so they are concurrent. As a result, mu.Unlock() in main may well run first, while the mutex mu is still unlocked, which causes a run-time error.

Here is the code after the fix:

func main() {
	var mu sync.Mutex

	mu.Lock()
	go func() {
		fmt.Println("Hello world.")
		mu.Unlock()
	}()

	mu.Lock()
}

The fix is to call mu.Lock() twice in the main goroutine. The second call blocks because the lock is already held (sync.Mutex is not a recursive lock), and while main is blocked the background goroutine gets to run. When the background goroutine reaches mu.Unlock(), it releases the lock, and the second mu.Lock() in main returns. After that, no synchronization event orders the background goroutine and main, so their exits are concurrent: when main returns and the program exits, the background goroutine may or may not have exited already. We cannot tell which of the two exits first, but the printing is guaranteed to have completed.

Using sync.Mutex for synchronization is a rather low-level approach. Let us now synchronize with an unbuffered channel instead:

func main() {
	done := make(chan int)

	go func() {
		fmt.Println("Hello world.")
		<-done
	}()

	done <- 1
}

According to the Go memory model, a receive from an unbuffered channel happens before the corresponding send on that channel completes. Therefore the send done <- 1 in main can complete (after which main returns and the program exits) only after the background goroutine has executed the receive <-done, and by then the printing is already done.

Although this code synchronizes correctly, it is too sensitive to the channel's buffer size: if the channel were buffered, there would be no guarantee that the background goroutine prints before main exits. It is better to swap the direction of the send and the receive, so that the synchronization no longer depends on the buffer size:

func main() {
	done := make(chan int, 1) // buffered channel

	go func() {
		fmt.Println("Hello world.")
		done <- 1
	}()

	<-done
}

For a buffered channel, the k-th receive happens before the (k+C)-th send on that channel completes, where C is the channel's buffer size. More to the point here, a send on a channel happens before the corresponding receive completes. So even though the channel is buffered, the receive in main can complete only after the background goroutine has started its send, and by that moment the printing has already finished.

With a buffered channel we can easily scale up to N printing goroutines. The following example starts 10 background goroutines that each print once:

func main() {
	done := make(chan int, 10) // buffer of 10

	// Start N background printing goroutines
	for i := 0; i < cap(done); i++ {
		go func() {
			fmt.Println("Hello world.")
			done <- 1
		}()
	}

	// Wait for the N background goroutines to complete
	for i := 0; i < cap(done); i++ {
		<-done
	}
}

A simpler way to wait for N goroutines to finish before moving on is to use sync.WaitGroup, which waits for a set of events:

func main() {
	var wg sync.WaitGroup

	// Start N background printing goroutines
	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func() {
			fmt.Println("Hello world.")
			wg.Done()
		}()
	}

	// Wait for the N background goroutines to complete
	wg.Wait()
}

wg.Add(1) increments the number of events to wait for; it must be executed before the background goroutine is started. When a background goroutine has finished printing, it calls wg.Done() to mark one event as complete. wg.Wait() in main blocks until all events have completed.

Producer-consumer model

The most common example of concurrent programming is the producer-consumer pattern, which raises a program's overall throughput by balancing the work rates of producer and consumer threads. Put simply, producers generate data and put it into a result queue, and consumers take data out of that queue. Production and consumption thus become two asynchronous processes. When the result queue is empty, consumers wait hungrily; when the queue is full, producers stall, losing their CPU time while unconsumed products pile up.

Implementing producer-consumer concurrency in Go is simple:

// Producer: generates the sequence of integer multiples of factor
func Producer(factor int, out chan<- int) {
	for i := 0; ; i++ {
		out <- i * factor
	}
}

// Consumer: prints every value received from the queue
func Consumer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	ch := make(chan int, 64) // result queue

	go Producer(3, ch) // generate the sequence of multiples of 3
	go Producer(5, ch) // generate the sequence of multiples of 5
	go Consumer(ch)    // consume the generated queue

	// Exit after running for a while
	time.Sleep(5 * time.Second)
}

We start two Producer goroutines, which generate the sequences of multiples of 3 and of 5 respectively, and then one Consumer goroutine that prints the results. Sleeping in main lets the producers and the consumer work for a fixed amount of time. As mentioned in the previous section, sleeping this way does not guarantee stable output.

Alternatively, main can stay blocked and exit only when the user presses Ctrl-C:

func main() {
	ch := make(chan int, 64) // result queue

	go Producer(3, ch) // generate the sequence of multiples of 3
	go Producer(5, ch) // generate the sequence of multiples of 5
	go Consumer(ch)    // consume the generated queue

	// Ctrl+C to exit
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	fmt.Printf("quit (%v)\n", <-sig)
}

This example has two producers with no synchronization events between them, so they run concurrently. The order of the values the consumer prints is therefore nondeterministic, but that is not a problem: the producers and the consumer still cooperate correctly.
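The producers above run forever, so the program can only be stopped from the outside. When the producers are bounded, a common arrangement is to close the queue once all of them have finished, which lets the consumer's range loop end by itself. The sketch below assumes bounded producers; the names produce and run are illustrative and do not come from the original example.

```go
package main

import (
	"fmt"
	"sync"
)

// produce sends the first n multiples of factor to out.
func produce(factor, n int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < n; i++ {
		out <- i * factor
	}
}

// run starts one bounded producer per factor, closes the queue once all of
// them are done, and returns the sum of everything the consumer received.
func run(n int, factors ...int) int {
	ch := make(chan int, 64) // result queue

	var wg sync.WaitGroup
	for _, f := range factors {
		wg.Add(1)
		go produce(f, n, ch, &wg)
	}

	// Only the side that knows all sends are finished may close the channel.
	go func() {
		wg.Wait()
		close(ch)
	}()

	sum := 0
	for v := range ch { // ends when ch is closed and drained
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(run(4, 3, 5)) // 0+3+6+9 + 0+5+10+15 = 48
}
```

The order in which values arrive is still nondeterministic; the sum is used here precisely because it does not depend on that order.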

Publish and subscribe model

The publish-subscribe model is usually abbreviated as pub/sub. In this model, message producers are publishers and message consumers are subscribers, and the relationship between producers and consumers is M:N. In the traditional producer-consumer model, messages are sent to a queue, whereas in the publish-subscribe model they are published to a topic.

To this end, we build a support package for the publish-subscribe model called pubsub:

// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub

import (
	"sync"
	"time"
)

type (
	subscriber chan interface{}         // a subscriber is a channel
	topicFunc  func(v interface{}) bool // a topic is a filter
)

// Publisher is the publisher object
type Publisher struct {
	m           sync.RWMutex             // read/write lock
	buffer      int                      // buffer size of each subscription queue
	timeout     time.Duration            // Publish timeout
	subscribers map[subscriber]topicFunc // Subscriber information
}

// Build a publisher object that can set the publishing timeout and the length of the cache queue
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}
}

// Add a new subscriber to subscribe to all topics
func (p *Publisher) Subscribe() chan interface{} {
	return p.SubscribeTopic(nil)
}

// Add a new subscriber to subscribe to the filtered topics
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()
	return ch
}

// Exit the subscription
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()

	delete(p.subscribers, sub)
	close(sub)
}

// Publish a topic
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	defer p.m.RUnlock()

	var wg sync.WaitGroup
	for sub, topic := range p.subscribers {
		wg.Add(1)
		go p.sendTopic(sub, topic, v, &wg)
	}
	wg.Wait()
}

// Close the publisher object, and close all subscriber channels.
func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
}

// Send the topic, which can tolerate a certain timeout
func (p *Publisher) sendTopic(
	sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
	defer wg.Done()
	if topic != nil && !topic(v) {
		return
	}

	select {
	case sub <- v:
	case <-time.After(p.timeout):
	}
}

In the following example, two subscribers subscribe to all topics and to the topics that contain “golang”, respectively:

package main

import (
	"fmt"
	"strings"
	"time"

	"path/to/pubsub"
)

func main() {
	p := pubsub.NewPublisher(100*time.Millisecond, 10)
	defer p.Close()

	all := p.Subscribe()
	golang := p.SubscribeTopic(func(v interface{}) bool {
		if s, ok := v.(string); ok {
			return strings.Contains(s, "golang")
		}
		return false
	})

	p.Publish("hello, world!")
	p.Publish("hello, golang!")

	go func() {
		for msg := range all {
			fmt.Println("all:", msg)
		}
	}()

	go func() {
		for msg := range golang {
			fmt.Println("golang:", msg)
		}
	}()

	// Exit after running for a while
	time.Sleep(3 * time.Second)
}

In the publish-subscribe model, each message is delivered to multiple subscribers. Publishers usually neither know nor care which subscribers are receiving a topic's messages. Subscribers and publishers can be added dynamically at run time; this loose coupling lets a system grow in complexity over time. Real-world applications such as weather forecasting can use this concurrency pattern.

Control concurrency

Many users, once they get used to Go's powerful concurrency features, tend to write programs with as much concurrency as possible, because that seems to give the best performance. In real life we are always in a hurry, yet sometimes we need to slow down and enjoy life. Concurrent programs are the same: sometimes we need to limit the degree of concurrency, because doing so leaves some CPU resources for other applications and tasks and can also reduce power consumption and ease the load on the battery.

The godoc tool contains a vfs package that implements a virtual file system, and under it a gatefs subpackage. The purpose of gatefs is to limit the maximum number of concurrent accesses to a virtual file system. Using the gatefs package is simple:

import (
	"golang.org/x/tools/godoc/vfs"
	"golang.org/x/tools/godoc/vfs/gatefs"
)

func main() {
	fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
	// ...
}

vfs.OS("/path") constructs a virtual file system on top of the local file system, and gatefs.New then wraps an existing virtual file system in one whose concurrency is limited. The principle behind the limit, described in the previous section, is to cap the number of concurrent operations through the blocking send and receive rules of a buffered channel:

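var limit = make(chan int, 3)

// work is assumed to be a []func() holding the tasks to run
func main() {
	for _, w := range work {
		// Pass w as an argument so that each goroutine gets its own copy
		go func(w func()) {
			limit <- 1
			w()
			<-limit
		}(w)
	}
	select {}
}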

gatefs, however, abstracts this into a type named gate, with enter and leave methods that mark the entry to and exit from the concurrent code. Once the maximum number of concurrent calls has been reached, enter blocks until the number drops.

type gate chan bool

func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }

The new virtual file system wrapped by gatefs simply adds enter and leave calls to the methods whose concurrency needs to be limited:

type gatefs struct {
	fs vfs.FileSystem
	gate
}

func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
	fs.enter()
	defer fs.leave()
	return fs.fs.Lstat(p)
}

Besides limiting the maximum degree of concurrency, we can also estimate how busy the program is from the ratio between the number of items in the buffered channel and its capacity. An empty channel means the system is idle and a full channel means it is saturated, which is useful information for scheduling low-priority background tasks.
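The two ideas in this section can be sketched together: a gate built on a buffered channel, plus a utilization figure derived from len and cap. The utilization method and the runLimited helper are hypothetical additions for illustration and are not part of the gatefs package.

```go
package main

import (
	"fmt"
	"sync"
)

// gate is a counting semaphore built on a buffered channel.
type gate chan bool

func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }

// utilization reports the fraction of slots currently in use:
// 0 means idle, 1 means saturated. It is only a snapshot.
func (g gate) utilization() float64 {
	return float64(len(g)) / float64(cap(g))
}

// runLimited runs n tasks with at most limit of them inside the gate at
// once and returns the highest concurrency it observed.
func runLimited(n, limit int) int {
	g := make(gate, limit)
	var (
		mu      sync.Mutex
		running int
		peak    int
		wg      sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.enter()
			defer g.leave()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			// ... the real work would go here ...

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	g := make(gate, 4)
	g.enter()
	fmt.Println(g.utilization()) // 0.25: one of four slots is in use
	g.leave()

	fmt.Println(runLimited(100, 3) <= 3) // true: the gate caps concurrency
}
```

Because other goroutines may enter or leave at any moment, the utilization value is best treated as a hint for scheduling decisions rather than as an exact measurement.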

The winner is king

There are many motivations for concurrent programming. It can simplify a problem, since giving each kind of task its own thread of execution is often simpler. It can also improve performance, since running two threads on a multicore CPU is generally faster than running one. When it comes to performance, though, a program that merely runs fast does not necessarily give a good user experience. In many cases what matters is that the program responds quickly to user requests and handles low-priority background tasks when there are no requests to serve.

Suppose we want to quickly find topics related to “golang”. We might open several search engines, such as Bing, Google and Baidu, at the same time, and as soon as one of them returns results we can close the other pages. Depending on network conditions and search algorithms, some engines answer quickly, while others may not have finished by the time their company goes bankrupt. We can write a program that follows the same strategy:

func main() {
	ch := make(chan string, 32)

	go func() {
		ch <- searchByBing("golang")
	}()
	go func() {
		ch <- searchByGoogle("golang")
	}()
	go func() {
		ch <- searchByBaidu("golang")
	}()

	fmt.Println(<-ch)
}

First we create a buffered channel whose buffer is large enough that no send blocks unnecessarily. We then start several background goroutines, each of which submits the search request to a different search engine. Whenever a search engine returns its result, the goroutine immediately sends it to the channel (thanks to the large buffer, this send never blocks). In the end we take only the first value out of the channel, which is the result that came back first.

By starting a few redundant goroutines that try different ways of solving the same problem and letting the first to finish win, we improve the program's responsiveness.

Prime sieve

In the “Revolution of the Hello World” section, we showed a concurrent prime sieve to demonstrate the concurrency features of Newsqueak. The concurrent prime sieve is a classic concurrency example that helps us understand Go's concurrency features more deeply. The principle of the prime sieve is illustrated below:


Figure 1-13 Prime sieve

First we need to generate the initial sequence of natural numbers 2, 3, 4, ... (that is, excluding the leading 0 and 1):

// Return a channel that generates the sequence of natural numbers: 2, 3, 4, ...
func GenerateNatural() chan int {
	ch := make(chan int)
	go func() {
		for i := 2; ; i++ {
			ch <- i
		}
	}()
	return ch
}

GenerateNatural starts a goroutine internally to produce the sequence and returns the corresponding channel.

The next step is to construct a sieve for each prime: it removes from the input sequence the numbers that are multiples of that prime and returns the resulting sequence, again as a new channel.

// Channel filter: drop the numbers divisible by prime
func PrimeFilter(in <-chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			if i := <-in; i%prime != 0 {
				out <- i
			}
		}
	}()
	return out
}

PrimeFilter likewise starts a goroutine internally to produce its sequence and returns the channel that carries the filtered sequence.

Now we can drive the concurrent prime sieve in main:

func main() {
	ch := GenerateNatural() // sequence of natural numbers: 2, 3, 4, ...
	for i := 0; i < 100; i++ {
		prime := <-ch // the next prime
		fmt.Printf("%v: %v\n", i+1, prime)
		ch = PrimeFilter(ch, prime) // filter built from the new prime
	}
}

We first call GenerateNatural() to produce the initial sequence of natural numbers starting at 2. Then we run a loop of 100 iterations to generate the first 100 primes. At the start of each iteration, the first number in the channel is guaranteed to be a prime, so we read it and print it. Then, using that prime as a sieve, we filter the rest of the sequence in the channel. The channels belonging to the successive sieves are thereby chained together in series.

The prime sieve shows an elegant structure for a concurrent program. However, because each concurrent unit handles a very fine-grained task, the overall performance of the program is poor. For fine-grained concurrent programs, the message-passing overhead inherent in the CSP model is too high (the multithreaded model likewise has to pay the cost of starting threads).
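For comparison, here is a sequential sketch that does the same filtering without any channels. Each candidate is tested against the primes found so far, which plays the role of the chain of filters. The function name firstPrimes is an illustrative assumption, and the early exit once p*p exceeds the candidate is an optimization the channel pipeline does not have.

```go
package main

import "fmt"

// firstPrimes returns the first n primes. Every candidate is checked
// against the primes found so far, just as each number in the concurrent
// version has to pass through every earlier filter.
func firstPrimes(n int) []int {
	primes := make([]int, 0, n)
	for c := 2; len(primes) < n; c++ {
		isPrime := true
		for _, p := range primes {
			if p*p > c {
				break // no divisor can exist beyond the square root
			}
			if c%p == 0 {
				isPrime = false
				break
			}
		}
		if isPrime {
			primes = append(primes, c)
		}
	}
	return primes
}

func main() {
	ps := firstPrimes(100)
	fmt.Println(ps[:5])  // [2 3 5 7 11]
	fmt.Println(ps[99])  // 541
}
```

Here every check is a plain integer operation in one goroutine, whereas in the pipeline every number that survives a filter costs a channel send and receive between two goroutines.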

Concurrent safe exit

Sometimes we need to tell a goroutine to stop what it is doing, especially when it is working in the wrong direction. Go provides no way to terminate a goroutine directly, because doing so could leave variables shared between goroutines in an undefined state. But what if we want to make two, or any number of, goroutines exit?

Goroutines in Go communicate and synchronize mainly through channels. To handle send or receive operations on several channels at once, we use the select keyword (which behaves similarly to the select function in network programming). When a select has several cases, it randomly picks one whose channel operation can proceed. If none can proceed, it takes the default case if there is one; otherwise it blocks.

A channel receive with a timeout, implemented with select:

select {
case v := <-in:
	fmt.Println(v)
case <-time.After(time.Second):
	return // timeout
}

A non-blocking channel send or receive, using the default case of select:

select {
case v := <-in:
	fmt.Println(v)
default:
	// There is no data
}

Keeping main from exiting with select:

func main() {
	// do some things
	select {}
}

When several channels are ready, select picks one at random. Based on this property we can use select to implement a program that generates a random sequence of numbers:

func main() {
	ch := make(chan int)
	go func() {
		for {
			select {
			case ch <- 0:
			case ch <- 1:
			}
		}
	}()

	for v := range ch {
		fmt.Println(v)
	}
}

With select and its default case we can easily implement exit control for a goroutine:

func worker(cancel chan bool) {
	for {
		select {
		default:
			fmt.Println("hello")
			// normal work
		case <-cancel:
			// exit
			return
		}
	}
}

func main() {
	cancel := make(chan bool)
	go worker(cancel)

	time.Sleep(time.Second)
	cancel <- true
}

However, sends and receives on a channel are paired one to one, so stopping several goroutines this way might require as many channels as there are goroutines, which is too costly. Instead, we can broadcast by closing a channel: every receive from a closed channel returns immediately with the zero value and an optional flag indicating failure.

func worker(cancel chan bool) {
	for {
		select {
		default:
			fmt.Println("hello")
			// normal work
		case <-cancel:
			// exit
			return
		}
	}
}

func main() {
	cancel := make(chan bool)

	for i := 0; i < 10; i++ {
		go worker(cancel)
	}

	time.Sleep(time.Second)
	close(cancel)
}
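The receive rule that this broadcast relies on can be checked in isolation. The following sketch, whose helper name recvAfterClose is an illustrative assumption, closes a channel and then keeps receiving from it, recording the value and the optional second result of each receive.

```go
package main

import "fmt"

// recvAfterClose buffers the given values in a channel, closes it, and then
// performs n receives, recording the value and the ok flag of each one.
func recvAfterClose(values []int, n int) (got []int, oks []bool) {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)

	for i := 0; i < n; i++ {
		v, ok := <-ch // never blocks once the channel is closed
		got = append(got, v)
		oks = append(oks, ok)
	}
	return got, oks
}

func main() {
	got, oks := recvAfterClose([]int{7}, 3)
	fmt.Println(got) // [7 0 0]
	fmt.Println(oks) // [true false false]
}
```

Values that were buffered before the close are still delivered with ok set to true; after that every receive yields the zero value with ok set to false, which is why any number of workers can observe the same close.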

We close the cancel channel to broadcast the exit instruction to all goroutines. The program is still not robust enough, however: when a goroutine receives the exit instruction it usually performs some cleanup before exiting, but that cleanup is not guaranteed to finish, because main has no mechanism for waiting until every worker goroutine has exited. We can improve this by combining the channel with sync.WaitGroup:

func worker(wg *sync.WaitGroup, cancel chan bool) {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-cancel:
			return
		}
	}
}

func main() {
	cancel := make(chan bool)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(&wg, cancel)
	}

	time.Sleep(time.Second)
	close(cancel)
	wg.Wait()
}

The creation, running, pausing and exit of every worker are now under the safe control of main.

The context package

Go 1.7 added the context package to the standard library. It simplifies handling request-scoped data, timeouts and cancellation across the multiple goroutines that serve a single request; an official blog post describes it in detail. We can use the context package to reimplement the safe exit and timeout control shown above:

func worker(ctx context.Context, wg *sync.WaitGroup) error {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(ctx, &wg)
	}

	time.Sleep(time.Second)
	cancel()

	wg.Wait()
}

When the timeout expires or main cancels the context, every worker goroutine can exit safely.
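A worker can tell the two situations apart through the value returned by ctx.Err(). The sketch below assumes nothing beyond the standard context package; the helper names reason, afterTimeout and afterCancel are illustrative.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// reason blocks until ctx is done and reports why it ended.
func reason(ctx context.Context) string {
	<-ctx.Done()
	switch err := ctx.Err(); {
	case errors.Is(err, context.DeadlineExceeded):
		return "timeout"
	case errors.Is(err, context.Canceled):
		return "cancelled"
	default:
		return "unknown"
	}
}

// afterTimeout lets a short deadline expire without an explicit cancel.
func afterTimeout() string {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel() // releases the timer; the deadline has already fired
	return reason(ctx)
}

// afterCancel cancels explicitly, long before the deadline.
func afterCancel() string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	cancel()
	return reason(ctx)
}

func main() {
	fmt.Println(afterTimeout()) // timeout
	fmt.Println(afterCancel())  // cancelled
}
```

Returning ctx.Err() from a worker, as the example above does, therefore gives the caller enough information to distinguish an expired deadline from a deliberate stop.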

Go has automatic memory reclamation, so memory generally does not leak. In the earlier prime sieve example, however, GenerateNatural and PrimeFilter each start a new goroutine internally, and those background goroutines are at risk of leaking once main stops reading from the channel. We can avoid the problem with the context package. Here is an improved prime sieve:

// Return a channel that generates the sequence of natural numbers: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
	ch := make(chan int)
	go func() {
		for i := 2; ; i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- i:
			}
		}
	}()
	return ch
}

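// Channel filter: drop the numbers divisible by prime
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			// Watch ctx while receiving too; otherwise a filter blocked
			// on <-in would never notice the cancellation.
			select {
			case <-ctx.Done():
				return
			case i := <-in:
				if i%prime != 0 {
					select {
					case <-ctx.Done():
						return
					case out <- i:
					}
				}
			}
		}
	}()
	return out
}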

func main() {
	// Use Context to control the background Goroutine state
	ctx, cancel := context.WithCancel(context.Background())

	ch := GenerateNatural(ctx) // Sequence of natural numbers: 2, 3, 4...
	for i := 0; i < 100; i++ {
		prime := <-ch // The new prime number
		fmt.Printf("%v: %v\n", i+1, prime)
		ch = PrimeFilter(ctx, ch, prime) // Filter constructed based on new prime numbers
	}

	cancel()
}

When main has finished its work, it calls cancel() to tell the background goroutines to exit, which prevents them from leaking.
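Calling cancel() only sends the notification; it does not wait for the goroutines to act on it. If main must be sure that every stage has really exited, the context can be combined with a sync.WaitGroup, as in the earlier worker example. The following is a sketch under that assumption; the names generate, filter and primes are illustrative variants of the functions above, and each stage watches ctx.Done() while both receiving and sending.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// generate emits 2, 3, 4, ... until ctx is cancelled.
func generate(ctx context.Context, wg *sync.WaitGroup) <-chan int {
	ch := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 2; ; i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- i:
			}
		}
	}()
	return ch
}

// filter forwards the numbers from in that are not divisible by prime.
func filter(ctx context.Context, wg *sync.WaitGroup, in <-chan int, prime int) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case i := <-in:
				if i%prime == 0 {
					continue
				}
				select {
				case <-ctx.Done():
					return
				case out <- i:
				}
			}
		}
	}()
	return out
}

// primes returns the first n primes and returns only after every stage of
// the pipeline has exited.
func primes(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	result := make([]int, 0, n)
	ch := generate(ctx, &wg)
	for i := 0; i < n; i++ {
		p := <-ch
		result = append(result, p)
		ch = filter(ctx, &wg, ch, p)
	}

	cancel()  // ask every stage to stop
	wg.Wait() // and wait until they all have
	return result
}

func main() {
	fmt.Println(primes(10)) // [2 3 5 7 11 13 17 19 23 29]
}
```

In a program whose main returns right after cancel() the difference is invisible, but in a long-running service that builds such pipelines repeatedly, waiting makes it possible to verify that no goroutines are left behind.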

Concurrency is a very large topic, and we have shown only a few very basic examples of concurrent programming here. The official documentation discusses concurrent programming at length, and there are also books published in China that are dedicated to concurrent programming in Go. Readers can consult the relevant material according to their needs.