You’ve already learned about channels, the synchronization primitives for locking shared resources, and Context for tracking goroutines and passing values. These are the basic building blocks of concurrent programming. Today we’ll show how to combine them into concurrency patterns that make concurrent programs easier to write.
for select infinite loop pattern
This pattern is a common one, used in the examples in the previous article, and is usually combined with a channel to accomplish a task. The format is:
for { // infinite for loop; a for range loop also works
	select {
	// exit is controlled through a channel
	case <-done:
		return
	default:
		// do the regular work here
	}
}
- This is the for + select multiplexing pattern: whichever case is ready gets executed, and the loop does not exit until an exit condition is met.
- As long as no exit condition is met, the default branch keeps being executed.
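The loop can be tried out with a minimal sketch like the one below. The function name runLoop, the counter, and the sleep durations are assumptions made for illustration; they are not part of the original example.

```go
package main

import (
	"fmt"
	"time"
)

// runLoop is a hypothetical worker: it spins in a for + select loop and
// does one unit of "work" per iteration until done is closed.
// It returns how many times the default branch ran.
func runLoop(done <-chan struct{}) int {
	ticks := 0
	for {
		select {
		case <-done:
			// Exit condition met: leave the loop.
			return ticks
		default:
			// No exit signal yet: do the regular work.
			ticks++
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func main() {
	done := make(chan struct{})
	result := make(chan int)

	go func() { result <- runLoop(done) }()

	time.Sleep(50 * time.Millisecond)
	close(done) // signal the worker to stop

	fmt.Println("default branch ran", <-result, "times")
}
```

Closing done, rather than sending on it, lets any number of loops observe the same exit signal.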
for range select finite loop pattern
for _, s := range []int{} {
	select {
	case <-done:
		return
	case resultCh <- s:
	}
}
- Typically, the items being iterated over are sent to a channel.
- The done channel is used to exit the for loop.
- The resultCh channel receives the values from the loop, which can then be passed on to other callers through resultCh.
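A runnable sketch of this pattern might look as follows. The generate function and the sample slice are hypothetical; only the done and resultCh names come from the snippet above.

```go
package main

import "fmt"

// generate is a hypothetical producer: it sends each element of items to
// the returned channel and stops early if done is closed.
func generate(done <-chan struct{}, items []int) <-chan int {
	resultCh := make(chan int)
	go func() {
		defer close(resultCh) // lets the consumer's range loop finish
		for _, s := range items {
			select {
			case <-done:
				return
			case resultCh <- s:
			}
		}
	}()
	return resultCh
}

func main() {
	done := make(chan struct{})
	defer close(done)

	sum := 0
	for v := range generate(done, []int{1, 2, 3, 4}) {
		sum += v
	}
	fmt.Println("sum:", sum) // sum: 10
}
```

Because the loop is finite, the producer goroutine ends on its own once the slice is exhausted; done only matters when the consumer wants to stop early.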
select timeout pattern
If a request needs to access the server for data, but the response may not be received due to network problems, then a timeout should be set:
package main

import (
	"fmt"
	"time"
)

func main() {
	result := make(chan string)
	timeout := time.After(3 * time.Second)
	go func() {
		// Simulate slow network access (the 5-second delay is an assumed value)
		time.Sleep(5 * time.Second)
		result <- "server result"
	}()
	for {
		select {
		case v := <-result:
			fmt.Println(v)
		case <-timeout:
			fmt.Println("Network access timed out")
			return
		default:
			fmt.Println("Wait for...")
			time.Sleep(1 * time.Second)
		}
	}
}
Running results:
Wait for...
Wait for...
Wait for...
Network access timed out
- The core of the select timeout pattern is the timeout set with the time.After function, which prevents the select statement from waiting indefinitely when something goes wrong.
Note: don’t write it like this
for {
	select {
	case v := <-result:
		fmt.Println(v)
	case <-time.After(3 * time.Second):
		fmt.Println("Network access timed out")
		return
	default:
		fmt.Println("Wait for...")
		time.Sleep(1 * time.Second)
	}
}
Here case <-time.After(...) is evaluated again on every pass through the loop, so each iteration starts a brand-new timer. Because the default branch hands control back to the loop after only 1 second, none of these timers ever gets the chance to fire, and the timeout never triggers. Create the timer once, before the loop, as in the previous example.
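A minimal sketch of the safe form, assuming a hypothetical helper named waitResult: the timer is created once before the loop, so every iteration checks the same deadline. It uses time.NewTimer instead of time.After so the timer can be stopped when the result arrives first; the durations are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// waitResult is a hypothetical helper: it polls result every poll interval
// and gives up once limit has elapsed. The timer is created once, before
// the loop, so all iterations share the same deadline.
func waitResult(result <-chan string, limit, poll time.Duration) (string, bool) {
	timer := time.NewTimer(limit)
	defer timer.Stop() // release the timer if the result arrives first

	for {
		select {
		case v := <-result:
			return v, true
		case <-timer.C:
			return "", false
		default:
			time.Sleep(poll)
		}
	}
}

func main() {
	slow := make(chan string) // nothing is ever sent: simulates a lost response
	if _, ok := waitResult(slow, 300*time.Millisecond, 100*time.Millisecond); !ok {
		fmt.Println("network access timed out")
	}

	fast := make(chan string, 1)
	fast <- "server result"
	v, _ := waitResult(fast, 300*time.Millisecond, 100*time.Millisecond)
	fmt.Println(v)
}
```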
Timeout with Context’s WithTimeout function
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Create a child context that times out automatically after 3 seconds
	// ctx, stop := context.WithCancel(context.Background())
	ctx, stop := context.WithTimeout(context.Background(), 3*time.Second)

	go worker(ctx, "Worker 1")
	go worker(ctx, "Worker 2")

	time.Sleep(5 * time.Second)
	stop()
	fmt.Println("??")
}

func worker(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("Off duty ~~~")
			return
		default:
			fmt.Println(name, "is seriously slacking off, please do not disturb...")
		}
		time.Sleep(1 * time.Second)
	}
}
Running results:
Worker 2 is seriously slacking off, please do not disturb...
Worker 1 is seriously slacking off, please do not disturb...
Worker 1 is seriously slacking off, please do not disturb...
Worker 2 is seriously slacking off, please do not disturb...
Worker 2 is seriously slacking off, please do not disturb...
Worker 1 is seriously slacking off, please do not disturb...
Off duty ~~~
Off duty ~~~
// two seconds later
??
- In the example above we used the WithTimeout function to cancel on timeout, which is the recommended approach.
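When the caller needs to know why an operation stopped, the context's error can be inspected. The sketch below assumes two hypothetical functions, fetch and fetchWithTimeout, and illustrative durations; context.WithTimeout, ctx.Err and context.DeadlineExceeded are standard library names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetch is a hypothetical slow operation that honors ctx: it returns the
// context's error if the deadline passes before the work completes.
func fetch(ctx context.Context, work time.Duration) (string, error) {
	select {
	case <-time.After(work): // simulated work finishing
		return "data", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// fetchWithTimeout wraps fetch in a context that expires after limit.
func fetchWithTimeout(limit, work time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel() // always release the context's resources
	return fetch(ctx, work)
}

func main() {
	_, err := fetchWithTimeout(100*time.Millisecond, time.Second)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true

	v, err := fetchWithTimeout(time.Second, 10*time.Millisecond)
	fmt.Println(v, err) // data <nil>
}
```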
Pipeline pattern
The pipeline pattern is also known as the assembly line pattern; it simulates a real-world production line. Let’s take assembling a mobile phone as an example. Suppose there are only three steps: purchasing parts, assembly, and packing the finished product:
Parts procurement (Step 1) – Assembly (Step 2) – Packing (Step 3)
package main

import "fmt"

func main() {
	coms := buy(10)       // purchase 10 sets of parts
	phones := build(coms) // assemble 10 phones
	packs := pack(phones) // pack them for sale
	// Print the results as a check
	for p := range packs {
		fmt.Println(p)
	}
}

// Step 1: purchasing
func buy(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("part ", i)
		}
	}()
	return out
}

// Step 2: assembly
func build(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "assemble(" + c + ")"
		}
	}()
	return out
}

// Step 3: packing
func pack(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "pack(" + c + ")"
		}
	}()
	return out
}
Running results:
pack(assemble(part 1))
pack(assemble(part 2))
pack(assemble(part 3))
pack(assemble(part 4))
pack(assemble(part 5))
pack(assemble(part 6))
pack(assemble(part 7))
pack(assemble(part 8))
pack(assemble(part 9))
pack(assemble(part 10))
Fan-in and fan-out pattern
Once the phone assembly line was running, it turned out that assembling the parts (step 2) was time-consuming, which slowed down steps 1 and 3 accordingly. To improve performance, two more teams were added to step 2:
- As the schematic diagram shows, the red part is the fan-out and the blue part is the fan-in.
Improved assembly line:
package main

import (
	"fmt"
	"sync"
)

func main() {
	coms := buy(10) // purchase 10 sets of parts
	// Three teams assemble at the same time
	phones1 := build(coms)
	phones2 := build(coms)
	phones3 := build(coms)
	// Merge the three channels into one
	phones := merge(phones1, phones2, phones3)
	packs := pack(phones) // pack them for sale
	// Print the results as a check
	for p := range packs {
		fmt.Println(p)
	}
}

// Step 1: purchasing
func buy(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("part ", i)
		}
	}()
	return out
}

// Step 2: assembly
func build(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "assemble(" + c + ")"
		}
	}()
	return out
}

// Step 3: packing
func pack(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "pack(" + c + ")"
		}
	}()
	return out
}

// merge is the fan-in function (component): it sends the data from
// multiple channels to a single channel
func merge(ins ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)
	// Send the data from one channel to out
	p := func(in <-chan string) {
		defer wg.Done()
		for c := range in {
			out <- c
		}
	}
	wg.Add(len(ins))
	// Fan in: start one goroutine per input channel
	for _, cs := range ins {
		go p(cs)
	}
	// Wait until every input channel has been drained, then close out
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
Running results:
pack(assemble(part 2))
pack(assemble(part 3))
pack(assemble(part 1))
pack(assemble(part 5))
pack(assemble(part 7))
pack(assemble(part 4))
pack(assemble(part 6))
pack(assemble(part 8))
pack(assemble(part 9))
pack(assemble(part 10))
- merge is business-agnostic, so it should not be treated as a step in the process; we call it a component instead.
- Components can be reused: any step that needs to fan in like this can use the merge component.
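To make the reuse point concrete, here is a sketch of a type-parameterized variant of the component. It assumes Go 1.18 or later (generics); the names mergeOf and emit are hypothetical and do not appear in the original code.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// mergeOf is a hypothetical generic variant of merge: it fans any number
// of input channels of type T into a single output channel.
func mergeOf[T any](ins ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	// Close out only after every input has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit is a small demo helper: it sends vals on a new channel, then closes it.
func emit(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

func main() {
	var got []int
	for v := range mergeOf(emit(1, 2), emit(3), emit(4, 5)) {
		got = append(got, v)
	}
	sort.Ints(got)   // arrival order is not deterministic, so sort first
	fmt.Println(got) // [1 2 3 4 5]
}
```

The same function can then merge channels of strings, structs, or any other element type without being rewritten.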
Futures pattern
In the pipeline pattern, the steps depend on one another: a step cannot start until the previous one has finished. Some tasks, however, do not depend on each other, so these independent tasks can run concurrently to improve performance.
The Futures pattern can be understood as a "future" pattern. Instead of waiting for a child goroutine to finish, the main goroutine does other things first and fetches the child goroutine’s result later, when it actually needs it. If the child goroutine has not produced its result by then, the main goroutine simply waits.
Let’s take hot pot as an example. Washing the vegetables and boiling the water do not depend on each other, so they can be done at the same time.
Example:
package main

import (
	"fmt"
	"time"
)

func main() {
	vegetablesCh := washVegetables() // wash the vegetables
	waterCh := boilWater()           // boil the water
	fmt.Println("Washing vegetables and boiling water are arranged, I'll play a round of a game first")
	time.Sleep(2 * time.Second)

	fmt.Println("Time to make hot pot, let's see whether the vegetables and water are ready")
	vegetables := <-vegetablesCh
	water := <-waterCh
	fmt.Println("Ready, time to make hot pot:", vegetables, water)
}

// Wash the vegetables
func washVegetables() <-chan string {
	vegetables := make(chan string)
	go func() {
		time.Sleep(5 * time.Second)
		vegetables <- "washed vegetables"
	}()
	return vegetables
}

// Boil the water
func boilWater() <-chan string {
	water := make(chan string)
	go func() {
		time.Sleep(5 * time.Second)
		water <- "boiled water"
	}()
	return water
}
Running results:
Washing vegetables and boiling water are arranged, I'll play a round of a game first
Time to make hot pot, let's see whether the vegetables and water are ready
Ready, time to make hot pot: washed vegetables boiled water
- The biggest difference between a goroutine in the Futures pattern and an ordinary goroutine is that it returns a result, which will be used at some point in the future. The operation that fetches this result must therefore be a blocking one that waits until the result is available.
- The Futures pattern fits when a large task can be broken down into smaller tasks that run independently and concurrently, and the results of those smaller tasks are combined into the final result of the large task.
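The decomposition described in the last point can be sketched with a small helper. The names async, slowSquare and sumOfSquares are hypothetical, and the 50 ms delay only stands in for real work.

```go
package main

import (
	"fmt"
	"time"
)

// async is a hypothetical helper: it runs task in its own goroutine and
// returns a channel (the "future") that will carry its single result.
func async(task func() int) <-chan int {
	future := make(chan int, 1) // buffered so the goroutine never blocks on send
	go func() {
		future <- task()
	}()
	return future
}

// slowSquare simulates an independent, time-consuming sub-task.
func slowSquare(n int) int {
	time.Sleep(50 * time.Millisecond)
	return n * n
}

// sumOfSquares splits the large task into two independent sub-tasks and
// combines their results.
func sumOfSquares(a, b int) int {
	fa := async(func() int { return slowSquare(a) })
	fb := async(func() int { return slowSquare(b) })
	// Both sub-tasks are already running; each receive blocks until
	// the corresponding result is available.
	return <-fa + <-fb
}

func main() {
	fmt.Println(sumOfSquares(3, 4)) // 25
}
```

The buffered channel is a deliberate choice: if the caller never reads the future, the goroutine can still finish instead of leaking.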