
Go language concurrent programming

  • Uses the Communicating Sequential Processes (CSP) model
  • No locks required, no callbacks required
  • Parallel programming vs parallel computing

1.1 CSP concurrency model

The CSP model, proposed in the 1970s, describes a concurrency model in which independent concurrent entities communicate through shared channels (pipes). A channel is a first-class object in CSP: the focus is not on the entity that sends a message, but on the channel used to send it.

1.2 Golang CSP

Golang borrows some concepts of the CSP model as theoretical support for its concurrency. In fact, Go does not implement the full CSP theory; it only borrows the two concepts of process and channel. A process is represented in Go as a goroutine that actually executes concurrently, and each entity shares data through channel communication.
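As a minimal sketch of these two concepts (not from the original article), two goroutines that share data only through a channel:

package main

import "fmt"

func main() {
    ch := make(chan string) // the channel is the communication medium

    // one concurrent entity (goroutine) sends a message into the channel
    go func() {
        ch <- "ping"
    }()

    // the other entity receives it; neither side knows who is on the other end
    fmt.Println(<-ch)
}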


1.3 No locks, no callbacks required

Go uses the CSP model to communicate without locks. More precisely, "lock free" means that users do not need locks (or callbacks) when writing concurrent Go code; under the hood, the Go runtime does use locks and callbacks.

2.1 Simulate server startup and print content to the page

package main

import (
    "fmt"
    "net/http"
)

func main() {
    http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
        fmt.Fprintf(writer, "<h1>Hello World %s</h1>", request.FormValue("name"))
    })

    http.ListenAndServe(":8888", nil)
}

One thing to note here is how the server is started: http.HandleFunc registers the handler, and http.ListenAndServe starts the server and blocks while it serves requests.
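As an aside, http.ListenAndServe only returns when it fails, and the returned error is easy to drop silently. A slightly more defensive variant of the same program (a sketch, not the author's code) checks it:

package main

import (
    "fmt"
    "log"
    "net/http"
)

func main() {
    http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
        fmt.Fprintf(writer, "<h1>Hello World %s</h1>", request.FormValue("name"))
    })

    // ListenAndServe blocks; it returns an error, e.g. if the port is already in use
    if err := http.ListenAndServe(":8888", nil); err != nil {
        log.Fatal(err)
    }
}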

2.2 The main function communicates with the hello function through a channel

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    for i := 0; i < 5000; i++ {
        go hello(i, ch)
    }

    // receive forever; this loop never exits
    for {
        str := <-ch
        fmt.Println(str)
    }

    time.Sleep(time.Millisecond) // unreachable: the loop above never returns
}

func hello(i int, ch chan string) {
    for {
        ch <- fmt.Sprintf("hello world, %d", i)
    }
}

2.3 Internal Sorting

package main

import (
    "fmt"
    "sort"
)

func main() {
    arr := []int{2, 5, 1, 9, 23, 01} // note: 01 is an octal literal, equal to 1
    sort.Ints(arr)
    for i, v := range arr {
        fmt.Printf("%d, %d\n", i, v)
    }
}

Internal sorting uses the standard library's built-in sort functions.
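Beyond sort.Ints, the sort package can also sort with an arbitrary ordering via sort.Slice; a small sketch (not from the original):

package main

import (
    "fmt"
    "sort"
)

func main() {
    arr := []int{2, 5, 1, 9, 23, 1}
    // sort.Slice takes a custom less function; here, descending order
    sort.Slice(arr, func(i, j int) bool { return arr[i] > arr[j] })
    fmt.Println(arr) // [23 9 5 2 1 1]
}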

 

2.4 Go implements an external sorting pipeline

When we sort externally, we use merge sort. So what is merge sort?

Divide the data into two halves, sort each half separately, and then merge the two sorted halves.

 

The figure above illustrates a two-way merge in merge sort. In fact, merge sort is not limited to two ways: it can also merge three ways, or N ways.

If you have a large data set, first split it into two smaller sets and sort each of them with internal sorting.

You then have two sorted sets, which you merge.

Step 1: take the first element of each set and compare them; if they are equal, take the left one → 1

Step 2: take the first elements again and compare; the right one is smaller → 1

Step 3: take the first elements again and compare; the left one is smaller → 2

…and so on (see the sketch below).
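To make these steps concrete, here is a minimal sketch of the two-way merge on plain slices; mergeSlices is a hypothetical helper, not part of the original code:

package main

import "fmt"

// mergeSlices illustrates the two-way merge step on plain slices,
// before we move the same logic onto channels.
func mergeSlices(a, b []int) []int {
    result := make([]int, 0, len(a)+len(b))
    i, j := 0, 0
    for i < len(a) && j < len(b) {
        if a[i] <= b[j] { // if equal, take the left element
            result = append(result, a[i])
            i++
        } else {
            result = append(result, b[j])
            j++
        }
    }
    // append whatever is left in either slice
    result = append(result, a[i:]...)
    result = append(result, b[j:]...)
    return result
}

func main() {
    fmt.Println(mergeSlices([]int{1, 2, 5}, []int{1, 3, 4})) // [1 1 2 3 4 5]
}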

Next, our external sort is implemented with two-way merging.

 

The data comes from multiple sources, for example multiple HDFS files in Hadoop.

The data is first sent to node 1 for a two-way merge; the merged result is sent to node 2 for another two-way merge, and so on, until only one stream of sorted data remains at the end. That is what we want.

 

 


3. Code implementation

We use examples to illustrate the use of channels.

 

3.1 A channel is how goroutines communicate

First, put the data into the channel. Which raises the question: why put data into a channel at all?

Suppose each piece of data is a large object with a long processing chain. Putting it into a channel lets it be processed concurrently, so the subsequent stages are not blocked and we can get on with the rest of the work.

package pipeline

func ArraySource(arr ...int) chan int {
    out := make(chan int)
    go func() {
        for _, a := range arr {
            out <- a
        }
        close(out)
    }()
    return out
}

This is the process of putting an array into a channel. A few points to note:

  • Create a channel, then return the channel.
  • Remember: a channel is how goroutines communicate. We do not send into the channel directly in the current goroutine; we do it in a separate goroutine.
  • Once all the data has been sent, close the channel: "the data is all in; there is no more, so please don't wait for more."

So a separate goroutine is defined to feed the data into the channel.

 

Now let's write a demo that reads the data back:

package main

import (
    "fmt"
    "test007/pipeline"
    "time"
)

func main() {
    sourceChan := pipeline.ArraySource(1, 4, 8, 2, 19, 5)

    for {
        if data, ok := <- sourceChan; ok {
            fmt.Println("data:", data)
        } else {
            break
        }
    }

    time.Sleep(time.Second * 10)
}

First, I put an array into the channel.

Then data is continuously fetched from the channel, hence the for loop. ok tells us whether there is still data to fetch; we exit when there is none.

Note: close closes the channel, so that we do not keep trying to fetch data.

There is another way that takes the data out directly, so we don't have to check manually:

package main

import (
    "fmt"
    "test007/pipeline"
    "time"
)

func main() {
    sourceChan := pipeline.ArraySource(1, 4, 8, 2, 19, 5)
    for v := range sourceChan {
        fmt.Println("data:", v)
    }

    time.Sleep(time.Second * 10)
}

If range is used to fetch data from a channel, you must close the channel manually; otherwise the for loop will not know when to exit, and a deadlock will occur.

Why deadlocks?

Because the for loop does not know when to exit, it waits forever; the code after it can never execute, so the runtime reports a deadlock.
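A minimal sketch of this failure mode, assuming the sender forgets to close:

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        ch <- 1
        // close(ch) is missing
    }()
    for v := range ch { // after the first value, range blocks forever
        fmt.Println(v)
    }
    // the runtime aborts: fatal error: all goroutines are asleep - deadlock!
}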

 

So: normally, our channels are not closed manually. When there is data in the channel, we take it out; when there is none, we wait. How do we do that? If we don't close manually, how do we make a goroutine wait until data arrives? The receiver also uses a separate goroutine to receive the data, so the main goroutine is not blocked.

package main

import (
    "fmt"
    "test007/pipeline"
    "time"
)

func main() {
    sourceChan := pipeline.ArraySource(1, 4, 8, 2, 19, 5)
    go func() {
        for v := range sourceChan {
            fmt.Println("data:", v)
        }
    }()
    time.Sleep(time.Second * 10)
}

Why does adding a separate Goroutine not cause a deadlock?

My understanding: since a separate goroutine receives from sourceChan, it does not block the main goroutine, which keeps running. The receiving goroutine processes data when there is data and waits when there is none; it can wait a year, two years, indefinitely.

Key points to remember from the demo above:

  1. A channel is how goroutines communicate

  2. If you don't want deadlocks, both putting data into a channel and fetching data out of it should happen in separate goroutines. Those two goroutines may never finish, but they stay alive and do not affect the main goroutine


3.2 Channels are directional

A channel parameter, or a channel return value, can be given a direction.

If ArraySource returns a <-chan int, it returns a channel from which data can only be received. The variable that receives the return value cannot send data into it.

package pipeline

import "time"

func ArraySource(arr ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, a := range arr {
            out <- a
        }
        time.Sleep(time.Second * 2)
        out <- 9
        //close(out)
    }()
    return out
}

Note that the return type here is a receive-only channel: callers can only take data out of it.

It is not OK to send data into the channel from outside, as in the following (the original screenshot of the compile error is omitted):
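A minimal sketch of what the compiler rejects; source is a stand-in for ArraySource:

package main

func source() <-chan int {
    out := make(chan int)
    go func() {
        out <- 1
        close(out)
    }()
    return out
}

func main() {
    ch := source()
    // ch <- 10 // compile error: cannot send to receive-only channel ch
    <-ch // receiving is allowed
}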

 

 


 

3.3 Learn to define a func that returns a channel

Here is the teacher's template for writing pipeline functions:

func Sort(in <-chan int) <-chan int {
    out := make(chan int) // step 1: define the channel variable
    go func() {
        // step 3: put data into the channel (the business logic)
    }()
    return out // step 2: return the channel
}

Step 1: define a channel variable of type chan int.

Step 2: return the channel variable.

Step 3: put data into the channel; what data to put is the business logic.

Defined this way, the function itself does not deadlock, because starting the goroutine returns immediately.


3.4 Defining the sorting method

func InMemSort(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        // read all the data from in
        var arr []int
        for v := range in {
            arr = append(arr, v)
        }
        fmt.Println("arr: ", arr)
        sort.Ints(arr)
        fmt.Println("arr: ", arr)
        // write the sorted data to out
        for _, v := range arr {
            out <- v
        }
        // note: out is never closed here; section 3.5 shows the problem this causes
    }()
    return out
}

This sorting method follows the teacher's three-step template for channel functions.

In step 3 we handle the business logic. First, the data is fetched from the input channel in.

The data is fetched with a for-range loop over the channel. The loop blocks until data arrives and only exits once all the data has been fetched and in has been closed; otherwise it waits forever.

 


3.5 Merge algorithm: merges data in two arrays

The three-step approach is used again:

Step 1: declare a channel variable.

Step 2: return the channel variable.

Step 3: define a goroutine that puts data into it.

func Merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        // take one value from each input channel
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        // as long as either channel still has data, keep merging
        for ok1 || ok2 {
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        // step 6: all data merged, close the output channel
        close(out)
    }()
    return out
}

Here the business logic lives in the goroutine. Because these are channels, one value is taken from each channel at a time; the values are compared, the smaller one is sent downstream, and the next value is fetched from that channel.

Next, we build two arrays to test merging the two arrays

package main

import (
    "fmt"
    "test007/pipeline"
    "time"
)

func main() {
    mergeChan := pipeline.Merge(
        pipeline.InMemSort(pipeline.ArraySource(1, 4, 8, 2, 19, 5)),
        pipeline.InMemSort(pipeline.ArraySource(0, 29, 43, 1, 7, 9)))

    go func() {
        for v := range mergeChan {
            fmt.Println("data:", v)
        }
    }()
    time.Sleep(time.Second * 10)
}

Output result:

GOROOT=/usr/local/go #gosetup
GOPATH=/Users/luoxiaoli/go #gosetup
/usr/local/go/bin/go build -o /private/var/folders/g2/74np978j3971l2864zdk7lgc0000gn/T/___go_build_main_go_darwin /Users/luoxiaoli/test007/pipeline/pipelinedemo/main.go #gosetup
/private/var/folders/g2/74np978j3971l2864zdk7lgc0000gn/T/___go_build_main_go_darwin #gosetup
arr:  [1 4 8 2 19 5]
arr:  [0 29 43 1 7 9]
arr:  [1 2 4 5 8 19]
arr:  [0 1 7 9 29 43]
data: 0
data: 1
data: 1
data: 2
data: 4
data: 5
data: 7
data: 8
data: 9
data: 19
data: 29
data: 43

Process finished with exit code 0

Here’s a problem:

 

Once the data in in1 is exhausted, the code after the merge loop is never executed. There is no outright deadlock, but the output of subsequent data is blocked.

Here’s why:

 

When the internal sort finishes sorting, it never closes its output channel. As a result, a range over that channel keeps waiting once there is no more data; with no data coming, it is stuck.
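The fix (which the final source in section 3.10 applies) is to close the output channel once all sorted data has been sent; a sketch:

func InMemSort(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        var arr []int
        for v := range in {
            arr = append(arr, v)
        }
        sort.Ints(arr)
        for _, v := range arr {
            out <- v
        }
        close(out) // without this, downstream range loops never exit
    }()
    return out
}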


3.6 Changing the data source to read from a file

Previously, our data source was a self-defined array in ArraySource: we pass in an array and push it through the channel for processing, like this:

func ArraySource(arr ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, a := range arr {
            out <- a
        }
        close(out)
    }()
    return out
}

In practice, we usually read the data source from files, such as log files. There may also be more than one data source: for example, n HDFS files that all need to be read.

Next, we read the data source from a file. We define a generic reading function: whatever the data source is, we read it through a Reader and put the contents into the channel.

func ReaderSource(reader io.Reader) <-chan int {
    out := make(chan int)
    go func() {
        // every int is stored as 8 bytes
        buffer := make([]byte, 8)
        for {
            // Read returns the number of bytes read and an error
            n, err := reader.Read(buffer)
            if n > 0 {
                u := binary.BigEndian.Uint64(buffer)
                out <- int(u)
            }
            if err != nil {
                break
            }
        }
        close(out)
    }()
    return out
}

 

Besides the source that reads the data in, we also need the other end, the sink: the sink reads the data from the channel and outputs it.

// WriteSink reads data from the channel and writes it out.
// The output can go to the console or to a file.
func WriteSink(writer io.Writer, in <-chan int) {
    for v := range in {
        b := make([]byte, 8)
        binary.BigEndian.PutUint64(b, uint64(v))
        writer.Write(b)
    }
}
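As a quick check of the 8-byte big-endian encoding shared by ReaderSource and WriteSink, a minimal round trip (a sketch, not from the original):

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

func main() {
    var buf bytes.Buffer
    b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, uint64(42)) // encode, as WriteSink does
    buf.Write(b)
    u := binary.BigEndian.Uint64(buf.Bytes()) // decode, as ReaderSource does
    fmt.Println(int(u))                       // 42
}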

Finally, for convenience, we define code that generates random numbers and writes them to files.

// RandomSource generates count random numbers.
func RandomSource(count int) chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < count; i++ {
            out <- rand.Int()
        }
        close(out)
    }()
    return out
}

The following is an integration of the above operations. The integration code is as follows:

func main() {
    const fileName = "small.in"
    const count = 100

    // step 1: create the data source file
    file, e := os.Create(fileName)
    if e != nil {
        panic(e)
    }
    defer file.Close()
    dataSource := pipeline.RandomSource(count)
    writer := bufio.NewWriter(file)
    pipeline.WriteSink(writer, dataSource)
    writer.Flush()

    // step 2: read the file back and print its contents
    f, e := os.Open(fileName)
    if e != nil {
        panic(e)
    }
    defer f.Close()
    readerSource := pipeline.ReaderSource(bufio.NewReader(f))
    var num = 0
    for rs := range readerSource {
        fmt.Println(rs)
        num++
        if num > 100 {
            break
        }
    }
}

Finally, generating random numbers is the data-generation step. With this main function we can produce two data sets: a small one and a big one.

This part is really practice with channels and how Go communicates through them. Every stage of the pipeline is a channel.


 

3.7 Merging multiple channels

Next, we implement the N-way merge, which combines multiple data sources by recursively merging them in pairs.

// MergeN merges N inputs into one channel.
func MergeN(inputs ...<-chan int) <-chan int {
    if len(inputs) == 1 {
        return inputs[0]
    }
    middle := len(inputs) / 2
    // merge the two halves recursively
    return Merge(
        MergeN(inputs[:middle]...),
        MergeN(inputs[middle:]...))
}

The inputs are split into two halves, each half is merged recursively, and finally the two resulting channels are merged.
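For example, with four inputs MergeN merges the two halves first and then merges the two results. A usage sketch, assuming the final versions of these functions from section 3.10 (which close their output channels):

package main

import (
    "fmt"

    "test007/pipeline"
)

func main() {
    merged := pipeline.MergeN(
        pipeline.InMemSort(pipeline.ArraySource(5, 2)),
        pipeline.InMemSort(pipeline.ArraySource(9, 1)),
        pipeline.InMemSort(pipeline.ArraySource(4, 3)),
        pipeline.InMemSort(pipeline.ArraySource(8, 6)))
    // internally: Merge(Merge(c1, c2), Merge(c3, c4))
    for v := range merged {
        fmt.Println(v) // 1 2 3 4 5 6 8 9
    }
}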

 


 

3.8 Single-node external sorting

External sorting on a single machine has three parts:

  1. Read the data and merge it

  2. Write the merged data to a file

  3. Read back the data written to the file

 

Let's start with the first part: read the data from the file in chunks, merge the sorted chunks in pairs, and finally return the channel of merged data.

// createPipeline reads fileName (fileSize bytes) in chunkCount chunks,
// sorts each chunk in memory, and returns the merged output channel.
func createPipeline(fileName string, fileSize, chunkCount int) <-chan int {
    pipeline.Init()
    // bytes per chunk
    chunkSize := fileSize / chunkCount
    sortResult := []<-chan int{}
    for i := 0; i < chunkCount; i++ {
        file, e := os.Open(fileName)
        if e != nil {
            panic(e)
        }
        // offset: where to start; whence 0: relative to the start of the file
        file.Seek(int64(chunkSize*i), 0)
        // read this chunk of the file
        source := pipeline.ReaderSource(bufio.NewReader(file), chunkSize)
        // sort the chunk in memory
        sortResult = append(sortResult, pipeline.InMemSort(source))
    }
    return pipeline.MergeN(sortResult...)
}

 

Part two: Writing data to a file

// writeToFile writes the merge result to a file.
func writeToFile(p <-chan int, fileName string) {
    file, e := os.Create(fileName)
    if e != nil {
        panic(e)
    }
    defer file.Close()
    writer := bufio.NewWriter(file)
    defer writer.Flush()
    // this is the step that writes what was read to the file
    pipeline.WriteSink(writer, p)
}

 

Part three: Printing the data from the file to the console

func printFile(fileName string) {
    file, e := os.Open(fileName)
    if e != nil {
        panic(e)
    }
    defer file.Close()
    reader := bufio.NewReader(file)
    source := pipeline.ReaderSource(reader, -1)
    count := 0
    for v := range source {
        fmt.Println(v)
        count++
        if count > 100 {
            break
        }
    }
}

Here, we generated a 512 KB file above, so the final merged output should be 512 KB as well.

The big-data file is 800,000,000 bytes, so its final merge should also be 800,000,000 bytes.

Phase summary:

From the demo above, you can see that everything flows through channels. A channel is passed along; it may have no data in it right now, in which case the receiver waits until data arrives and then pulls it out.

The demo above consists of two parts:

Part one: data generation. Generate data randomly and save it to a file.

Part two: read the data in the file in fragments, divided into 4 chunks. Each chunk is sorted internally; the n sorted chunks are then merged in pairs until a single stream of data comes out. That stream is written to a file.

 

From the analysis above:

You can see that it is almost all channel communication. When you read data, it is not the case that you put in a chunk and a chunk immediately comes out at the end; there is waiting in between.

Wherever there is waiting, a deadlock can occur, so be sure to call close; that way the receiving side does not wait forever.

This is how the pipeline is built.

 

The other improvement is the buffered channel. Without a buffer, a channel simply waits on every send: I put one value in, someone takes it away, I put the next one in, it is taken away again, and so on, until the sender says it is done and the receiver finishes.

This one-to-one handoff is inefficient, so we give the channel a buffer, for example room for 1000 values: up to 1000 values can sit in the channel at once, which greatly improves efficiency.
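A minimal illustration of the difference (not from the original): with a buffer, a sender can run ahead of the receiver until the buffer is full.

package main

import "fmt"

func main() {
    ch := make(chan int, 3) // buffered: sends only block once the buffer is full
    ch <- 1
    ch <- 2
    ch <- 3 // all three sends complete before any receiver runs
    close(ch)
    for v := range ch {
        fmt.Println(v)
    }
}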

func InMemSort(in <-chan int) <-chan int {
    out := make(chan int, 1000) // buffered channel
    go func() {
        // read all the data from in
        var arr []int
        for v := range in {
            arr = append(arr, v)
        }
        fmt.Println("read data: ", time.Now().Sub(startTime))
        sort.Ints(arr)
        fmt.Println("sorted data: ", time.Now().Sub(startTime))
        // write the sorted data to out
        for _, v := range arr {
            out <- v
        }
        close(out)
    }()
    return out
}

func Merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 1000) // buffered channel
    go func() {
        // take one value from each input channel
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        // as long as either channel still has data, keep merging
        for ok1 || ok2 {
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        close(out)
        fmt.Println("merged data: ", time.Now().Sub(startTime))
    }()
    return out
}

// ReaderSource reads 8-byte big-endian integers from reader.
// trunkSize limits how many bytes to read; -1 means read to the end.
func ReaderSource(reader io.Reader, trunkSize int) <-chan int {
    out := make(chan int, 1000) // buffered channel
    go func() {
        buffer := make([]byte, 8)
        readered := 0 // bytes read so far
        for {
            // Read returns the number of bytes read and an error
            n, err := reader.Read(buffer)
            readered += n
            if n > 0 {
                u := binary.BigEndian.Uint64(buffer)
                out <- int(u)
            }
            if err != nil || (trunkSize != -1 && readered >= trunkSize) {
                break
            }
        }
        close(out)
    }()
    return out
}

The make(chan int, 1000) calls (marked in red in the original) add a buffer to each channel and improve the efficiency of pipeline processing.

3.9 External sorting, networked version

From the sorting results above, we can see that sorting an 800 MB file takes about 40-50 seconds. That is not really fast; it would actually be faster without channels. Channels make it slower, so why do we use them?

First, why is it slower? Communication between channels involves waiting, which is certainly slower than processing the data directly.

Second, although channels are slower, we still use them. Why? Because this gives us 4-way parallel processing. The file here is 800 MB; what if it were 8 GB? 800 GB? Could we do that in a single thread? Obviously not. It has to be done in parallel.


Typically, server logs are kept on different machines: certain machines receive the log files, the data is then transmitted to other machines for processing, and after processing it is sent to yet other machines to be merged and finally stored in a database. Each of these steps may happen on a different machine. So let's simulate how this data is transferred between servers.

Here’s what we’re going to do

 

ReaderSource (which reads the data) and InMemSort (which merges and sorts in memory) are executed separately, each potentially on its own server.

Principle: each merge node corresponds to its own merge server.


Now there are two things to do:

  1. The data read from the file is put on a server and sent over the network to the clients that connect to it

  2. The client merges the data and outputs it to a file

First part: read the files from the data source and serve them over the network:

// NetWorkSink serves the data from channel in over the network.
func NetWorkSink(addr string, in <-chan int) {
    // start the server-side listener
    listener, e := net.Listen("tcp", addr)
    if e != nil {
        panic(e)
    }
    go func() {
        defer listener.Close()
        // wait for a client to connect
        conn, e := listener.Accept()
        if e != nil {
            panic(e)
        }
        defer conn.Close()
        writer := bufio.NewWriter(conn)
        defer writer.Flush()
        WriteSink(writer, in)
    }()
}

It sends the data to the connected client.

 

The second part: the client receives the data, reads the data, and sends it to the channel

func NetWorkSource(addr string) <-chan int {
    out := make(chan int)
    go func() {
        conn, e := net.Dial("tcp", addr)
        if e != nil {
            panic(e)
        }
        defer conn.Close()
        reader := bufio.NewReader(conn)
        source := ReaderSource(reader, -1)
        for s := range source {
            out <- s
        }
        close(out)
    }()
    return out
}
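A minimal sketch of how the two sides fit together, assuming the pipeline package above (with the final versions that close their channels); port 7777 is an arbitrary choice:

package main

import (
    "fmt"

    "test007/pipeline"
)

func main() {
    // server side: serve a stream of sorted data on a port
    data := pipeline.InMemSort(pipeline.ArraySource(3, 1, 2))
    pipeline.NetWorkSink(":7777", data)

    // client side: dial the same port and read the data back
    for v := range pipeline.NetWorkSource(":7777") {
        fmt.Println(v) // 1 2 3
    }
}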

Next create a network pipeline

// createNetWorkPipeline: fileName is the file name, fileSize its size in
// bytes, chunkCount the number of chunks.
func createNetWorkPipeline(fileName string, fileSize, chunkCount int) <-chan int {
    pipeline.Init()
    // bytes per chunk
    chunkSize := fileSize / chunkCount
    sortResult := []<-chan int{}
    sortAddr := []string{}
    for i := 0; i < chunkCount; i++ {
        file, e := os.Open(fileName)
        if e != nil {
            panic(e)
        }
        // offset: where to start; whence 0: relative to the start of the file
        file.Seek(int64(chunkSize*i), 0)
        // read this chunk of the file and sort it in memory
        source := pipeline.ReaderSource(bufio.NewReader(file), chunkSize)
        sort := pipeline.InMemSort(source)
        addr := ":" + strconv.Itoa(7000+i)
        pipeline.NetWorkSink(addr, sort)
        sortAddr = append(sortAddr, addr)
    }
    for _, s := range sortAddr {
        sortResult = append(sortResult, pipeline.NetWorkSource(s))
    }
    return pipeline.MergeN(sortResult...)
}

Final test: the networked version of file transfer and communication.

func main() {
    // step 1: create the network pipeline
    p := createNetWorkPipeline("large.in", 800000000, 4)
    // time.Sleep(time.Hour)
    // step 2: write the merge result to a file
    writeToFile(p, "large.out")
    // step 3: print the result
    printFile("large.out")
}

Running results:

 

Conclusion:

What is the networked version actually doing? It simulates a real usage scenario.

 

 

Summary: once again we have experienced how Go communicates using chan. Almost every step communicates through a chan.

The final networked version is only a simple simulation, but it is close to the real thing; with some modification it could be split into separate client-side and server-side programs.

 


 

3.10 Attached source code

The project structure

 

1.  nodes.go

package pipeline

import (
    "encoding/binary"
    "fmt"
    "io"
    "math/rand"
    "sort"
    "time"
)

var startTime time.Time

func Init() {
    startTime = time.Now()
}

func ArraySource(arr ...int) <-chan int {
    out := make(chan int, 1000)
    go func() {
        for _, a := range arr {
            out <- a
        }
        close(out)
    }()
    return out
}

// InMemSort reads all the data from in, sorts it in memory,
// and writes the sorted data to the returned channel.
func InMemSort(in <-chan int) <-chan int {
    out := make(chan int, 1000)
    go func() {
        var arr []int
        for v := range in {
            arr = append(arr, v)
        }
        fmt.Println("read data: ", time.Now().Sub(startTime))
        sort.Ints(arr)
        fmt.Println("sorted data: ", time.Now().Sub(startTime))
        for _, v := range arr {
            out <- v
        }
        close(out)
    }()
    return out
}

// Merge merges two sorted channels into one.
func Merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 1000)
    go func() {
        // take one value from each channel
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        // as long as either channel still has data, keep merging
        for ok1 || ok2 {
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        close(out)
        fmt.Println("merged data: ", time.Now().Sub(startTime))
    }()
    return out
}

// ReaderSource reads 8-byte big-endian integers from reader.
// trunkSize limits how many bytes to read; -1 means read to the end.
func ReaderSource(reader io.Reader, trunkSize int) <-chan int {
    out := make(chan int, 1000)
    go func() {
        buffer := make([]byte, 8)
        readered := 0 // bytes read so far
        for {
            // Read returns the number of bytes read and an error
            n, err := reader.Read(buffer)
            readered += n
            if n > 0 {
                u := binary.BigEndian.Uint64(buffer)
                out <- int(u)
            }
            if err != nil || (trunkSize != -1 && readered >= trunkSize) {
                break
            }
        }
        close(out)
    }()
    return out
}

// WriteSink reads data from the channel and writes it out,
// either to the console or to a file.
func WriteSink(writer io.Writer, in <-chan int) {
    for v := range in {
        b := make([]byte, 8)
        binary.BigEndian.PutUint64(b, uint64(v))
        writer.Write(b)
    }
}

// RandomSource generates count random numbers.
func RandomSource(count int) chan int {
    out := make(chan int, 100)
    go func() {
        for i := 0; i < count; i++ {
            out <- rand.Int()
        }
        close(out)
    }()
    return out
}

// MergeN merges N sorted inputs by recursively merging them in pairs.
func MergeN(inputs ...<-chan int) <-chan int {
    if len(inputs) == 1 {
        return inputs[0]
    }
    middle := len(inputs) / 2
    return Merge(
        MergeN(inputs[:middle]...),
        MergeN(inputs[middle:]...))
}

2.  main.go

package main

import (
    "bufio"
    "fmt"
    "os"
    "test007/pipeline"
    "time"
)

func main() {
    const fileName = "large.in"
    const count = 100000000

    // step 1: create the data source file
    file, e := os.Create(fileName)
    if e != nil {
        panic(e)
    }
    defer file.Close()
    dataSource := pipeline.RandomSource(count)
    writer := bufio.NewWriter(file)
    pipeline.WriteSink(writer, dataSource)
    writer.Flush()

    // step 2: read the file back and print its contents
    f, e := os.Open(fileName)
    if e != nil {
        panic(e)
    }
    defer f.Close()
    readerSource := pipeline.ReaderSource(bufio.NewReader(f), -1)
    var num = 0
    for rs := range readerSource {
        fmt.Println(rs)
        num++
        if num > 100 {
            break
        }
    }
}

func MergeDemo() {
    mergeChan := pipeline.Merge(
        pipeline.InMemSort(pipeline.ArraySource(1, 4, 8, 2, 19, 5)),
        pipeline.InMemSort(pipeline.ArraySource(0, 29, 43, 1, 7, 9)))
    go func() {
        for v := range mergeChan {
            fmt.Println("data:", v)
        }
    }()
    time.Sleep(time.Second * 10)
}

3.  net_nodes.go

package pipeline

import (
    "bufio"
    "net"
)

// NetWorkSink serves the data from channel in over the network.
func NetWorkSink(addr string, in <-chan int) {
    // start the server-side listener
    listener, e := net.Listen("tcp", addr)
    if e != nil {
        panic(e)
    }
    go func() {
        defer listener.Close()
        // wait for a client to connect
        conn, e := listener.Accept()
        if e != nil {
            panic(e)
        }
        defer conn.Close()
        writer := bufio.NewWriter(conn)
        defer writer.Flush()
        WriteSink(writer, in)
    }()
}

// NetWorkSource connects to addr and forwards the received data into a channel.
func NetWorkSource(addr string) <-chan int {
    out := make(chan int)
    go func() {
        conn, e := net.Dial("tcp", addr)
        if e != nil {
            panic(e)
        }
        defer conn.Close()
        reader := bufio.NewReader(conn)
        source := ReaderSource(reader, -1)
        for s := range source {
            out <- s
        }
        close(out)
    }()
    return out
}

4.  sort.go

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "test007/pipeline"
)

func main() {
    // step 1: create the network pipeline
    p := createNetWorkPipeline("large.in", 800000000, 4)
    // time.Sleep(time.Hour)
    // step 2: write the merge result to a file
    writeToFile(p, "large.out")
    // step 3: print the result
    printFile("large.out")
}

func printFile(fileName string) {
    file, e := os.Open(fileName)
    if e != nil {
        panic(e)
    }
    defer file.Close()
    reader := bufio.NewReader(file)
    source := pipeline.ReaderSource(reader, -1)
    count := 0
    for v := range source {
        fmt.Println(v)
        count++
        if count > 100 {
            break
        }
    }
}

// writeToFile writes the merge result to a file.
func writeToFile(p <-chan int, fileName string) {
    file, e := os.Create(fileName)
    if e != nil {
        panic(e)
    }
    defer file.Close()
    writer := bufio.NewWriter(file)
    defer writer.Flush()
    // this is the step that writes what was read to the file
    pipeline.WriteSink(writer, p)
}

// createPipeline reads fileName (fileSize bytes) in chunkCount chunks,
// sorts each chunk in memory, and merges the sorted chunks.
func createPipeline(fileName string, fileSize, chunkCount int) <-chan int {
    pipeline.Init()
    // bytes per chunk
    chunkSize := fileSize / chunkCount
    sortResult := []<-chan int{}
    for i := 0; i < chunkCount; i++ {
        file, e := os.Open(fileName)
        if e != nil {
            panic(e)
        }
        // seek to the start of this chunk (whence 0: from the beginning)
        file.Seek(int64(chunkSize*i), 0)
        source := pipeline.ReaderSource(bufio.NewReader(file), chunkSize)
        sortResult = append(sortResult, pipeline.InMemSort(source))
    }
    return pipeline.MergeN(sortResult...)
}

// createNetWorkPipeline is like createPipeline, but each sorted chunk is
// served over the network and read back through NetWorkSource.
func createNetWorkPipeline(fileName string, fileSize, chunkCount int) <-chan int {
    pipeline.Init()
    chunkSize := fileSize / chunkCount
    sortResult := []<-chan int{}
    sortAddr := []string{}
    for i := 0; i < chunkCount; i++ {
        file, e := os.Open(fileName)
        if e != nil {
            panic(e)
        }
        file.Seek(int64(chunkSize*i), 0)
        source := pipeline.ReaderSource(bufio.NewReader(file), chunkSize)
        sort := pipeline.InMemSort(source)
        addr := ":" + strconv.Itoa(7000+i)
        pipeline.NetWorkSink(addr, sort)
        sortAddr = append(sortAddr, addr)
    }
    for _, s := range sortAddr {
        sortResult = append(sortResult, pipeline.NetWorkSource(s))
    }
    return pipeline.MergeN(sortResult...)
}

 

References:

1. www.jianshu.com/p/36e246c61…