Preface – From Kevin Wan Junfeng

The average latency of our service is about 30ms, which is largely thanks to our extensive use of MapReduce: even though a single request calls many downstream services, the overall latency mostly depends only on the slowest of those calls.

For your existing services, you can drastically reduce response times by parallelizing orthogonal (unrelated) requests with MapReduce, without optimizing DB operations, without tuning caches, and without rewriting business logic.

In this article, ouyang walks you through the implementation details of MapReduce.

Why MapReduce

In actual business scenarios, we often need to obtain corresponding attributes from different RPC services to assemble complex objects.

For example, to query product details:

  1. Product service – query product attributes
  2. Inventory service – query inventory attributes
  3. Price service – query price attributes
  4. Marketing service – query marketing attributes

With serial calls, the response time grows linearly with the number of RPC calls, so we generally switch from serial to parallel calls to improve performance.

For simple scenarios a WaitGroup is enough. But what if we need to validate, process, transform, and aggregate the data returned by those RPC calls? A WaitGroup quickly becomes unwieldy, and the Go standard library offers no ready-made tool for this (Java has CompletableFuture), so the go-zero author implemented a MapReduce concurrency utility for in-process batch data processing, based on the MapReduce architectural idea.
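
To make this concrete, here is a minimal sketch of assembling product details concurrently with go-zero's mr.Finish (its signature is listed later in this article). The ProductDetail type and the four getter functions are hypothetical stand-ins for the real RPC clients.

package main

import (
    "fmt"

    "github.com/zeromicro/go-zero/core/mr"
)

// ProductDetail is a hypothetical aggregate of the four services' results.
type ProductDetail struct {
    Product   string
    Inventory int
    Price     int64
    Promotion string
}

// Local stubs standing in for the four RPC calls.
func getProduct(id int64) (string, error)   { return "iPhone", nil }
func getInventory(id int64) (int, error)    { return 42, nil }
func getPrice(id int64) (int64, error)      { return 599900, nil }
func getPromotion(id int64) (string, error) { return "10% off", nil }

func queryProductDetail(id int64) (*ProductDetail, error) {
    var d ProductDetail
    // The four closures run concurrently; the first error cancels the whole call.
    err := mr.Finish(func() (err error) {
        d.Product, err = getProduct(id)
        return
    }, func() (err error) {
        d.Inventory, err = getInventory(id)
        return
    }, func() (err error) {
        d.Price, err = getPrice(id)
        return
    }, func() (err error) {
        d.Promotion, err = getPromotion(id)
        return
    })
    if err != nil {
        return nil, err
    }
    return &d, nil
}

func main() {
    detail, err := queryProductDetail(1)
    fmt.Println(detail, err)
}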

Design ideas

Let’s try to put ourselves in the writer’s shoes and tease out the possible business scenarios for concurrency tools:

  1. Querying product details: concurrently call multiple services to assemble the product attributes, and terminate immediately if any call returns an error.
  2. Automatically recommending coupons on the product details page: validate the user's coupons concurrently, drop any that fail validation, and return all valid ones.

Both scenarios boil down to processing input data and finally outputting the cleaned data. There is a very classic asynchronous pattern for data processing: the producer-consumer pattern. So we can abstract the life cycle of batch data processing into roughly three phases:

  1. Data production
  2. Data processing (mapper)
  3. Data aggregation (reducer)

Data production is the one indispensable stage; data processing and data aggregation are optional. Data production and data processing can run concurrently, while data aggregation is basically a pure in-memory operation in a single goroutine.

Consider how data should flow between the different stages. Since each stage's processing is performed by different goroutines, it is natural to use channels for communication between them.
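
Before looking at how go-zero does this, here is a bare-bones sketch of the three phases wired together with plain channels (not go-zero code, just an illustration of the data flow):

package main

import (
    "fmt"
    "sync"
)

func main() {
    source := make(chan int)    // data production -> data processing
    collector := make(chan int) // data processing -> data aggregation

    // Phase 1: data production.
    go func() {
        defer close(source)
        for i := 1; i <= 5; i++ {
            source <- i
        }
    }()

    // Phase 2: data processing, several concurrent mapper goroutines.
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range source {
                collector <- item * item
            }
        }()
    }
    // Close collector only after every mapper has finished writing.
    go func() {
        wg.Wait()
        close(collector)
    }()

    // Phase 3: data aggregation, a single goroutine (here: main).
    sum := 0
    for v := range collector {
        sum += v
    }
    fmt.Println("sum of squares:", sum) // 55
}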

How do you implement termination at any time?

Each goroutine simply listens on a shared terminating (done) channel.
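
A minimal sketch of this pattern (again not go-zero code): every worker selects on a shared done channel alongside its normal work, and closing that channel broadcasts cancellation to all of them.

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, done <-chan struct{}) {
    for {
        select {
        case <-done: // the global "stop everything" signal
            fmt.Println("worker", id, "cancelled")
            return
        case j, ok := <-jobs:
            if !ok {
                return // jobs channel closed, normal exit
            }
            fmt.Println("worker", id, "got", j)
        }
    }
}

func main() {
    jobs := make(chan int)
    done := make(chan struct{})

    for i := 0; i < 3; i++ {
        go worker(i, jobs, done)
    }

    jobs <- 1
    jobs <- 2
    close(done) // closing the channel is seen by every listener at once
    time.Sleep(100 * time.Millisecond)
}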

Go-zero code implementation

core/mr/mapreduce.go

Detailed source code can be viewed at github.com/Ouyangan/go…

Prerequisites – basic usage of channels

Because the MapReduce source code uses channels heavily for communication, here is a brief recap of basic channel usage:

  1. Close the channel after writing to it
ch := make(chan interface{})
// the writer must close the channel explicitly once writing is finished
defer func() {
    close(ch)
}()
go func() {
    // v, ok style: read until ok is false, i.e. the channel is closed and drained
    // (only this style actually runs here; the two below are shown for comparison)
    for {
        v, ok := <-ch
        if !ok {
            return
        }
        t.Log(v)
    }

    // for range style: the loop exits automatically when the channel is closed
    for i := range ch {
        t.Log(i)
    }

    // drain style: discard remaining values; the loop exits when the channel is closed
    for range ch {
    }
}()
for i := 0; i < 10; i++ {
    ch <- i
    time.Sleep(time.Second)
}

  2. A closed channel can still be read from (see the sketch after this list)
  3. A channel's read and write permissions can be restricted
// read-only channel
func readChan(rch <-chan interface{}) {
    for i := range rch {
        log.Println(i)
    }
}

// write-only channel
func writeChan(wch chan<- interface{}) {
    wch <- 1
}
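
A quick sketch of point 2 above: reading from a closed channel never blocks; buffered values are still delivered first, and after that every read returns the zero value with ok == false.

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    close(ch)

    // Buffered values are still delivered after close.
    v, ok := <-ch
    fmt.Println(v, ok) // 1 true
    v, ok = <-ch
    fmt.Println(v, ok) // 2 true

    // Once drained, reads return the zero value and ok == false immediately.
    v, ok = <-ch
    fmt.Println(v, ok) // 0 false
}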

The interface definition

Let’s start with the three core function definitions:

  1. Data production
  2. Data processing
  3. Data aggregation
// Data production func
// source - produced data is written into source
GenerateFunc func(source chan<- interface{})

// Data processing func
// item - a produced item
// writer - call writer.Write() to pass the processed item on to the reducer
// cancel - call cancel(err) to terminate the whole flow
MapperFunc func(item interface{}, writer Writer, cancel func(error))

// Data aggregation func
// pipe - channel of processed data
// writer - call writer.Write() to return the aggregated result to the caller
// cancel - call cancel(err) to terminate the whole flow
ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

User-oriented method definition

Usage is covered in the official documentation and will not be repeated here.

There are many user-oriented methods, which fall into two main categories:

  1. No return value
    1. Terminate execution immediately when an error occurs
    2. Ignore errors during execution
  2. With a return value
    1. Manually write to source, manually read from the aggregation channel
    2. Manually write to source, automatically read from the aggregation channel
    3. Pass in an external source, automatically read from the aggregation channel
// Execute the funcs concurrently; any error terminates execution immediately
func Finish(fns ...func() error) error

// Execute the funcs concurrently; errors do not terminate execution
func FinishVoid(fns ...func())

// The user manually writes production data into source; returns a channel for reading the results
// opts - optional; currently includes the number of worker goroutines for the data processing stage
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option)

// No return value, errors are ignored
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option)

// No return value, but errors terminate execution
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option)

// The user manually writes production data into source; returns the aggregated result
// generate - production
// mapper - processing
// reducer - aggregation
// opts - optional; currently includes the number of worker goroutines for the data processing stage
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

// Supports passing in a data source channel; returns the aggregated result
// source - data source channel
// mapper - reads items from source and processes them
// reducer - aggregates the data handed over by the mappers
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error)

The core methods are MapReduceWithSource and Map; the other methods call them internally. Once you understand MapReduceWithSource, Map is easy to work out.
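
Here is a minimal end-to-end sketch of those three functions plugged into mr.MapReduce, assuming the pre-generics go-zero API shown above (the integers and the squaring are just placeholders):

package main

import (
    "fmt"
    "log"

    "github.com/zeromicro/go-zero/core/mr"
)

func main() {
    result, err := mr.MapReduce(func(source chan<- interface{}) {
        // Data production: push the raw items into source.
        for i := 1; i <= 4; i++ {
            source <- i
        }
    }, func(item interface{}, writer mr.Writer, cancel func(error)) {
        // Data processing: runs concurrently; cancel(err) would abort the whole flow.
        n := item.(int)
        writer.Write(n * n)
    }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
        // Data aggregation: single goroutine; Write must be called exactly once.
        sum := 0
        for v := range pipe {
            sum += v.(int)
        }
        writer.Write(sum)
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(result) // 30
}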

MapReduceWithSource source code implementation

[Flow chart: the complete MapReduceWithSource flow]

// Supports passing in a data source channel and returns the aggregated result
// source - data source channel
// mapper - reads items from source and processes them
// reducer - aggregates the data handed over by the mappers
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
    // Optional parameter settings
    options := buildOptions(opts...)
    // Aggregated-output channel; the reducer must write to it manually via writer.Write()
    output := make(chan interface{})
    // output should only ever be read once
    defer func() {
        // If the reducer wrote more than one value, the extra write would block and leak a goroutine
        // Use for range to check whether anything is still readable
        // Why panic? Explicitly telling the user about a usage error beats silently fixing it
        for range output {
            panic("more than one element written in reducer")
        }
    }()
    // Create a buffered chan with capacity equal to workers,
    // meaning at most workers items can be processed at the same time
    collector := make(chan interface{}, options.workers)
    // Flag marking completion of the data aggregation task
    done := syncx.NewDoneChan()
    // Writer that writes into output, guarded by the done channel
    writer := newGuardedWriter(output, done.Done())
    // Ensure closing happens only once
    var closeOnce sync.Once
    var retErr errorx.AtomicError
    // finish marks the data aggregation task as finished
    finish := func() {
        // Can only be closed once
        closeOnce.Do(func() {
            // Signal that the aggregation task is complete; reads on the closed done chan return the zero value
            done.Close()
            // Close the aggregated-output chan
            close(output)
        })
    }
    // Cancel the whole operation
    cancel := once(func(err error) {
        // Record the error
        if err != nil {
            retErr.Set(err)
        } else {
            retErr.Set(ErrCancelWithNil)
        }
        // Drain the source channel
        drain(source)
        // Call the completion method
        finish()
    })

    go func() {
        defer func() {
            // Drain the collector channel
            drain(collector)
            // Capture panics
            if r := recover(); r != nil {
                // Call the cancel method, ending immediately
                cancel(fmt.Errorf("%v", r))
            } else {
                // Normal completion
                finish()
            }
        }()
        // Perform data aggregation
        // writer.Write() writes the aggregated result into output
        reducer(collector, writer, cancel)
    }()
    // Execute data processing asynchronously
    // source - data production
    // collector - data collection
    // done - end flag
    // workers - number of concurrent workers
    go executeMappers(func(item interface{}, w Writer) {
        mapper(item, w, cancel)
    }, source, collector, done.Done(), options.workers)
    // The reducer writes the result into output via writer.Write()
    // Block here reading output until the result arrives
    // If output is written more than once,
    // the extra value is still read by the deferred for-range above,
    // which is how calling Write multiple times gets detected
    value, ok := <-output
    if err := retErr.Load(); err != nil {
        return nil, err
    } else if ok {
        return value, nil
    } else {
        return nil, ErrReduceNoOutput
    }
}

// Data processing
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
    done <-chan lang.PlaceholderType, workers int) {
    // WaitGroup coordinating the processing goroutines
    var wg sync.WaitGroup
    defer func() {
        // Wait for all data processing tasks to complete,
        // so collector is not closed while workers are still writing to it
        wg.Wait()
        // Close the data processing channel
        close(collector)
    }()
    // Channel with buffer size workers,
    // limiting the number of concurrent processing goroutines
    pool := make(chan lang.PlaceholderType, workers)
    // Data processing writer
    writer := newGuardedWriter(collector, done)
    for {
        select {
        // An external termination signal was received
        case <-done:
            return
        // Control the number of data processing goroutines:
        // acquiring a slot takes one unit of buffer capacity;
        // when no capacity is left, this blocks until a slot is released
        case pool <- lang.Placeholder:
            // Block waiting on the production data channel
            item, ok := <-input
            // ok is false when input has been closed or drained,
            // i.e. data production is complete, so exit
            if !ok {
                // Release the slot (buffer capacity +1)
                <-pool
                // End the loop
                return
            }
            // WaitGroup counter +1
            wg.Add(1)
            // better to safely run caller defined method
            // Run the processing asynchronously and recover from any panic
            threading.GoSafe(func() {
                defer func() {
                    // WaitGroup counter -1
                    wg.Done()
                    // Release the slot (buffer capacity +1)
                    <-pool
                }()

                mapper(item, writer)
            })
        }
    }
}
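
Seen from the caller's side, the cancel path works like this. A hedged sketch (same assumed API as before) in which one mapper calls cancel: retErr is set, the source is drained, done is closed, and MapReduce returns the error instead of a result.

package main

import (
    "errors"
    "fmt"

    "github.com/zeromicro/go-zero/core/mr"
)

func main() {
    _, err := mr.MapReduce(func(source chan<- interface{}) {
        for i := 1; i <= 4; i++ {
            source <- i
        }
    }, func(item interface{}, writer mr.Writer, cancel func(error)) {
        if n := item.(int); n == 3 {
            // Aborts the whole flow via the cancel func passed to the mapper.
            cancel(errors.New("item 3 is not allowed"))
            return
        }
        writer.Write(item)
    }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
        sum := 0
        for v := range pipe {
            sum += v.(int)
        }
        writer.Write(sum)
    })
    fmt.Println(err) // item 3 is not allowed
}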

Conclusion

I spent about two evenings reading the mapReduce source code. On the one hand, I am not very familiar with Go, especially channels, so I had to stop frequently and look up documentation to follow the author's approach. On the other hand, making multiple goroutines cooperate through channel communication is genuinely mind-bending (I admire the author's ability to reason about it).

Also, reading unfamiliar source code for the first time is bound to be confusing. That is fine: find the program's entry point (for common base components this is usually the set of user-facing methods), follow the main path line by line, annotate each line until you understand it, and only then look at the branch code.

If you still don't understand something, dig into the code's commit history; there is a good chance it was a bug fix, like the code below, which I read many times without understanding.

// Aggregated-output channel; the reducer must write to it manually via writer.Write()
output := make(chan interface{})
// output should only ever be read once
defer func() {
    // If the reducer wrote more than one value, the extra write would block and leak a goroutine
    // Use for range to check whether anything is still readable
    // Why panic? Explicitly telling the user about a usage error beats silently fixing it
    for range output {
        panic("more than one element written in reducer")
    }
}()

Finally, drawing a flow chart is usually enough to make the source code click; for me this method is a bit clumsy but effective.

References

Go Channel: colobu.com/2016/04/14/…

Go-zero MapReduce Documentation: go-zero.dev/cn/mapreduc…

The project address

Github.com/zeromicro/g…

You are welcome to use go-zero and star the repo to support us!

WeChat group

Follow the “Microservice Practice” WeChat official account and click the community group entry to get the QR code for the community group.