Foreword: by reading the MapReduce client source code, we can better understand the relationship between an input split and an HDFS block, as well as the divide-and-conquer idea and the idea of moving computation to the data.
First look at the waitForCompletion() method, the entry point of a MapReduce job:
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    // Submit the job asynchronously
    submit();
  }
  if (verbose) {
    // Monitor the job and print its status, e.g. the map/reduce completion percentage
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client configuration
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
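For context, here is a minimal driver sketch (not from the article; the class name and command-line paths are made up, and the framework's identity Mapper/Reducer are used so it stays self-contained) that ends up calling waitForCompletion(true):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PassThroughDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "pass-through");
    job.setJarByClass(PassThroughDriver.class);
    // Identity Mapper/Reducer from the framework keep the sketch self-contained;
    // the default TextInputFormat produces LongWritable/Text pairs
    job.setMapperClass(Mapper.class);
    job.setReducerClass(Reducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // hypothetical input path
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // hypothetical output path
    // verbose = true -> monitorAndPrintJob() prints progress;
    // false -> the client just polls isComplete() as shown above
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}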
The job is submitted asynchronously through the submit() method. Let's look at its implementation:
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI(); // Decide between the old and new MapReduce APIs
  connect();      // The client connects to the cluster's master service
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
                                  ClassNotFoundException {
      // Does the actual submission work, including computing the split list
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
The source code presented in this article is from the trunk branch, so the new API is used. Let's focus on the submitJobInternal() method:
/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setup the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the <code>JobTracker</code> and optionally
 *   monitoring it's status.
 *   </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  ...
  // Staging directory that will hold the job's jar, configuration, split files, etc.
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    ...
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Compute the split list; the number of splits determines the number of map tasks
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);
    ...
  } finally {
    ...
  }
}
The submitJobInternal method does five things altogether:

- Check the job's input and output specifications (the input and output paths)
- Compute the split list
- Set up the necessary information for the job's DistributedCache
- Copy the job's jar and configuration files to the MapReduce system directory on HDFS (a sketch for inspecting this directory follows the list)
- Submit the job to the JobTracker and start monitoring its running status
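As a side note, the staging directory created here (submitJobDir) typically ends up holding job.jar, job.xml, and the split files job.split and job.splitmetainfo. Below is a hedged standalone sketch (the class name is made up; the path would be taken from the client logs) for listing that directory:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: pass the submitJobDir path (visible in the client logs)
// as args[0] while the job is being submitted, and print what it contains.
public class StagingDirPeek {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path submitJobDir = new Path(args[0]);
    FileSystem fs = submitJobDir.getFileSystem(conf);
    for (FileStatus st : fs.listStatus(submitJobDir)) {
      System.out.println(st.getPath().getName() + "\t" + st.getLen() + " bytes");
    }
  }
}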
We’ll focus on the step that computes the split list, the writeSplits method:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  // Choose between the new and old Mapper APIs; Hadoop 2.x and later use the new one
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
Since the trunk code uses the new Mapper API, we look at the writeNewSplits method:
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // Instantiate the InputFormat; the default is TextInputFormat
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  List<InputSplit> splits = input.getSplits(job); // Compute the splits
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
The JobContext implementation used here is JobContextImpl, so look for the getInputFormatClass method in that class. You can see that the InputFormat class is specified by the mapreduce.job.inputformat.class parameter (normally set with job.setInputFormatClass()) and defaults to TextInputFormat.
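For illustration, here is a hedged fragment (reusing the job object from the driver sketch above; KeyValueTextInputFormat is just an arbitrary alternative) that overrides the default:
// After this call, JobContextImpl.getInputFormatClass() returns
// KeyValueTextInputFormat instead of the default TextInputFormat,
// and writeNewSplits() will instantiate that class.
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class);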
TextInputFormat does not override the getSplits method, so the implementation lives in its parent class FileInputFormat:
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // Get the minimum split size
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // Get the maximum split size
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // FileStatus contains the metadata of each input file: path, length, block size, etc.
  List<FileStatus> files = listStatus(job);

  boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
  for (FileStatus file: files) {
    if (ignoreDirs && file.isDirectory()) {
      continue;
    }
    Path path = file.getPath();   // File path
    long length = file.getLen();  // File size
    if (length != 0) { // The file has data
      // Block locations of the file: each block's offset, length and the hosts holding it
      // (hosts are ordered by network proximity to the client)
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length); // All blocks, from offset 0 to the end
      }
      // Determine whether the file can be split
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // Calculate the actual split size
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        // Number of bytes not yet assigned to a split; initially the whole file
        long bytesRemaining = length;
        // Keep cutting full splits while the remainder is more than SPLIT_SLOP (1.1) times the split size
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          // Find the block that contains the split's start offset (length - bytesRemaining)
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          // makeSplit creates the split, recording the hosts of that block
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          // Subtract the bytes just assigned to this split; the rest is handled in the next iteration
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          // The remaining tail (at most 1.1 x splitSize) becomes the last split
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        if (LOG.isDebugEnabled()) {
          // Log only if the file is big enough to be splitted
          if (length > Math.min(file.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + file.getPath());
          }
        }
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      // Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
First the split size bounds are determined. The minimum is the larger of getFormatMinSplitSize() (which returns 1) and the configured minimum split size; the maximum is read directly from the configuration:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
Next, listStatus returns a FileStatus for each input file, which holds the file's metadata such as its path, length, and block size:
List<FileStatus> files = listStatus(job);
It then iterates through the files and splits each file.
For each non-empty file, first get its block locations, i.e. the blocks and the machines on which they reside:
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
  blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
  FileSystem fs = path.getFileSystem(job.getConfiguration());
  blkLocations = fs.getFileBlockLocations(file, 0, length); // All blocks, from offset 0 to the end
}
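To see what blkLocations actually holds, here is a hedged standalone sketch (class name and input path are made up) that reads the same metadata from HDFS: the file's FileStatus plus every block's offset, length, and hosts:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationPeek {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);          // hypothetical input file
    FileSystem fs = path.getFileSystem(conf);
    FileStatus file = fs.getFileStatus(path);
    System.out.println("len=" + file.getLen() + ", blockSize=" + file.getBlockSize());
    // Same call the client uses: every block of the file, with its hosts
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, file.getLen());
    for (BlockLocation blk : blkLocations) {
      System.out.println("offset=" + blk.getOffset()
          + ", length=" + blk.getLength()
          + ", hosts=" + String.join(",", blk.getHosts()));
    }
  }
}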
Then the actual split size is computed by the computeSplitSize method:
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  // By default the split size equals the block size
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
As you can see, the default split size is the block size. If you want splits larger than a block, raise minSize above the block size; if you want them smaller than a block, lower maxSize below it.
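For example, a hedged fragment (again reusing the job object from the driver sketch, with org.apache.hadoop.mapreduce.lib.input.FileInputFormat; the sizes are arbitrary) showing both knobs, which correspond to the mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize configuration keys:
// Option A: splits larger than a 128 MB block, by raising the minimum
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024); // 256 MB
// Option B (use instead of A): splits smaller than a block, by lowering the maximum
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);  // 64 MB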
Here’s how to compute a split list:
// Number of bytes not yet assigned to a split; initially the whole file
long bytesRemaining = length;
// Keep cutting full splits while the remainder is more than SPLIT_SLOP (1.1) times the split size
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  // Find the block that contains the split's start offset (length - bytesRemaining)
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  // makeSplit creates the split, recording the hosts of that block
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
  // Subtract the bytes just assigned to this split; the rest is handled in the next iteration
  bytesRemaining -= splitSize;
}
At first, bytesRemaining is the entire file size. As long as the ratio of bytesRemaining to the split size is greater than SPLIT_SLOP (1.1), another split is cut; after each iteration bytesRemaining is reduced by one split size, and the next split is computed from what remains of the file.
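A quick worked example with made-up numbers: a 260 MB file with a 128 MB split size. The first iteration cuts a 128 MB split at offset 0 (260/128 ≈ 2.03 > 1.1); the loop then stops because 132/128 ≈ 1.03 is not greater than 1.1, and the trailing if turns the whole 132 MB tail into one final split, so the file produces 2 splits rather than 3. The standalone sketch below simply replays that arithmetic (no Hadoop dependencies):
public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1; // same constant as in FileInputFormat

  public static void main(String[] args) {
    long length = 260L * 1024 * 1024;     // file size (made up)
    long splitSize = 128L * 1024 * 1024;  // computeSplitSize result with defaults

    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.printf("split at offset %d, length %d%n",
          length - bytesRemaining, splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      // The tail (here 132 MB, only ~1.03x a split) becomes one last, slightly larger split
      System.out.printf("split at offset %d, length %d%n",
          length - bytesRemaining, bytesRemaining);
    }
  }
}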
The getBlockIndex method finds the index into blkLocations[] of the block that contains the offset length - bytesRemaining, i.e. the start of the current split. On the first iteration length - bytesRemaining is 0, which is equivalent to cutting from offset 0 of the first block; on the next iteration the offset is splitSize, exactly where the previous split ended. The getBlockIndex method is as follows:
protected int getBlockIndex(BlockLocation[] blkLocations,
                            long offset) {
  // Find the block whose byte range contains the given offset,
  // i.e. the block in which the current split starts
  for (int i = 0; i < blkLocations.length; i++) {
    // is the offset inside this block?
    if ((blkLocations[i].getOffset() <= offset) &&
        (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
      return i;
    }
  }
  BlockLocation last = blkLocations[blkLocations.length - 1];
  long fileLength = last.getOffset() + last.getLength() - 1;
  throw new IllegalArgumentException("Offset " + offset +
      " is outside of file (0.." +
      fileLength + ")");
}
This method walks the block list and returns the index of the block whose byte range contains the split's start offset: the block's starting offset must be less than or equal to the split offset, and the split offset must be less than the block's end offset (its start plus its length).
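A tiny standalone sketch with made-up numbers: two 128 MB blocks covering [0, 128 MB) and [128 MB, 256 MB), and a split starting at 140 MB. Only the second block satisfies both conditions, so the returned index would be 1:
public class BlockIndexDemo {
  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long[] blockOffset = {0 * mb, 128 * mb};   // start offset of each block (made up)
    long[] blockLength = {128 * mb, 128 * mb}; // length of each block
    long splitStart = 140 * mb;                // offset = length - bytesRemaining

    for (int i = 0; i < blockOffset.length; i++) {
      if (blockOffset[i] <= splitStart
          && splitStart < blockOffset[i] + blockLength[i]) {
        System.out.println("split starting at 140 MB maps to block index " + i); // prints 1
      }
    }
  }
}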
Finally, each split is created by the makeSplit method:
protected FileSplit makeSplit(Path file, long start, long length,
                              String[] hosts, String[] inMemoryHosts) {
  return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
You can see that a split contains the following important properties:

- file: the file the split belongs to
- offset: the split's starting offset within the file
- length: the split's length (by default the block size)
- hosts: the list of hosts where the corresponding file block resides
The hosts property determines which servers the MapTask should be scheduled on, which is exactly how MapReduce moves computation to the data.