The second paper from the Google troika was first published at OSDI (the top operating system conference in the world) in 2004. It is also the most important data processing scheme of big data.

MapReduce: Simplified Data Processing on Large Clusters

Jeffrey Dean and Sanjay Ghemawat

Abstract:

MapReduce is technically a programming model, or an implementation for processing and producing large data sets. The user processes a K/V key-value pair by specifying a map function that generates a series of intermediate key-value pairs. Using the reduce function, we then merge the values associated with the same intermediate key. This paper shows some real-world tasks under the model.

Programs that are written functionally will be executed automatically, in parallel, over a large cluster. The run-time system takes care of the details of how to split the incoming data, schedule the program’s resources, handle machine failures, and manage the necessary communication between machines. This allows programmers with no experience working with parallel computing or distributed systems to easily take advantage of the resources of a large distributed system.

Our cluster is highly scalable: a typical MapReduce program can process terabytes of data on thousands of machines. Programmers will find the system useful: hundreds of MapReduce applications are implemented at Google every day, and thousands of MapReduce jobs run on clusters.

1. Introduction

Over the past five years, the author and many others at Google have implemented hundreds of computations for special purposes: These calculations are used to process a large amount of original data, such as documents captured by crawlers, Web request logs, etc., so as to calculate various derived data, such as inverted indexes, various analysis image representations of Web documents, result summary of the number of pages crawled by each host, and collection of the most frequent requests in a given date. Most of these calculations are conceptually straightforward. However, the amount of input data is usually large, and these calculations must be distributed across hundreds or thousands of machines in order to complete in a reasonable amount of time. How to parallelize the calculation, data distribution and troubleshooting requires a lot of complex code to deal with it, which makes the original simple calculation not so easy.

To deal with this complexity, we designed a new abstraction that allowed us to express only the simplest computation I wanted to perform, but hid the complex implementation details of computation parallelization, fault tolerance, data distribution, and load balancing in the underlying layer. The inspiration for this abstraction comes from the Map and Reduce primitives in Lisp and many other functional languages. We realize that most of our calculations involve using map operations on each logical “record” in the input data to generate temporary KV key pairs in the middle of the columns, and then we perform reduce operations on the same key values in these key pairs to appropriately combine derived data. Using a functional model of user-specified map and Reduce operations makes it easy to parallelize large calculations and use re-execution as the primary fault-tolerant mechanism.

The main contribution of our work is to provide a simple and powerful interface for automatic parallelization and distribution of large-scale computing, and to achieve high performance on a large number of commercial PC clusters through this interface implementation.

Section 2 of this article describes the basic programming model and gives several examples. Section 3 describes an implementation of a MapReduce interface customized for a cluster-based computing environment. Section 4 describes several programming models that we found useful. Section 5 performs a performance test of our implementation using various tasks. Section 6 explores the use of MapReduce in Google, including our experience with using it to rewrite the production indexing system. Section 7 discusses relevant and future work.

2. Programming Model

The calculation takes a series of key and value pairs and generates a series of key and value pairs. Users of MapReduce libraries represent calculations as two functions: Map and Reduce.

Map, written by the user to generate a new series of intermediate key-value pairs from input key-value pairs. The MapReduce library groups all intermediate values that have the same intermediate Key: I together and passes them to the Reduce function.

The Reduce function is also written by the user. It takes an intermediate Key: I and a series of values corresponding to that Key. It’s going to combine sets of these values and probably produce smaller sets. Typically, only zero or one output result is generated after each Reduce call. These intermediate values are provided to the user’s reduce function through iterators. This allows us to deal with lists of values that are too large to be stored entirely in memory.

2.1 Example

Consider counting the number of occurrences of each word in a large number of documents. The user might write code that looks like the following pseudo-code:

map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Copy the code

The map function adds an association number to each word: the number of times that word occurs (1 in this simple example). The reduce function aggregates all the count results for a given word.

In addition, code written by the user with the input and output file names and some optional parameters writes the data into the MapReduce convention object, and the user can then call the MapReduce method directly and pass the specific object generated above to MapReduce. This allows the user’s code to be linked to the MapReduce library (implemented in C++). Appendix A contains the complete program for this example.

2.2 Types

Although the previous pseudocode was written based on the fact that the input and output are both string types, conceptually, the user-provided Map and Reduce functions have the following type associations:

Map (k1,v1) → list(k2,v2) reduce (k2,list(v2)) → list(v2)Copy the code

In other words, the input keys and values are not in the same field as the output keys and values. . In addition, the middle keys and values are in the same domain as the keys and values of the result. “V2”

Our C++ implementation is to use strings to interact with user-defined functions, and the conversion of the string to the desired type is left to the user’s own code to convert.

More Examples

Here are some simple examples of interesting programs that can easily represent MapReduce computation.

Distributed grep: If a row of data matches a given pattern, the map function will issue a new row, and the Reduce function is an identification function that only copies the intermediate data into the resulting output.

Calculate URL access frequency: The map function processes the request log of a Web page and outputs {URL,1}. The reduce function then adds the values of the same URL and issues a key-value pair of {URL, sum}.

Invert the Web link graph: The map function outputs all available links on a page called source and their target URLS as {target, source}. The reduce function then aggregates the sources of the same target and issues the new key-value pair {target, list(source)}.

Term-vectors for each host: a term-vector is a list of {word, frequency} key-value pairs that summarize the most important words in a document or set of documents. The map function processes each input document (whose hostname is extracted from the document’s URL) as {hostname, term vector} and outgoing. The reduce function passes a term-vector for all documents on the host, then adds the vectors, drops the less commonly used terms, and finally emits {hostname, term vector} key-value pairs.

Inverted index: The map function parses each document and issues a series of {word, document ID} key-value pairs. The Reduce function accepts all key-value pairs for a given word, then sorts the document ID of that word and issues {word, list(document ID)} key-value pairs. The set of all the output key-value pairs forms a simple inverted index. It is easy to add this calculation to track the position of words.

Distributed sorting: The map function extracts the key from each record and issues the key-value pair {key, record}. The reduce function sends all key-value pairs unmodified. This calculation depends on the partitioning tool described in Section 4.1 and the sort properties described in Section 4.2.

3 Implementation

There are many different implementations of the MapReduce interface. The right choice depends on the circumstances. For example, one implementation might be suitable for small shared memory machines, another for large NUMA multiprocessors, and another for an even larger set of networked machines.

This section describes an implementation of a computing environment that is widely used at Google: a large cluster of business PCS connected together by switched Ethernet every day [4]. In our environment:

(1) The general machine is running the Linux system dual-processor x86 processor, each machine has 2-4 GB of memory.

(2) use commercial network hardware – typically 100 mbit/s or 1G/ s at the single-machine level, but the overall average bandwidth is much less than the median bandwidth.

(3) Clusters consist of hundreds or thousands of machines, so machine failures are common.

(4) Storage is provided by inexpensive IDE disks that are directly connected to each machine. Internally developed distributed file systems [8] are used to manage the data stored on these disks. File systems provide availability and reliability over unreliable hardware through replication.

(5) The user submits the job to the scheduling system. Each job consists of a set of tasks mapped by the scheduler to a set of available computers in the cluster.

3.1 Execution Overview

The call to the Map function will be distributed across multiple machines automatically dividing the input data into M pieces. These segmented inputs can be processed in parallel on different machines. The Reduce function is also distributed across multiple machines. It uses a partitioning function (such as hash(key) mod R) to slice the temporary key generated in the map into R fragments. The number of partitions (R) and partition functions are specified by the user.

Figure 1 shows the overall flow of MapReduce operations in our implementation. When a user program calls a MapReduce function, the following sequence of actions occurs (the numbered labels in Figure 1 correspond to the numbers in the following list) :

  1. The MapReduce library in the user program first divides the input file into M blocks, typically between 16M and 64MB (which can be controlled by the user with optional parameters). MapReduce then starts many copies of the program on a set of computers [the process of fork].

  2. One of the copies of this program is special -master. The rest are workers who are assigned work by their owners. There are M Map jobs and R Reduce jobs to be assigned. The master will select idle workers and assign a Map or Reduce task to each worker.

  3. The worker assigned with the Map task should read the content of the corresponding input block that is split. It parses the key/value pair from the input data and passes the key/value pair to the user-defined Map function. The intermediate key/value pairs generated by the Map function are cached in memory.

  4. Periodically, cached key-value pairs are written to the local disk, and the data is divided into R regions using partitioning functions. The cached data is serialized to the local disk and the disk locations are passed back to the master, who is responsible for forwarding these locations to reduce workers.

  5. When the master server notifies Reduce Workers of these locations, it uses remote procedure calls (RPC) to read cached data from the map’s workers’ local disks. When the Reduce worker reads all the intermediate data, it sorts the data by key so that all the same keys that appear are grouped together. Sorting is required because often many different keys are mapped to the same Reduce job. If the amount of intermediate data is too large to store in memory, an external sort is used.

  6. The Reduce worker iterates over the sorted intermediate data, and for each unique key encountered, it passes the set of intermediate values corresponding to the key to the user’s Reduce function. The output of the Reduce function is attached to a final output file on the Reduce partition.

  7. When all map and Reduce jobs are completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns to the user code.

After successful completion, the MapReduce execution results are in the R output files (one for each Reduce job, with the name specified by the user). Typically, users do not need to combine these R output files into a single file; they usually pass them as input to another call to a MapReduce process, or use them from another distributed application that can handle multiple partitioned files as input.

3.2 Master Data Structures

The master owns several data structures. For each Map and Reduce task, it stores their state (idle, in progress, or completed) and the identity of the working machine (for non-idle tasks).

The master is the conduit that propagates the location of these file areas from the Map job to the Reduce job. Therefore, for each completed MAP task, the Master stores the location and size of the R intermediate file blocks generated by the map task. When the map task is complete, these changes to the file location and size are also accepted by the master. The information is gradually pushed to workers who are performing reduce tasks.

3.3 Fault Tolerance

Because MapReduce libraries are designed to help handle large amounts of data with hundreds or thousands of computers, the libraries must be able to gracefully tolerate computer failures.

The Worker failed

The master periodically pings each worker. If no worker response is received within a specified time range, the master marks the worker node as invalid. All Map tasks completed by this invalid worker are set back to the initial idle state, so that these tasks can be arranged to other workers. Similarly, the Map or Reduce jobs that are running on the invalid worker will be reset to idle state and wait for rescheduling.

When the worker fails, the completed MAP task needs to be re-executed because the output of these tasks is stored on this machine, so the output becomes inaccessible. The output of the completed Reduce job is stored on the global file system (which is safe on systems like GFS) and therefore does not need to be executed again.

When A Map task is first executed by Worker A and then scheduled to be executed by Worker B (because Worker A fails), the re-execution action will be notified to all workers who execute Reduce jobs. Any Reduce jobs that have not yet read data from Worker A will instead read data from Worker B.

MapReduce can adapt to large-scale worker failure. For example, in a MapReduce operation, network maintenance on a running cluster causes 80 machines to be inaccessible within a few minutes at the same time. The MapReduce master simply executes the work that has been completed by those inaccessible workers again, and then continues the execution. Finally, the MapReduce operation is completed.

The Master of failure

The master can easily periodically write the data structures described above to the disk to generate checkpoints. If the master task fails, start a new replica master process from the latest checkpoint. However, since there is only one master process, it is unlikely that the master will fail. So if the master fails in our current implementation, MapReduce computation is aborted. The client can check the status and re-execute the MapReduce operation as needed.

There is the semantics of failure.

When the user-provided Map and Reduce operators are functions whose input values are determined, the output of our distributed implementation is the same as the output of the entire program executed in sequence without any errors. The same input should be the same output.

We accomplish this feature by atomically submitting the output of Map and Reduce jobs. Each executing task writes its output to a private temporary file. A Reduce job generates one such file, and a Map job generates R such files (one for each Reduce job). When a Map task is completed, the worker sends a message containing R temporary file names to the master. If the master receives a completion message from a completed Map task, the master simply ignores the message. Otherwise, the master records the names of the R files in a data structure.

When the reduce task is completed, the Reduce worker will atomically rename the temporary output file to the final output file. If the same Reduce job is executed on multiple machines, multiple renaming operations will be applied to the same final output file. We rely on atomicity renaming operations provided by the underlying file system to ensure that the final file system state is only the data generated by a Reduce job.

The vast majority of Map and Reduce operators are deterministic, and the fact that in this case we are semantically equivalent to the sequential execution case makes it easy for programmers to infer the behavior of their programs. When Map and/or Reduce operators are uncertain, we provide a weak but still reasonable processing mechanism. In the case of nondeterministic operators, the output of a specific Reduce job R1 is equivalent to the output produced by sequential execution of a nondeterministic program. However, the output of another Reduce job, R2, might correspond to the output of R2 produced by a different non-deterministic sequence of program execution.

Consider Map job M and Reduce job R1 and R2. Let e(Ri) be the execution that Ri has committed (there is one and only one such execution). Weak semantics occur when e(R1) reads the output produced by one execution of M, and e(R2) reads the output produced by another execution of M.

3.4 Location of Storage (Locality)

Network bandwidth is a relatively scarce resource in our computing environment. We save network bandwidth by storing input data (managed by GFS) as much as possible on local disks on the machines in the cluster. GFS splits each file into 64MB chunks, with multiple copies of each chunk (typically three copies) kept on multiple machines. When scheduling map jobs, the MapReduce master considers the location of input files and tries to execute map jobs on the machines that store backup input data. If the above fails (e.g. the stored machine cannot assign the task), the Master will attempt to perform the Map task on a machine that is close to the one with the input data backup (e.g. on a worker machine that is on the same switch as the machine that contains the input data). When you run a large MapReduce operation on a large enough cluster, most of the input data can be read from the local machine and therefore consumes little network bandwidth.

3.5 Task Granularity

As mentioned above, we split map into M segments and Reduce into R segments for execution. Ideally, M and R should be much larger than the number of worker machines in the cluster. The execution of a large number of different tasks on each worker machine contributes to the dynamic load balancing of the cluster, and the failure of one worker can accelerate the recovery speed: many map tasks that have been completed on the failed worker machine can be distributed to any other worker machine for execution.

There is a practical limit to how big M and R can be in our implementation, because the master must execute O(M+R) times of scheduling and keep O(MxR) states in memory (but the impact on memory usage is small: O(MxR) Block status. Each pair of Map or Reduce jobs contains about one byte. In addition, the R value is usually user-specified, since each Reduce job eventually generates a separate output file. In practice, we also tend to choose an appropriate value of M, so that each independent task can process about 16M to 64M of input data (in this way, the storage location optimization mentioned above is most effective), and we set R as a multiple of the smaller number of worker machines we intend to use. We typically perform MapReduce calculations at this ratio: M=200,000, R=5,000, using 2000 worker machines.

3.6 Backup Tasks

A common reason for extending the total execution time of MapReduce is because of a “lag” : it takes a long time for a machine to complete the last few Map or Reduce tasks in the computation process. There are many reasons why there are “laggards”. For example, if a machine has a hard disk problem, it may cause frequent error correction and slow down the reading speed from 30 MB /s to 1 MB /s. The cluster’s scheduling system may schedule additional tasks on the same machine, resulting in slower execution of MapReduce code due to CPU, memory, local hard disk, and network bandwidth issues. One of the problems we recently encountered was due to a bug in the machine’s initialization code that caused the processor cache to be disabled: The affected computer’s computation slowed down by more than a hundred times.

We have a general mechanism to reduce the number of “laggards.” When a MapReduce operation is nearly complete, the master schedules the backup task process to execute the remaining pending tasks. Either the initial execution or the end of the backup task is marked as complete. We have adjusted this mechanism so that the additional computing resources it brings are no more than a few percentage points. We found that this significantly reduced the time to complete large MapReduce operations. For example, the sorting task described in Section 5.3 takes 44% more time to complete when the standby task is turned off.

4 Refinements

Although the basic functionality provided by writing simple Map and Reduce functions has met most of the requirements, we have found some useful extensions. These extensions are described in this section. Would it be better to call it extension?

4.1 Partitioning Function

MapReduce users typically specify the Reduce jobs they want and the number of output files (R). Data is partitioned using an intermediate key through the partition function. A default partitioning function is to partition using hash partitioning (for example, hash(key) mod R). Hash partitioning produces well-balanced partitions. However, in some cases, it can be useful to partition key values using other partitioning functions. For example, if the key result of the output is URLs, we want all records for each host to be stored in the same output file. To support similar situations, users of MapReduce libraries can provide a dedicated partitioning function. For example, using hash(Hostname(URlkey)) mod R as a partitioning function causes all URLs from the same host to end up in the same output file.

4.2 Ordering Guarantees

We guarantee that in a given partition, the intermediate key pairs will be processed in the order of key increments. The order processing to ensure the can easily let each partition of the generated output file is ordered, as the format of the output file needs to be able to support efficient according to the random access to find the key, will be very useful, or is the output file users will find data sets to sort the results will be very convenient.

4.3 Combiner Function

In some cases, duplicate data of intermediate key values generated by the map function may account for a large proportion, and the reduce function given by the user satisfies the commutative and associative laws. The example of counting word frequency in Section 2.1 is a good example. Since word frequency tends to be a Zipf distribution, each MAP task will produce hundreds or thousands of records in <the,1> format. All of these counts are sent over the network to a single Reduce job, and the values are then added up through the reduce function to produce a single number. We allow the user to specify an optional Combiner function that allows the data to be partially merged before being sent to the network.

The Combiner function is executed on each machine that performs the Map task. Generally the Combiner and Reduce functions are implemented using the same code. The only difference is how the MapReduce library handles the output of the function. The output of the Reduce function is written to the final output file, while the output of the Combiner function is written to the intermediate file, which is then sent to a Reduce job.

Partial merging can significantly speed up some MapReduce operations. Appendix A contains an example of using the combiner function.

4.4 Input and Output Types

The MapReduce libraries support several different formats of input data. For example, each line of input data in text mode is treated as a key-value pair: key is the offset of that line in the file, and value is the contents of that line. Another common format is to store a sequence of key-value pairs sorted by key. Implementations of each type of input need to be able to split the input data into data fragments so that subsequent processing can be done by separate Map tasks (for example, the post-scope of text-mode segmentation must ensure that scope segmentation occurs only at the boundaries of each line). Although most consumers most of the time only need to use one of the small number of input types predefined by MapReduce, a consumer can support a new input type simply by providing an implementation of the Reader interface.

A reader does not have to read data from a file. For example, we can easily define a reader that reads records from a database or reads data from an in-memory data structure.

In a similar way, we provide a set of predefined output data types that can be used to produce data in different formats, and user code can easily add support for new output types.

4.5 Side Effects

In some cases, MapReduce users find it easier to add auxiliary output files to Map and/or Reduce operations. We rely on program writer to turn this “side effect” into atomic and idempotent 3. Typically, the application first writes the output to a temporary file, and after all the data has been printed, renames the temporary file using the system-level atomic operation rename. If a task produces multiple output files, we do not provide atomic operations such as two-phase commit to support this. Therefore, tasks that produce multiple output files and have consistency requirements across files must be deterministic. In practice, however, this limitation has not caused us any trouble.

4.6 Skipping Bad Records

Sometimes, a bug in user code causes the Map or Reduce function to crash while processing certain records, preventing the MapReduce operation from completing. The usual approach is to fix the bug and then execute the MapReduce operation again, but sometimes this is not feasible; These bugs may be in third-party dependency libraries where the source code is not available. At the same time, it is acceptable to skip some questionable records, such as when doing statistical analysis on a large data set. We provide an optional execution mode in which MapReduce detects which records cause deterministic crashes and skiffs them to ensure that the entire processing continues. 【replace here new】

Each worker process is equipped with a signal processor to catch memory segment exceptions and bus errors. Before a user’s Map or Reduce operation is invoked, the MapReduce library stores the sequence number of the execution parameters in a global variable. If the user program generates a signal, the message handler sends the “last send” UDP packet containing the sequence number to the master. If the master sees that the processing of a specific record fails more than once, the record needs to be skipped when the Map or Reduce job is executed again next time.

4.7 Local Execution

Debugging the Map and Reduce functions can be tricky because the actual computation is performed in a distributed system, typically on thousands of computers, and the assignments are determined dynamically by the master. To simplify debugging, analysis, and small-scale testing, we have developed an additional Set of MapReduce libraries that allow MapReduce jobs to execute all tasks sequentially on the local machine. Control is given to the user, allowing the user to restrict computation to specific Map tasks. Users call their programs locally by setting special flags, making it easy to use any debugging and testing tool (such as GDB).

4.8 Status Information

The Master runs an embedded HTTP server and exposes a set of status information pages for monitoring. The status page shows the progress of the computation, such as how many tasks have been completed, how many tasks are being processed, bytes of input, bytes of intermediate data, bytes of output, percentage of processing, and so on. The page also contains links to the standard error and standard output files generated by each task. Users use this data to predict how long the computation will take to perform and whether additional computing resources will be required. These pages can also be used to analyze when calculations are performing slower than expected.

In addition, the status page at the top level shows which workers have failed and which Map and Reduce jobs were running at the time of their failure. This information is useful when the user is trying to diagnose bugs in the user code.

4.9 Counters

The MapReduce library provides a counter to count the occurrence of various events. For example, a user might want to count the total number of words that have been processed, or the number of German documents that have been indexed, etc.

To use this feature, the user creates and names a counter object in the program and increments the counter in the Map and Reduce functions where appropriate. Such as:

Counter * uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
    for each word w in contents:
    if (IsCapitalized(w)):
        uppercase - > Increment();
EmitIntermediate(w, "1");
Copy the code

The values of these counters are periodically uploaded from individual worker machines to the master (attached to the ping response packet). The master aggregates the counter values of successful Map and Reduce jobs and returns them to the user code after the MapReduce operation is complete. The value of the current counter is also displayed on the master’s status page, so that the user can observe the progress of the current live calculation. When aggregating counter values, the master must eliminate the impact of repeated Map or Reduce jobs to avoid the accumulation (the use of a backup job and the re-execution of a backup job after the backup job fails will lead to repeated execution).

Some counter values are automatically maintained by the MapReduce library, such as the number of input key-value pairs that have been processed, the number of output key-value pairs, and so on.

Users will find the counter functionality useful for checking the behavior of MapReduce operations. For example, in some MapReduce operations, the user needs to ensure that the number of output key-value pairs is exactly equal to the number of input key-value pairs, or that the number of German documents processed is within the tolerable range of the total number of documents processed.

5 Performance

In this section we run two calculations on a large cluster to measure MapReduce performance. One calculation searches for data that matches a particular pattern in about a terabyte of data, and the other sorts that data for about a terabyte.

These two programs are typical of real-world applications for a large number of MapReduce users – one that converts data from one representation to another; The other is to extract a small amount of data from a large data set that is of interest to the user.

5.1 Cluster Configuration

All programs are executed on a cluster of about 1800 machines. Each machine is equipped with two 2GHz Intel Xeon processors, support for hyper-threading, 4GB of memory, two 160GB IDE disks and a gigabit Ethernet card. These machines are deployed on a two-tier tree-switched network with approximately 100-200 Gbps of total bandwidth available at the root node. All of these machines have the same deployment, so the network round-trip time between any pair of machines is less than 1 millisecond.

Of the 4GB of memory, about 1-1.5GB is reserved for other tasks running on the cluster. The program was executed on a weekend afternoon, when the CPU, disk, and network were mostly idle.

5.2 Test 1 – Search (Grep)

The grep program scans about 10 ^ 10 of each 100-byte record for a relatively rare 3-character pattern (this pattern appears in 92,337 records). The input data is divided into fragments of approximately 64M (M=15000), and the entire output data is stored in a file (R=1).

Figure 2 shows the processing of this operation over time. The Y-axis represents the processing speed of the input data. The processing speed increases with the number of machines assigned to MapReduce computation, and when 1764 workers are assigned to participate in the computation, the processing speed reaches 30GB/s. When the Map task ends, 80 seconds after the computation begins, the input processing speed drops to zero. The whole calculation took about 150 seconds from start to finish. This includes an initial startup phase of about one minute. The initial startup time is due to the time to transfer the program to the various worker machines, the time to wait for the GFS file system to open the set of 1000 input files, and the time to obtain the relevant file local location optimization information.

5.3 Test 2 – Sort

The sorter will sort 10 ^ 10 of each 100-byte record (about 1TB of data). This program is modeled after TeraSort’s Benchmark [10].

The sorter consists of less than 50 lines of code. With only three lines, the Map function extracts a 10-byte key value from the text line as the sorted key and outputs that key and the original text line as the middle key-value pair. We used a built-in identity function for the Reduce operation. This function prints the middle key-value pair without changing anything. The final sort is output to a two-way copy of the GFS system file (that is, the program outputs 2TB of data).

As mentioned earlier, the input data is divided into 64MB slices (M=15000). We partition the sorted output and store it in 4000 files (R=4000). The partition function uses the raw bytes of the key to partition data into R segments.

The partition function in our Benchmark test knows the distribution of keys. For a normal sorting program, we would add a preprocessed MapReduce operation to sample the key distribution and calculate the partitioning points for the final sorting process.

Figure 3 (a) shows a normal execution of this sorter. The top left shows the speed at which input data is read. The data read speed peaks at 13GB/s and quickly drops to zero after about 200 seconds after all Map tasks are completed. Note that the sorter input data is read less quickly than the grep program above. This is because the Map task of the sorter spends about half of its processing time and I/O bandwidth writing the intermediate output to the local hard drive. The intermediate output of the corresponding distributed grep program can be almost ignored.

The figure in the middle left shows the network speed at which the intermediate data is sent from the Map job to the Reduce job. The process starts slowly from the completion of the first Map task. The first peak in the figure is the first batch of approximately 1700 Reduce jobs started (the entire MapReduce is allocated to approximately 1700 machines, each machine executes at most one Reduce job at a time). After about 300 seconds, some of the first Reduce jobs are finished. We start to shuffle data for the remaining Reduce jobs. All processing ends after about 600 seconds.

The lower left figure shows the speed at which a Reduce job writes sorted data to the final output file. There is a small delay between the end of the first sort phase and data starting to be written to disk because the worker machine is busy sorting the intermediate data. Write speeds range from 2-4GB/s for a while. The entire write time lasts about 850 seconds. Including the initial startup time, the entire operation took 891 seconds. This is close to the best record by TeraSort Benchmark [18] of 1057 seconds.

One more thing to mention: the input data is read faster than the shuffle speed or the output data is written to disk. This is due to our input data localization optimization strategy — most of the data is read from the local hard disk, bypassing network bandwidth limitations. Sorting is faster than writing the output data to disk, because the output data is written in two copies (we use a double backup GFS file system for data reliability and availability). The reason we write the output data to the two replication nodes is because this is the underlying file system’s implementation mechanism for ensuring data reliability and availability. If the underlying file system uses Erasure coding [14] rather than replication to ensure the reliability and availability of the data when the output data is written to disk, then the need for network bandwidth can be reduced.

5.4 Effect of Backup Tasks

Figure 3 (b) shows the sorter execution after the backup job is closed. The execution process is similar to figure 3 (a), except that the output data is written to disk with a long tail in time, and during this time there is very little writing. After 960 seconds, all but five Reduce jobs have been completed. These slow tasks took another 300 seconds to complete. The entire calculation took 1,283 seconds, a 44% increase in execution time.

5.5 Machine Failures

In the execution process of the sorting program demonstrated in Figure 3 (c), we deliberately killed 200 workers out of 1746 in a few minutes. The underlying scheduling of the cluster immediately restarts the new worker processing process on these machines (because only the worker process on the worker machine was killed and the machine itself was still working).

The death of the worker shows a negative input data read rate because some previously completed MAP tasks are lost (because the worker of the corresponding map task was killed) and need to be re-executed. These Map tasks will be re-executed soon. The entire computation was completed in 933 seconds, including startup overhead (only 5% more than normal execution time).

6 Experience

We wrote the first version of the MapReduce library in January 2003, and made significant enhancements in August 2003, including local data optimization, dynamic load balancing of tasks between worker machines, and more. Since then, we’ve been surprised at how widely MapReduce libraries can be applied to all kinds of problems we encounter in our daily work. It is now widely used in various areas within Google, including:

  • Large-scale machine learning problems,
  • Clustering issues with Google News and Froogle products,
  • Extracting data from popular query records to generate reports (like Google’s Zeitgeist),
  • Extracting useful information from a large number of web pages for new applications and products (for example, extracting geolocation information from a large number of location search pages),
  • Large-scale graph calculation.

Figure 4 shows the huge increase in the number of independent MapReduce programs in our source control system over this period of time. From zero in early 2003 to almost 900 different instances in September 2004. MapReduce is successful because the MapReduce libraries can write a simple program in less than half an hour and run it efficiently on a cluster of thousands of machines, greatly speeding up the development and prototyping cycle. In addition, MapReduce makes it easy for programmers with no experience in distributed and/or parallel system development to leverage a large number of resources.

At the end of each task, the MapReduce library logs analyze the usage of computing resources. In Table 1, we show a subset of the statistics consumed by MapReduce jobs running at Google in August 2004.

6.1 Large-scale Indexing

One of the most successful uses of MapReduce so far has been to rewrite the indexing system used by Google’s Web search service. The indexing system uses massive documents captured by web crawlers as input data, and these data are stored in GFS. The original contents of these documents contain more than 20 terabytes of data. The indexer creates an index through a series of approximately 5 to 10 MapReduce operations. Using MapReduce (as opposed to earlier versions of the distributed indexing system) provides these benefits:

  • The code for indexing is simpler, smaller, and easier to understand because the code for handling fault-tolerant, distributed, and parallel computing is hidden in MapReduce libraries. For example, using the MapReduce library, the number of lines of code in a computation phase is reduced from 3,800 lines of C++ code to about 700 lines of code.
  • The Performance of the MapReduce libraries is good enough that we can avoid the overhead of passing data by keeping conceptually unrelated computational steps separate rather than mixing them together. Doing so also makes it easy to change the index processing. For example, a change to the previous indexing system might have taken months, but MapReduce only takes a few days to implement.
  • The indexing process becomes easier to handle because problems caused by machines, machine slowdown, and network failures are handled automatically by the Mapre-Duce library without operator intervention. In addition, you can easily improve the performance of the indexing process by adding new machines to the in-dexing cluster.
  • The process of building indexes has also become much easier. Because most of the problems caused by machine failures, machine slowdowns, network congestion, etc., have been solved by MapReduce libraries without operator intervention. In addition, we can easily improve the performance of our index build simply by adding new machines to the index cluster.

7. Related Work

Many systems offer strict programming patterns and automate parallel computing by placing strict restrictions on programming. For example, an associative function can take the prefix of an array of N elements on N processors and compute it using the parallel prefix algorithm in log N time. MapReduce can be thought of as a simplification and extraction of these models based on our experience with massive computing in the real world. More importantly, we provide fault-tolerant implementations on a scale of thousands of processors. In contrast, most parallel processing systems implement small clusters of parallelism and leave the details of processing machine failures to the programmer.

Bulk Synchronous Programming[17] and some MPI primitives [11] provide a higher level of abstraction to make it easier for programmers to write parallel processors. A key difference between MapReduce and these systems is that MapReduce utilizes a limited programming model to automate concurrent processing of user programs and provides transparent fault-tolerant processing.

Our data local optimization strategy is inspired by techniques such as active disks[12,15], which minimize network and IO subsystem throughput by pushing processing data elements as far away as possible from the local disk of the computing processing node. We perform our operations on a normal machine with several hard disks mounted, rather than on a disk processor, but the two general methods are similar.

Our standby task mechanism is similar to the Eager scheduling mechanism used by Charlotte’s system [3]. One disadvantage of this mechanism is that if a task fails repeatedly, the entire computation cannot be completed. We solved this problem to some extent by skipping the bad notes.

The MapReduce implementation relies on an internal cluster management system that distributes and runs user tasks across a large cluster of shared machines. Although this is not the focus of this paper, it is worth mentioning that the concept of this cluster management system is the same as other systems such as Condor[16].

The sorting mechanism is part of the MapReduce library and is similar in operation to now-sort [1]. The source machine (map workers) partitions the data to be sorted and sends it to one of the R Reduce workers for processing. Each Reduce worker sorts the data locally (in memory if possible). Of course, nowsort doesn’t have the broad applicability of user-defined Map and Reduce functions like MapReduce.

River[2] provides a programming model for processing processes to communicate with each other by sending data in a distributed queue. Like MapReduce, River systems try to provide good average performance in heterogeneous hardware environments, or in the case of system disturbances. River balances tasks by carefully scheduling communication between the hard drive and the network. The MapReduce library takes another approach. By limiting the programming model, the MapReduce framework decomposes the problem into a large number of fine-grained tasks. These tasks are dynamically scheduled on available worker clusters, so that faster workers can perform more tasks. By limiting the programming model, we can schedule redundant tasks when the work is close to completion, which greatly reduces the time to complete the entire operation (such as slow or blocked workers) in the case of heterogeneous hardware.

Bad-fs [5] has a completely different programming model from MapReduce, which is for wan task execution. However, the two systems have two basic features that are similar. (1) The two systems adopt the mode of re-execution to prevent data loss due to failure. (2) Both of them use the data localization scheduling strategy to reduce the data volume of network communication.

TACC[7] is a system designed to simplify the construction of highly available network services. Like MapReduce, it relies on re-execution mechanisms for fault-tolerant handling.

8 Conclusions

The MapReduce programming model has been used successfully in several areas within Google. We attribute this success to several reasons: First, the model is simple to use, even for programmers with no parallel or distributed system development experience, because MapReduce encapsulation hides the underlying implementation details of parallel processing, fault tolerance, data localization optimization, load balancing, and so on. Second, a large number of different types of problems can be represented in the form of MapReduce calculations. For example, MapReduce is used for data generation in Google’s Web search services, for sorting, for data mining, for machine learning, and many other systems; Third, we developed an implementation of MapReduce that can run in a large cluster consisting of thousands of machines. This implementation makes more use of these rich computing resources, so it is well suited to solve the computation-intensive problems That Google has encountered.

We also learned something from this job. First, the constrained programming model makes parallel and distributed computing easy and fault-tolerant. Second, network bandwidth is a scarce resource. A lot of our systems are designed to reduce data transfer over the network: local optimization allows a lot of data to be read from the local disk, and writing a copy of the intermediate file on the local disk also saves network bandwidth; Third, redundant execution can reduce the negative effects of slow machines and solve the problem of data loss due to machine failures.

Acknowledgements

Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people’s suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System [8]. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper. The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google’s engineering organization for providing helpful feedback, suggestions, and bug reports.

References

[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Pat-terson. High-performance sorting on networks of work-stations. In Proceedings of the 1997 ACM SIGMOD In-ternational Conference on Management of Data, Tucson,Arizona, May 1997.

[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast casecommon. InProceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), Pages 10 — 22, Atlanta, Georgia, May 1999.

[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Pro-ceedings of the 9th International Conference on Parallel and Distributed Computing [4] Luiz a. Barroso, Jeffrey Dean, and Urs Holzle. Web searchfor aplanet: TheGooglecluster architecture. IEEE Micro, 23(2):22 — 28, April 2003.

[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau,Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Pro-ceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.

[6] Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989.

[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scal- able network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, Pages 78 — 91, Saint-Malo, France, 1997.

[8] Sanjay Ghemawat, Howard Gobioff, Shun-Tak Le-ung. The Google File System. In 19th Symposium on Op-erating Systems Principles, Pages 29 — 43, Lake George, New York, 2003

[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigni-aud, A. Mignotte, and Y. Robert, Editors, euro-PAR ’96. Parallel Processing, Lecture Notes in Computer Science 1124, Pages 401-408. Springer-verlag, 1996.

[10] Jim Gray. Sort benchmark home page. The research.microsoft.com/barc/SortBe… .

[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.

[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satya-narayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Di-amond: A storage architecture for early discard in inter-active search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.

[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix Computation. Journal of the ACM, 27(4):831 — 838, 1980.

[14] Michael O. Rabin. Efficient dispersal of information for security, Load balancing and fault Tolerance. Journal of the ACM, 36(2):335 — 348, 1989.

[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, And David Nagle. Active Disks for Large-scale data Process-ing. IEEE Computer, Pages 68 — 74, June 2001.

[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experi-ence. Concurrency and Computation: Practice and Ex-perience, 2004.

[17] l. g. Valiant. A bridging model for the parallel computation.Com munications of the ACM, 33 (8) : 103-111, 1997.

[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. Alme1.almaden.ibm.com/cs/spsort.p… .

A Word Frequency

This section contains a complete program that counts the number of occurrences of each unique word in a series of input files typed on the command line.

#include "mapreduce/mapreduce.h"
 / / User 's map function
class WordCounter: public Mapper {
    public: virtual void Map(const MapInput & input) {
        const string & text = input.value();
        const int n = text.size();
        for (int i = 0; i < n;) {
            // Skip past leading whitespace
            while ((i < n) && isspace(text[i]))
                i++;
            // Find word end
            int start = i;
            while ((i < n) && !isspace(text[i]))
                i++;
            if (start < i)
                Emit(text.substr(start, i - start), "1"); }}}; REGISTER_MAPPER(WordCounter);/ / User 's the reduce function
class Adder: public Reducer {
    virtual void Reduce(ReduceInput * input) {
        // Iterate over all entries with the
        // same key and add the values
        int64 value = 0;
        while(! input - > done()) { value += StringToInt(input - > value()); input - > NextValue(); }// Emit sum for input->key()Emit(IntToString(value)); }}; REGISTER_REDUCER(Adder);int main(int argc, char ** argv) {
    ParseCommandLineFlags(argc, argv);
    MapReduceSpecification spec;
    // Store list of input files into "spec"
    for (int i = 1; i < argc; i++) {
        MapReduceInput * input = spec.add_input();
        input - > set_format("text");
        input - > set_filepattern(argv[i]);
        input - > set_mapper_class("WordCounter");
    }
    // Specify the output files:
    // /gfs/test/freq-00000-of-00100
    // /gfs/test/freq-00001-of-00100
    // ...
    MapReduceOutput * out = spec.output();
    out - > set_filebase("/gfs/test/freq");
    out - > set_num_tasks(100);
    out - > set_format("text");
    out - > set_reducer_class("Adder");
    // Optional: do partial sums within map
    // tasks to save network bandwidth
    out - > set_combiner_class("Adder");
    // Tuning parameters: use at most 2000
    // machines and 100 MB of memory per task
    spec.set_machines(2000);
    spec.set_map_megabytes(100);
    spec.set_reduce_megabytes(100);
    // Now run it
    MapReduceResult result;
    if(! MapReduce(spec, & result))abort(a);Done: 'result' Structure contains info
    // about counters, time taken, number of
    // machines used, etc.
    return 0;
}
Copy the code