MapReduce is a programming model proposed by Google for parallel computation over large data sets (often larger than 1 TB). In short, it splits a big job into small tasks, executes them, and then combines the results. This is just like what our teachers taught us as children: turn big problems into small ones, and small ones into trivial ones. (Suddenly the teacher seems remarkably concise and comprehensive!) The idea really is that simple. So, in today's software-defined world, how do we use this approach to process massive amounts of data? This article walks you through a simple implementation in Go.

Getting on the bus

A few concepts:

The concepts "Map" and "Reduce", and their main ideas, are borrowed from functional programming languages, along with features borrowed from vector programming languages. An implementation specifies a Map function that maps a set of key-value pairs to a set of new, intermediate key-value pairs, and a Reduce function that merges all the intermediate values sharing the same key.
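Concretely, in the Go code later in this article, a key-value pair is represented by a small struct, and the user plugs in two functions with roughly the following shapes (this KeyValue definition mirrors what the later snippets rely on):

// KeyValue is the unit of data flowing between Map and Reduce.
type KeyValue struct {
	Key   string
	Value string
}

// The user supplies two functions:
//   a Map function:    mapF(file string, contents string) []KeyValue
//   a Reduce function: reduceF(key string, values []string) string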

Let’s start with a simple example:

Word frequency counting (WordCount). Starting from a realistic requirement, we may need to count how many times each word appears in an article. Brought into everyday life, this is really a Top-N problem: for example, the whole school holds a commendation conference and needs to find its 10 best students. WordCount is one implementation of the same pattern; the only difference is how many of the final results we keep.

Given the requirement of finding the 10 best students, let's think about how to fulfil it. The request is probably raised by the principal at a meeting, so in practice each grade leader finds the top 10 students in their own grade, and then the principal puts those per-grade lists together and picks the overall top 10. And how does each grade do it? The same way: each class finds its own top 10 students and reports them up to the grade.
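As a toy sketch of that idea in Go (the Student type and topN helper are made-up names, purely for illustration):

import "sort"

type Student struct {
	Name  string
	Score int
}

// topN returns the n highest-scoring students.
func topN(students []Student, n int) []Student {
	sort.Slice(students, func(i, j int) bool {
		return students[i].Score > students[j].Score
	})
	if len(students) > n {
		return students[:n]
	}
	return students
}

// schoolTopN is the whole idea in miniature: every class computes
// its own top n ("map"), and the school computes the top n of those
// small per-class lists ("reduce").
func schoolTopN(classes [][]Student, n int) []Student {
	var merged []Student
	for _, class := range classes {
		merged = append(merged, topN(class, n)...)
	}
	return topN(merged, n)
}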

Setting off

Now that we have the basic overview and ideas, we are ready to build the MapReduce framework. The core idea is to divide the work into appropriately sized tasks, compute them, and then combine the results of each step. These two phases are defined as the Map and Reduce processes respectively.

Again, take the WordCount example:

The Map process reads a given file and emits each word in the file with an initial count of 1.

The Reduce process accumulates the counts of identical words. The purpose of our MapReduce framework, then, is to invoke these Map and Reduce processes at the appropriate times. In common_map.go, the doMap function is given a file, reads the data, and calls the Map process. The annotated code follows these steps:

  1. Read the input file;
  2. Pass the file contents to the user's Map function, producing KeyValue pairs;
  3. Finally, partition by the Key of each KeyValue and write the pairs into intermediate files for the subsequent Reduce process.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string, // the input file this map task reads
	nReduce int, // the number of reduce tasks that will be run
	mapF func(file string, contents string) []KeyValue,
) {
	// step 1: read the input file
	contents, err := ioutil.ReadFile(inFile)
	if err != nil {
		log.Fatal("doMap: read error for inFile ", err)
	}

	// step 2: call the user's map function to get the key/value pairs
	kvResult := mapF(inFile, string(contents))

	/*
	 * step 3: partition the pairs by key into nReduce intermediate files:
	 *   a. create the tmp files
	 *   b. create a JSON encoder for each tmp file
	 *   c. hash each key and write the pair through the matching encoder
	 */
	tmpFiles := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)
	for i := 0; i < nReduce; i++ {
		tmpFileName := reduceName(jobName, mapTaskNumber, i)
		tmpFiles[i], err = os.Create(tmpFileName)
		if err != nil {
			log.Fatal(err)
		}
		defer tmpFiles[i].Close()
		encoders[i] = json.NewEncoder(tmpFiles[i])
	}
	for _, kv := range kvResult {
		hashKey := int(ihash(kv.Key)) % nReduce
		if err := encoders[hashKey].Encode(&kv); err != nil {
			log.Fatal("doMap: encode error ", err)
		}
	}
}
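To make the mapF parameter concrete, a word-count pair of user functions might look like this (a sketch only; the repository's actual word-count code may tokenize differently):

import (
	"strconv"
	"strings"
	"unicode"
)

// mapF emits each word in the contents with an initial count of "1".
func mapF(file string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	var kvs []KeyValue
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// reduceF adds up all the "1"s recorded for a word.
func reduceF(key string, values []string) string {
	sum := 0
	for _, v := range values {
		n, _ := strconv.Atoi(v)
		sum += n
	}
	return strconv.Itoa(sum)
}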

The doReduce function lives in common_reduce.go; its main steps are:

  1. Read the intermediate files generated by doMap;
  2. Merge the values of identical keys and sort the keys lexicographically;
  3. Iterate over the sorted keys, call the user's Reduce function on each, and write the computed results to the output file.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// step 1: read the files generated by the map tasks,
	// merging the values of identical keys into a map[string][]string
	kvs := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		fileName := reduceName(jobName, i, reduceTaskNumber)
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatal("doReduce1: ", err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err = dec.Decode(&kv); err != nil {
				break // EOF: this intermediate file is exhausted
			}
			kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
		}
		file.Close()
	}

	// step 2: sort the keys
	var keys []string
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// step 3: create the result file
	p := mergeName(jobName, reduceTaskNumber)
	file, err := os.Create(p)
	if err != nil {
		log.Fatal("doReduce2: create ", err)
	}
	enc := json.NewEncoder(file)

	// step 4: call the user's reduce function on each key and write the result
	for _, k := range keys {
		res := reduceF(k, kvs[k])
		enc.Encode(KeyValue{k, res})
	}
	file.Close()
}
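Both snippets above lean on two small helpers for naming the intermediate files. One plausible implementation is sketched below; the exact naming scheme is an assumption, and all that matters is that map task m and reduce task r agree on the same file name:

import "strconv"

// reduceName is the name of the intermediate file that map task mapTask
// writes for reduce task reduceTask.
func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName is the name of the output file written by reduce task reduceTask.
func mergeName(jobName string, reduceTask int) string {
	return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}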

The Merge process

Of course, the last step is to merge the results produced by each Reduce task. During the merge, the results are again sorted lexicographically by key and then written into a single final file. The code is similar to Reduce, so it is not repeated in full here; a rough sketch follows.
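As a rough sketch (not the repository's exact code), the merge step might read each reduce task's output via mergeName, re-sort the keys, and write one final file; the "mrtmp."+jobName output name is an assumption:

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sort"
)

// merge collects the output of every reduce task into one sorted file.
func merge(jobName string, nReduce int) {
	kvs := make(map[string]string)
	for i := 0; i < nReduce; i++ {
		file, err := os.Open(mergeName(jobName, i))
		if err != nil {
			log.Fatal("merge: ", err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err = dec.Decode(&kv); err != nil {
				break // this reduce output is exhausted
			}
			kvs[kv.Key] = kv.Value
		}
		file.Close()
	}

	var keys []string
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	out, err := os.Create("mrtmp." + jobName) // assumed final file name
	if err != nil {
		log.Fatal("merge: create ", err)
	}
	defer out.Close()
	w := bufio.NewWriter(out)
	defer w.Flush()
	for _, k := range keys {
		fmt.Fprintf(w, "%s: %s\n", k, kvs[k])
	}
}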

Goroutines are used to implement distributed task execution. Below is the schedule method from schedule.go; its main steps are:

  1. Work out, from the current phase (Map or Reduce), how many map (or reduce) tasks need to be executed, then call the remote DoTask method in worker.go over RPC (a sketch of the worker side follows the code below).
  2. Wait for all tasks to complete before returning. The Go features at work here are Go RPC and Go's concurrency primitives (goroutines and channels); see the Go RPC documentation and "Concurrency in Go".
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// hand every task to a worker over RPC, one goroutine per task
	done := make(chan bool)
	for i := 0; i < ntasks; i++ {
		go func(number int) {
			// the input file argument is only meaningful in the map phase
			var inFile string
			if phase == mapPhase {
				inFile = mr.files[number]
			}
			args := DoTaskArgs{mr.jobName, inFile, phase, number, nios}
			var worker string
			reply := new(struct{})
			ok := false
			for !ok {
				// grab an idle worker; on RPC failure, retry with another
				worker = <-mr.registerChannel
				ok = call(worker, "Worker.DoTask", args, reply)
			}
			done <- true
			// return the worker to the pool of idle workers
			mr.registerChannel <- worker
		}(i)
	}
	// wait until every task has completed
	for i := 0; i < ntasks; i++ {
		<-done
	}
	fmt.Printf("Schedule: %v phase done\n", phase)
}
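For completeness, the worker's DoTask handler that schedule invokes over RPC might look roughly like this (a sketch: field names such as Phase, File, and NumOtherPhase, and the wk.Map/wk.Reduce members, are assumptions about the worker's DoTaskArgs and Worker types):

// DoTask is the RPC handler the master invokes; it dispatches to
// doMap or doReduce depending on the phase of the job.
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
	switch arg.Phase {
	case mapPhase:
		doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
	case reducePhase:
		doReduce(arg.JobName, arg.TaskNumber, arg.NumOtherPhase, wk.Reduce)
	}
	return nil
}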

Arriving at the station

  • Run tests:

  • Test results:

  • Test inverted index results:

Source repository address

Github.com/happyer/dis…