Learn about the Hadoop YARN architecture

This article starts with the MapReduce 1.x framework, then describes the MapReduce 2.x/YARN framework that resulted from upgrading MapReduce 1.x. The MapReduce 2.x/YARN framework is what is currently used.

MapReduce 1.x

Key concepts of MapReduce 1.x

JobClient

Users write MapReduce programs and submit them to the JobTracker through the JobClient; the JobClient also provides interfaces through which users can view job running status. Inside Hadoop, MapReduce programs are represented as Jobs. For each Job, the JobClient packages the application program and its configuration into a Jar file, stores it in HDFS, and submits the path to the JobTracker. The JobTracker then creates the individual tasks (MapTasks and ReduceTasks) and distributes them to the various TaskTracker services for execution.
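As a rough illustration of this handshake (the class and method names below are invented, not Hadoop's), the client stages the packaged job in shared storage and hands the "JobTracker" only a path:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the MR1 submission handshake (names invented for illustration):
// the client stages the packaged job in shared storage (standing in for HDFS)
// and hands the "JobTracker" only the job ID / path.
public class SubmissionModel {
    // Shared storage standing in for HDFS: path -> packaged job bytes.
    static final Map<String, String> sharedStorage = new HashMap<>();

    static String submitJob(String jobId, String packagedJobAndConf) {
        String path = "/system/jobtracker/" + jobId;
        sharedStorage.put(path, packagedJobAndConf);   // client does the heavy lifting
        return path;                                   // only the path goes to JobTracker
    }

    static String jobTrackerFetch(String path) {
        return sharedStorage.get(path);                // JobTracker reads the job directory
    }

    public static void main(String[] args) {
        String path = submitJob("job_001", "app.jar+conf.xml");
        System.out.println(jobTrackerFetch(path));     // prints app.jar+conf.xml
    }
}
```

The point of the design, as the quoted passage below notes, is that the client does the heavy lifting (packaging and uploading), which keeps load off the JobTracker.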

The JobClient's detailed job submission process is walked through step by step in the workflow section below.

JobTracker

The JobTracker is responsible for resource monitoring and job scheduling. It monitors the health of all TaskTrackers and jobs, and moves a task to another node if it fails. It also tracks task execution progress, resource usage, and other information, and passes this information to the task scheduler, which selects appropriate tasks to run when resources become idle. In Hadoop, the task scheduler is a pluggable module that users can replace with their own designs.

The following is quoted from www.aboutyun.com/thread-7778…:

JobTracker does two things for job submission: generating a JobId for the job, and accepting the job. As we know, the JobClient has already saved all job-related information to the JobTracker's system directory (on HDFS, of course). One of the biggest benefits of this is that the client does what it can, which also reduces the load on the server-side JobTracker. So how does JobTracker complete a client's job submission? When JobClient formally submits a Job to JobTracker, it only sends the JobId, because all the information about the Job already exists in JobTracker's system directory; JobTracker simply locates the Job directory based on the JobId.

The Job submission process can be briefly described as the following steps:

  1. Create a JobInProgress for the Job

    The JobInProgress object records the Job's detailed configuration information and its execution state, specifically the Map and Reduce tasks the Job is decomposed into. Creating a JobInProgress does two things: first, it copies the Job's files into the JobTracker's directory for that jobid (job.xml → jobtracker-system-dir/jobid/job.xml, job.jar → jobtracker-system-dir/jobid/job.jar); second, it creates the JobStatus and the Job's MapTask and ReduceTask queues to track the Job's status information.
  2. Check whether the client has permission to submit the Job. The JobTracker verifies with the QueueManager that the client is allowed to submit the Job.
  3. Check whether the MapReduce cluster meets the Job's memory requirements. Before submitting a Job, the client configures the Job's memory requirements based on the actual application. The JobTracker also imposes a memory limit to protect its own throughput, so it checks whether the Job's memory requirements fall within the configured JobTracker limits.

The process above can be summarized in the following figure:
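The admission checks in steps 2 and 3 can be condensed into a small sketch (illustrative only; the names are not Hadoop's):

```java
// Illustrative sketch (not Hadoop source) of the two admission checks the
// article describes: queue permission, and the job's memory requirement
// checked against the JobTracker-configured limit.
public class AdmissionCheck {
    static boolean canSubmit(boolean hasQueuePermission,
                             long jobMemoryMB, long jobTrackerLimitMB) {
        if (!hasQueuePermission) return false;          // QueueManager permission check
        return jobMemoryMB <= jobTrackerLimitMB;        // memory-limit check
    }

    public static void main(String[] args) {
        System.out.println(canSubmit(true, 2048, 4096));  // true
        System.out.println(canSubmit(true, 8192, 4096));  // false: exceeds limit
    }
}
```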

TaskTracker

The TaskTracker periodically reports the node's resource usage and task running progress to the JobTracker through a heartbeat mechanism, and receives commands from the JobTracker and acts on them, such as starting a new task or killing one. The TaskTracker divides the resources on its node into a number of equal slots. A slot represents computing resources (CPU, memory, and so on). A task can run only after the Hadoop scheduler assigns it a free slot on a TaskTracker. Slots come in two kinds, Map slots and Reduce slots, used by MapTasks and ReduceTasks respectively. The TaskTracker controls task concurrency through the (configurable) number of slots.
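The slot model described above can be sketched as a toy class (names invented for illustration). Note how a task can be refused even while a slot of the other kind sits idle, which is exactly the inflexibility criticized in the "Disadvantages" section later:

```java
// Toy model (invented names) of MR1's slot-based resource model: a
// TaskTracker advertises a fixed number of Map slots and Reduce slots,
// and a task runs only if a free slot of its own kind exists.
public class SlotModel {
    int freeMapSlots, freeReduceSlots;

    SlotModel(int mapSlots, int reduceSlots) {
        this.freeMapSlots = mapSlots;
        this.freeReduceSlots = reduceSlots;
    }

    boolean tryAssign(boolean isMapTask) {
        if (isMapTask && freeMapSlots > 0) { freeMapSlots--; return true; }
        if (!isMapTask && freeReduceSlots > 0) { freeReduceSlots--; return true; }
        return false;   // no slot of the required kind -> the task must wait
    }

    public static void main(String[] args) {
        SlotModel tt = new SlotModel(2, 1);
        System.out.println(tt.tryAssign(true));   // true
        System.out.println(tt.tryAssign(true));   // true
        System.out.println(tt.tryAssign(true));   // false: Map slots exhausted,
                                                  // even though a Reduce slot is idle
    }
}
```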

Comparing with the HDFS roles (NameNode, DataNodes): the JobTracker corresponds to the NameNode and the TaskTracker corresponds to the DataNode. DataNodes and the NameNode handle data storage, while the JobTracker and TaskTrackers handle MapReduce execution.

MapTask and ReduceTask

Tasks are divided into MapTasks and ReduceTasks, and both are started by TaskTrackers. As the HDFS architecture shows, HDFS stores data in fixed-size blocks as its basic unit, while MapReduce's processing unit for input is the split. The default split/block relationship is 1:1. A split is a logical concept that contains only metadata such as the data's start position, its length, and the nodes where it resides; how input is divided into splits is entirely up to the user. Note, however, that the number of splits determines the number of MapTasks, since each split is handed to one MapTask. Splits are covered in more detail later in the discussion of MapReduce execution.
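Since each split is handed to one MapTask, the number of map tasks follows directly from the input size and the split size. A minimal sketch, assuming simple ceiling division (real InputFormats also consider a slack factor and record boundaries):

```java
// Sketch of the split-to-MapTask relationship: with the default 1:1
// split/block mapping, the number of splits (and hence MapTasks) follows
// from the file length and the split size. Simplified on purpose.
public class SplitCount {
    static long numSplits(long fileLength, long splitSize) {
        if (fileLength == 0) return 0;
        return (fileLength + splitSize - 1) / splitSize;  // ceiling division
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;                          // 128 MB blocks
        System.out.println(numSplits(300L * 1024 * 1024, blockSize)); // 3 -> 3 MapTasks
    }
}
```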

MapReduce 1.x workflows

The following is based on www.aboutyun.com/thread-1549…:

The working process of a MapReduce job is as follows:

  1. JobClient’s submitJob() method implements the job submission process as follows: request a new job ID via JobTracker’s getNewJobId(); check the job’s output specification (for example, throw an exception if no output directory is specified or it already exists); compute the job’s input splits (throw an exception if they cannot be computed, for example because the input path does not exist); copy the resources needed to run the job (the job Jar file, the configuration file, the computed input splits, and so on) to a directory named after the job ID, with multiple replicas in the cluster for TaskTrackers to access; finally, call JobTracker’s submitJob() method to signal that the job is ready to execute.

  2. When JobTracker receives the submitJob() call, it puts the job into an internal queue and leaves it to the job scheduler (FIFO, Capacity, Fair, and so on) to schedule and initialize. Initialization mainly means creating an object that represents the running job, encapsulating its tasks and bookkeeping information so that task status and progress can be tracked. To build the task run list, the job scheduler first fetches from HDFS the input split information computed by JobClient, then creates one MapTask per split and creates the ReduceTasks. (Task IDs are assigned at this point; note the distinction between the Job ID and Task IDs.)

  3. TaskTrackers periodically communicate with JobTracker through heartbeats, telling it that they are alive and whether they are ready to run new tasks. Before JobTracker can choose a task for a TaskTracker, the job scheduler must first choose the job the task belongs to. Each TaskTracker has a fixed number of slots for MapTasks and ReduceTasks (the exact number is determined by the TaskTracker’s core count and memory size). JobTracker first fills up a TaskTracker’s Map slots and then assigns ReduceTasks to it. For a MapTask, JobTracker selects a TaskTracker closest to the task’s input split; for a ReduceTask there is no such selection criterion, since data locality cannot be exploited.

  4. When a TaskTracker is assigned a task, it copies the job’s Jar file from HDFS to its local file system (Jar localization, needed to start the JVM), and copies all files the application needs from the distributed cache to local disk. The TaskTracker creates a local working directory for the task and extracts the contents of the Jar file into it. It then starts a new JVM to run each task (MapTask or ReduceTask), so that the client’s MapReduce code cannot affect the TaskTracker daemon (for example, by crashing or hanging it). The child JVM communicates with its parent through the umbilical interface, reporting the task’s progress every few seconds until the task is complete.

  5. Progress and status updates. A job and each of its tasks have status information, including the running state of the job or task, Map and Reduce progress, counter values, and status messages or descriptions (which can be set by user code). How does this status information get back to the client as it changes during the job?

  • As tasks run, their progress (the fraction of the task completed) is tracked. For a MapTask, progress is the proportion of input processed. For a ReduceTask the situation is more complicated, but the system still estimates the proportion of Reduce input processed;
  • These messages are aggregated at regular intervals along the path Child JVM → TaskTracker → JobTracker. JobTracker produces a global view of all running jobs and their task statuses, which can be viewed on the Web UI. Meanwhile, JobClient queries JobTracker every second for the latest status and prints it to the console.
  6. Job completion. When JobTracker is notified that the last task of a job has completed, it sets the job’s status to “Success”. When JobClient next queries the status, it learns that the job completed successfully, prints a message informing the user, and returns from the runJob() method.
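The progress roll-up described above can be approximated with a small sketch (illustrative only; Hadoop's real accounting is more involved):

```java
// Sketch of how per-task progress rolls up: a MapTask's progress is the
// fraction of its input processed, and a simple job-level view averages
// task progress. Hadoop's real bookkeeping is more involved.
public class ProgressView {
    static double mapProgress(long processedBytes, long totalBytes) {
        return totalBytes == 0 ? 1.0 : (double) processedBytes / totalBytes;
    }

    static double jobProgress(double[] taskProgress) {
        if (taskProgress.length == 0) return 1.0;
        double sum = 0;
        for (double p : taskProgress) sum += p;
        return sum / taskProgress.length;
    }

    public static void main(String[] args) {
        System.out.println(mapProgress(50, 200));                 // 0.25
        System.out.println(jobProgress(new double[]{1.0, 0.5}));  // 0.75
    }
}
```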

Disadvantages of MapReduce 1.x

As distributed cluster sizes and workloads grew, the problems of the original framework gradually emerged. The main ones are as follows:

  1. The JobTracker is MapReduce's centralized processing point and a single point of failure.
  2. The JobTracker takes on too many responsibilities, resulting in excessive resource consumption. When many MapReduce jobs are submitted, it incurs a large memory overhead, which also increases the risk of JobTracker failure. This is why the industry generally concluded that old Hadoop MapReduce could scale to an upper limit of about 4,000 nodes.
  3. On the TaskTracker side, using the number of Map/Reduce tasks as the measure of resources is too simplistic, since it ignores actual CPU/memory usage. If two tasks with large memory footprints are scheduled together, out-of-memory (OOM) failures easily occur.
  4. On the TaskTracker side, resources are rigidly divided into Map slots and Reduce slots. If only MapTasks or only ReduceTasks exist in the system at some moment, the other kind of slot sits idle and resources are wasted.
  5. Source-level analysis shows the code is very hard to read: a single class often does too many things and runs past 3,000 lines, so class responsibilities are unclear, which increases the difficulty of bug fixing and version maintenance.
  6. From an operational perspective, the old Hadoop MapReduce framework forces a system-level update for any change, significant or minor (bug fixes, performance improvements, or new features). Worse, it forces every client of the distributed cluster to update at the same time, regardless of users' preferences. These updates make users waste a lot of time verifying that their existing applications still work with the new Hadoop version.

MapReduce 2.x/YARN framework

Given the changing trends in the use of distributed systems in the industry and the long-term evolution of the Hadoop framework, MapReduce’s JobTracker/TaskTracker mechanism needs a major overhaul to fix its scalability, memory consumption, threading model, reliability, and performance flaws. Over the years, the Hadoop development team has made a number of bug fixes, but the cost of those fixes has increased recently, indicating that it is increasingly difficult to make changes to the original framework.

YARN is the resource management system in Hadoop 2.0. Its basic design idea is to split MRv1's JobTracker into two independent services: a global ResourceManager and a per-application ApplicationMaster. The ResourceManager manages and allocates resources for the entire system, while an ApplicationMaster manages a single application; together they take over the former JobTracker's role, and the per-node NodeManager takes over the TaskTracker's role.

YARN is a framework for resource management and task scheduling. It consists of ResourceManager (RM), NodeManager (NM), and ApplicationMaster (AM). ResourceManager monitors, allocates, and manages all resources. ApplicationMaster is responsible for scheduling and coordinating each specific application; NodeManager is responsible for the maintenance of each node. For all applications, RM has absolute control and the right to allocate resources. Each AM negotiates resources with the RM and communicates with the NodeManager to execute and monitor tasks. The relationship between several modules is shown in the figure.

Key concepts of MapReduce 2.x

ResourceManager(RM)

RM is a global resource manager that manages and allocates resources for the entire system. It mainly consists of two components: the Scheduler and the Applications Manager (AsM).

  • The scheduler

    The scheduler allocates the system's resources to running applications subject to capacity and queue constraints, such as the capacity allocated to each queue and the maximum number of jobs a queue may execute.

    It is important to note that this scheduler is a “pure scheduler”. It is no longer responsible for any application-specific work, such as monitoring or tracking application execution status, or restarting failed tasks caused by application execution failure or hardware failure. This is done by the application-specific ApplicationMaster. The scheduler allocates resources based only on the Resource requirements of each application. The Resource allocation unit is represented by an abstract concept called Resource Container (Container). Container is a dynamic Resource allocation unit. It encapsulates resources such as memory, CPU, disk, and network to limit the amount of resources used by each task. In addition, the Scheduler is a pluggable component. Users can design new schedulers based on their own requirements. YARN provides a variety of directly available schedulers, such as Fair Scheduler and Capacity Scheduler.

  • Application manager

    The application manager is responsible for managing all applications throughout the system, including application submission, negotiating resources with the scheduler to start ApplicationMaster, monitoring ApplicationMaster’s health, and restarting it if it fails.
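The "pluggable scheduler" idea mentioned above amounts to coding against an interface, so that FIFO-, capacity-, and fairness-style policies are interchangeable. A minimal sketch with invented names (not YARN's actual classes):

```java
import java.util.List;

// Sketch of the "pluggable scheduler" idea: the scheduler is just an
// interface; FIFO, capacity-style, and fair policies are interchangeable
// implementations. Names are invented, not YARN's actual classes.
public class PluggableScheduler {
    interface Scheduler {
        Integer pick(List<Integer> queuedJobIds);  // choose the next job to serve
    }

    static class FifoScheduler implements Scheduler {
        public Integer pick(List<Integer> q) {
            return q.isEmpty() ? null : q.get(0);  // first-come, first-served
        }
    }

    public static void main(String[] args) {
        Scheduler s = new FifoScheduler();         // swap in another policy here
        System.out.println(s.pick(List.of(7, 8, 9)));  // 7
    }
}
```

YARN's real counterparts are the Fair Scheduler and Capacity Scheduler named in the text, both configured rather than hard-coded.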

NodeManager (NM)

NM is the resource and task manager on each node. On the one hand, NM periodically reports resource usage on the node and the running status of each Container to RM. On the other hand, it receives and processes various requests from the AM to start/stop containers and so on.

ApplicationMaster (AM)

A user-submitted application contains an AM, which applies for resources for the application, allocates them to its internal tasks, monitors the application, tracks its execution state, and restarts tasks when they fail. The ApplicationMaster is the per-application framework that negotiates resources with the ResourceManager and works with the NodeManagers to execute and monitor tasks. MapReduce is a natively supported framework for running MapReduce jobs on YARN. Many distributed systems have developed their own application frameworks to run tasks on YARN, such as Spark and Storm. If necessary, we can also write our own YARN application that complies with the specification.

Container

Container is a resource abstraction in YARN. It encapsulates multi-dimensional resources on a node, such as memory, CPU, disk, and network resources. When AM applies for resources from RM, the resources returned by RM are represented by Container. YARN assigns a Container to each task, and the task can use only the resources described in the Container. Each Container can run ApplicationMaster, Map, Reduce, or any program as required.
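A Container, viewed abstractly, is a multi-dimensional resource grant that a task must stay within. A toy sketch (the class here is illustrative; in real YARN the grant is described by org.apache.hadoop.yarn.api.records.Resource):

```java
// Sketch of the Container abstraction: a multi-dimensional resource grant.
// Illustrative only; real YARN containers are handed out by the RM and
// described by org.apache.hadoop.yarn.api.records.Resource.
public class ContainerModel {
    final int memoryMB, vcores;

    ContainerModel(int memoryMB, int vcores) {
        this.memoryMB = memoryMB;
        this.vcores = vcores;
    }

    // A task may only use what its container grants.
    boolean fits(int taskMemoryMB, int taskVcores) {
        return taskMemoryMB <= memoryMB && taskVcores <= vcores;
    }

    public static void main(String[] args) {
        ContainerModel c = new ContainerModel(1024, 2);
        System.out.println(c.fits(512, 1));   // true
        System.out.println(c.fits(2048, 1));  // false: exceeds the memory grant
    }
}
```

Unlike MR1's Map/Reduce slots, nothing in the grant ties it to a task type: the same Container can host an ApplicationMaster, a Map task, a Reduce task, or any other program.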

YARN application workflow

  1. Users submit an application to YARN, including the AM program, the command to start the AM, its arguments, and the user program. In effect, this must accurately describe everything needed to run the ApplicationMaster as a Unix process. Submission is usually done through YarnClient.

  2. RM allocates the application its first Container and communicates with the corresponding NM, asking it to start the AM in this Container;

  3. The AM first registers with RM, so that users can view the application’s running state directly through RM; the running state is reported to RM via the getProgress() method of AMRMClientAsync.CallbackHandler. The AM then requests resources for its tasks and monitors their status until the run completes, i.e., it repeats steps 4 to 7.

  4. The AM applies for and obtains resources from RM through RPC in a polling fashion. Resource negotiation is completed asynchronously through AMRMClientAsync, with the responses handled in the corresponding methods of AMRMClientAsync.CallbackHandler.

  5. Once the AM obtains resources, it communicates with the NM and asks it to start tasks. Typically this means specifying a ContainerLaunchContext, which provides the information needed to start the Container.

  6. After setting up the task’s running environment (environment variables, JAR packages, binary programs, and so on), the NM writes the task start command into a script and starts the task by running that script.

  7. Each task reports its status and progress to the AM through an RPC protocol, so that the AM always knows each task’s running state and can restart a task when it fails. The ApplicationMaster communicates with NMs through an NMClientAsync object; all events for a container (start, status update, stop, and so on) are handled through NMClientAsync.CallbackHandler.

  8. After the application completes, the AM unregisters from RM and shuts itself down.
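The polling negotiation in steps 3 and 4 can be modeled as a toy loop (names invented; no Hadoop dependency): the AM repeatedly asks the RM for containers until its request is satisfied, while the RM grants only what is currently free.

```java
// Toy model (invented names) of the AM <-> RM polling negotiation: the AM
// keeps asking for its outstanding containers each "heartbeat"; the RM
// grants what it has free, and capacity frees up over time as other
// applications release resources.
public class AmRmLoop {
    int clusterFree;

    AmRmLoop(int free) { clusterFree = free; }

    int allocate(int asked) {                   // RM side: grant what's free
        int granted = Math.min(asked, clusterFree);
        clusterFree -= granted;
        return granted;
    }

    int negotiate(int needed) {                 // AM side: poll until satisfied
        int got = 0;
        for (int round = 0; got < needed && round < 10; round++) {
            got += allocate(needed - got);
            clusterFree += 1;                   // other apps release resources over time
        }
        return got;
    }

    public static void main(String[] args) {
        System.out.println(new AmRmLoop(3).negotiate(5));  // 5: satisfied over several rounds
    }
}
```

In a real AM this loop is inverted: AMRMClientAsync heartbeats in the background and invokes the CallbackHandler when containers are allocated, rather than the AM blocking as here.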