Hadoop
Hadoop has three core components: distributed storage (HDFS), the distributed computing framework (MapReduce), and the resource scheduling framework (YARN).
1. Describe the HDFS read and write process
This question comes up in almost every interview, yet many candidates still cannot describe the full process, so make sure you know it well. Many follow-up questions are derived from the HDFS read and write processes.
HDFS write process:
- The Client sends an upload request and communicates with the NameNode through RPC. The NameNode checks whether the user has upload permission and whether a file with the same name already exists in the target HDFS directory. If either check fails, an error is returned; otherwise the NameNode tells the Client it may upload.
- The Client splits the file into blocks (128 MB by default) and then asks the NameNode which servers the first block should be uploaded to.
- After receiving the request, the NameNode selects DataNodes according to the network topology, rack awareness, and replication policy, and returns the addresses of the available DataNodes.
Note: To balance data safety and efficiency, HDFS stores three replicas of each block by default: one on the local node, one on another node in the same rack, and one on a node in a different rack.
- After receiving the addresses, the Client communicates with the first node in the list (call it A); this is essentially an RPC call that establishes a pipeline. A then calls B, and B calls C, completing the pipeline; the acknowledgement is returned to the Client step by step.
- The Client sends the first block to A in packets (64 KB each), reading the data from disk into a local memory cache. When A receives a packet it forwards it to B, which forwards it to C; A also places each packet into an ack queue to wait for acknowledgement.
- Data is transmitted along the pipeline packet by packet. ACKs flow back along the pipeline in the reverse direction, and the final pipeline ACK is delivered to the Client by the first DataNode in the pipeline (A).
- When one block has been transferred, the Client asks the NameNode where to upload the next block, and the NameNode selects three DataNodes again; this repeats until the file is complete. (A minimal client-side sketch of the write path follows this list.)
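To make the flow concrete, here is a hedged sketch of a client writing a file through the HDFS Java API. The block splitting, pipeline setup, and packet-level ACKs described above all happen inside the DFS client; the NameNode address and path are assumptions to be replaced with your cluster's values.

```java
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed NameNode address; adjust to your cluster.
        conf.set("fs.defaultFS", "hdfs://namenode:8020");
        try (FileSystem fs = FileSystem.get(conf);
             FSDataOutputStream out = fs.create(new Path("/tmp/hello.txt"))) {
            // The client only sees a stream; block splitting, pipeline
            // establishment and packet ACKs are handled by the DFS client.
            out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
        }
    }
}
```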
HDFS read process:
- The Client sends an RPC request to the NameNode to obtain the locations of the file's blocks.
- After receiving the request, the NameNode checks the user's permissions and whether the file exists. If both checks pass, it returns a partial or complete list of blocks as appropriate. For each block, the NameNode returns the addresses of the DataNodes that hold a replica. The returned DataNode addresses are sorted by two rules: DataNodes closest to the Client in the network topology come first, and DataNodes whose heartbeats have timed out (and are therefore in the STALE state) come last.
- The Client picks the top-ranked DataNode to read each block. If the Client itself is a DataNode holding the replica, it reads the data locally (the short-circuit read feature).
- Under the hood, this is essentially a socket stream wrapped as an FSDataInputStream; the read method of its parent class, DataInputStream, is called repeatedly until the block has been fully read.
- After finishing the current list of blocks, if the whole file has not yet been read, the Client requests the next batch of block locations from the NameNode.
- After each block is read, its checksum is verified. If an error occurs while reading from a DataNode, the Client notifies the NameNode and reads the block from the next DataNode that holds a replica.
- The read method reads blocks in parallel rather than one piece at a time; the NameNode returns only the DataNode addresses for the requested blocks, never the block data itself.
- Finally, all the blocks that have been read are merged into the complete file. (A minimal read sketch follows this list.)
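The read path is equally simple from the client's point of view. A sketch, assuming the same cluster address as above: FileSystem.open returns the FSDataInputStream mentioned in the steps, and block location lookup and checksum verification happen underneath.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // assumed address
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/tmp/hello.txt"))) {
            // Copy the file contents to stdout; the DFS client fetches
            // block locations from the NameNode and verifies checksums.
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}
```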
2. What happens if a block turns out to be corrupted while HDFS is reading a file
After reading a block from a DataNode, the client performs checksum verification, comparing the checksum of the data it just read against the checksum recorded when the block was written to HDFS. If they do not match, the client notifies the NameNode and reads the block from the next DataNode that holds a replica.
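Per-chunk checksum verification during reads is performed automatically by the DFS client, but the FileSystem API also exposes a whole-file checksum that can be compared across copies. A small sketch (the path is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ChecksumExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
            // Whole-file checksum (an MD5 over block-level CRCs); the
            // per-chunk CRC check during reads happens inside the client.
            FileChecksum sum = fs.getFileChecksum(new Path("/tmp/hello.txt"));
            System.out.println(sum);
        }
    }
}
```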
3. What happens if a DataNode suddenly fails while HDFS is uploading a file
While uploading a file, the client establishes a pipeline with the DataNodes. In the forward direction of the pipeline, the client sends data packets to the DataNodes; in the reverse direction, the DataNodes send ACKs back to the client to confirm that each packet was received correctly.
If a DataNode suddenly goes down and the client stops receiving its ACKs, the client notifies the NameNode. The NameNode detects that the block's replica count no longer matches the configured replication factor and instructs other DataNodes to replicate the block. The failed DataNode is taken offline and is no longer used for uploads or downloads.
4. What does the NameNode do when it starts up
NameNode metadata is kept in memory and on the local disk. On disk it is stored in the fsimage image file and the edits log file.
Starting the NameNode for the first time:
- Format the file system to generate the fsimage file;
- Start the NameNode:
  - Read the fsimage file and load its contents into memory
  - Wait for DataNodes to register and send block reports
- Start the DataNode:
  - Register with the NameNode
  - Send a block report
  - Check that the number of blocks recorded in fsimage matches the total number of blocks in the block reports
- Perform operations on the file system (create directories, upload files, delete files, etc.):
  - At this point the metadata changes exist only in memory, not yet in the on-disk fsimage, so the changes are recorded in the edits file, which stores file system metadata modifications
Starting the NameNode for the second time:
- Read the fsimage and edits files;
- Merge the fsimage and edits files into a new fsimage file;
- Create a new, initially empty edits file;
- Start the DataNodes.
5. Do you know Secondary NameNode? What is its working mechanism
The Secondary NameNode merges the NameNode's edit logs into the fsimage file.
Its specific working mechanism:
- The Secondary NameNode asks the NameNode whether a checkpoint is needed; the NameNode returns the result directly;
- The Secondary NameNode requests that a checkpoint be performed;
- The NameNode rolls the edits log it is currently writing;
- The edits log and fsimage from before the roll are copied to the Secondary NameNode;
- The Secondary NameNode loads the edits log and fsimage into memory and merges them;
- A new image file, fsimage.chkpoint, is generated;
- fsimage.chkpoint is copied back to the NameNode;
- The NameNode renames fsimage.chkpoint to fsimage.
Therefore, if the metadata on the NameNode is lost, part of it can be recovered from the Secondary NameNode, but not all of it: the edits log that the NameNode was still writing had not yet been copied to the Secondary NameNode, so those changes cannot be recovered.
6. Since the Secondary NameNode cannot restore all of the NameNode's data, how do we keep the NameNode's metadata safe
This is the problem that NameNode high availability (NameNode HA) solves.
A single NameNode is a single point of failure, so two NameNodes are configured. There are two key points: first, the metadata of the two NameNodes must be kept in sync; second, when one NameNode fails, the other must take over immediately.
- Metadata synchronization in the HA solution relies on shared storage. Every time a file is written, the log must also be written to the shared storage, and only after that step succeeds is the write considered successful. The standby node then periodically pulls the log from the shared storage so that it is ready for an active/standby switchover.
- ZooKeeper monitors the NameNode status. The status of both NameNodes is stored in ZooKeeper, and each NameNode has a monitoring process (ZKFC) that reads the NameNode status from ZooKeeper to determine whether the current Active NameNode is down. If the ZKFC of the Standby NameNode finds that the Active NameNode has failed, it first sends a forced-shutdown request to the original Active NameNode and then switches the Standby NameNode to Active.
Do you know how shared storage is implemented in HA?
You can answer along these lines: there are several NameNode shared storage solutions, such as Linux HA, VMware FT, and QJM. The community has merged the QJM (Quorum Journal Manager) scheme contributed by Cloudera into the HDFS trunk and made it the default shared storage implementation. A QJM-based shared storage system stores only the EditLog, not the FSImage file, which remains on the NameNode's local disk. The basic idea of QJM comes from the Paxos algorithm: a cluster of JournalNodes stores the EditLog, and each JournalNode holds an identical copy. Every time the NameNode writes an EditLog entry, in addition to writing it to its local disk, it sends the write request to every JournalNode in the cluster in parallel; the write is considered successful as long as a majority of JournalNodes return success. With 2N+1 JournalNodes, up to N JournalNode failures can be tolerated by the majority principle.
7. Can split brain occur in NameNode HA? How is it resolved
Assume NameNode1 is Active and NameNode2 is Standby. If the ZKFailoverController process for NameNode1 dies at some point, the ZooKeeper ensemble will consider NameNode1 to be down and, following the failover logic, promote NameNode2 to Active. However, NameNode1 may in fact still be running in the Active state, so NameNode1 and NameNode2 are both Active and both serving clients at the same time. This situation is called split brain.
Split brain is disastrous for a system like the NameNode that requires very strong data consistency: data can be corrupted beyond recovery. The community's solution to this problem is called fencing, which means isolating the old Active NameNode so that it can no longer serve requests.
Fencing performs the following operations:
- First, try calling the transitionToStandby method of the old Active NameNode's HAServiceProtocol RPC interface to see whether it can be switched to Standby.
- If the transitionToStandby call fails, the isolation measures predefined in the Hadoop configuration file are executed. Hadoop currently provides two main fencing methods; sshfence is usually chosen:
- sshfence: logs in to the target machine over SSH and uses the fuser command to kill the corresponding process.
- shellfence: runs a user-defined shell script to isolate the corresponding process.
8. What is the harm of too many small files and how to avoid it
HDFS metadata is stored in the NameNode's memory, so a huge number of small files will eventually exhaust the NameNode's memory.
Each metadata object takes roughly 150 bytes, and a small file that occupies its own block needs at least a file object and a block object, about 300 bytes in total. Ten million such files therefore consume roughly 3 GB of NameNode memory, and 100 million files roughly 30 GB.
The obvious solution is to merge small files, either by merging them on the client side before upload or by using Hadoop's CombineFileInputFormat, which packs many small files into a single input split (see the driver sketch below).
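A hedged driver sketch using CombineTextInputFormat, the concrete text subclass of CombineFileInputFormat. The split size and paths are assumptions, and the job relies on the default identity mapper and reducer just to show the input-format wiring.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SmallFileJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine-small-files");
        job.setJarByClass(SmallFileJob.class);

        // Pack many small files into fewer, larger input splits so that one
        // MapTask processes many files instead of one MapTask per file.
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024L); // ~128 MB per split (illustrative)

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```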
9. Please describe the HDFS organizational structure
- Client: the client
  - Splits the file into blocks when uploading it to HDFS
  - Interacts with the NameNode to obtain file location information
  - Interacts with DataNodes to read or write data
  - Provides commands to manage and access HDFS, such as starting or stopping HDFS and listing directories and contents
- NameNode: the master node; it stores the metadata of the data, not the data itself
  - Manages the HDFS namespace
  - Manages the block mapping information
  - Configures the replica policy
  - Handles client read/write requests
- DataNode: the slave node; the NameNode issues commands and the DataNode performs the actual operations
  - Stores the actual data blocks
  - Performs read/write operations on the data blocks
- Secondary NameNode: not a hot standby for the NameNode; when the NameNode dies, it cannot immediately take over and provide service
  - Assists the NameNode and shares part of its workload
  - Periodically merges the fsimage and edits files and pushes the result to the NameNode
  - In an emergency, can help recover the NameNode
10. Please explain the working mechanism of Map Task in MR
Brief overview:
The input file is divided into multiple splits; a RecordReader reads the content line by line and feeds it to the map method (the user's own processing logic). The map output is handed to an output collector, where each key is partitioned (HashPartitioner by default) and then written into a buffer. Each MapTask has an in-memory ring buffer that holds the map output; when the buffer is nearly full, its contents are spilled to disk as a temporary file. After the entire MapTask finishes, all the temporary files it produced are merged into a single final output file, which then waits to be pulled by the ReduceTasks.
Detailed steps:
- The input-reading component InputFormat (TextInputFormat by default) uses its getSplits method to divide the files in the input directory into splits; the number of splits determines the number of MapTasks that are started.
- After the input file is divided into splits, a RecordReader object (LineRecordReader by default) reads the data one line at a time, using \n as the delimiter, and returns <key, value>, where key is the byte offset of the first character of the line and value is the line of text.
- The <key, value> pairs returned from reading the split are passed into the user's own Mapper subclass, and the user's overridden map method is executed once for each line the RecordReader reads.
- When the Mapper logic produces a result, each output is collected with context.write. Inside the collector, the data is first partitioned, using HashPartitioner by default.
- The data is then written into memory, into an area called the ring buffer (100 MB by default). The buffer batches the Mapper output to reduce disk I/O. Both the key/value pair and the partition number are written into the buffer, and the key and value are serialized into byte arrays before being written.
- When the data in the ring buffer reaches the spill threshold (0.8 by default, i.e. 80 MB), the spill thread starts and sorts the keys within that 80 MB of data. Sorting is the default behavior of the MapReduce model, and here it is performed on the serialized bytes.
- Spill files are then merged. Each spill produces a temporary file on disk (and if a Combiner is configured, it runs before the data is written). If the Mapper output is very large, there will be multiple spills and therefore multiple temporary files on disk. After all the data has been processed, the temporary files on disk are merged, because only one final file should be written to disk; an index file is also produced for it, recording the offset of the data belonging to each Reduce task. (A minimal Mapper sketch follows these steps.)
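A minimal word-count Mapper, sketched to match the steps above: map() is invoked once per line delivered by the LineRecordReader, and each context.write() hands a pair to the collector, where it is partitioned, buffered, and spilled as described.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // key = byte offset of the line, value = the line of text (step 2 above)
        for (String token : line.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                // Handed to the collector: partitioned, buffered, spilled, merged
                context.write(word, ONE);
            }
        }
    }
}
```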
11. Please describe the working mechanism of Reduce Task in MR
Brief description:
Reduce is divided into three phases: copy, sort, and reduce; the first two are the most important.
In the copy phase, an eventFetcher obtains the list of completed maps, and Fetcher threads copy the data. During this process two merge threads, inMemoryMerger and onDiskMerger, are started to merge in-memory data to disk and to merge files already on disk. The copy phase ends when all the data has been copied.
The sort phase mainly performs the final merge; it is purely a sorting phase. After it completes, the reduce phase starts and the user-defined reduce function is invoked.
Detailed steps:
- Copy phase: simply pulls data. The Reduce process starts data-copy threads (Fetchers) that request the MapTasks' output files over HTTP. Each piece of map output carries a partition number that identifies which ReduceTask it belongs to; ReduceTask identifiers start from 0 by default.
- Merge phase: while data is being copied remotely, the ReduceTask starts two background threads that merge files in memory and on disk, to prevent excessive memory usage or too many files on disk.
Merge comes in three forms: memory-to-memory, memory-to-disk, and disk-to-disk. The first form is not enabled by default. When the amount of data in memory reaches a threshold, a memory-to-disk merge is triggered; like the map side, this is also a spill process, and if a Combiner is configured it runs here as well, producing many spill files on disk. The memory-to-disk merge runs until there is no more map-side data, after which the third form, the disk-to-disk merge, runs to generate the final file.
- Merge sort: after the scattered data has been merged into one large data set, it is sorted again.
- The reduce method is called on the sorted key-value pairs: it is invoked once for each group of key-value pairs with the same key, producing zero or more output key-value pairs, which are finally written to HDFS. (A minimal Reducer sketch follows these steps.)
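A matching word-count Reducer sketch: reduce() is called once per key after the copy/merge/sort phases described above, with all the values the mappers emitted for that key grouped together.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
            sum += c.get();   // aggregate all counts for this key
        }
        total.set(sum);
        context.write(word, total);  // written to HDFS by the output format
    }
}
```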
12. Say something about the Shuffle phase in MR
The shuffle phase consists of four steps: partitioning, sorting, combining (the combiner), and grouping. The first three happen in the Map phase and the last one in the Reduce phase.
Shuffle is the core of MapReduce and spans the Map and Reduce phases. In general, the process from the Map output to the Reduce input is called shuffle.
- Collect stage: the MapTask output is written into a ring buffer (100 MB by default), which stores the key/value pairs together with their partition information.
- Spill stage: when the amount of data in memory reaches a threshold, it is written to local disk. The data is sorted before being written, and if a Combiner is configured, data with the same partition number and key is aggregated.
- Merge stage (MapTask side): all the spill temporary files are merged once, so that each MapTask ultimately produces only one intermediate data file.
- Copy stage: the ReduceTask starts Fetcher threads to copy its own share of data from the nodes where MapTasks have completed. The data is first kept in a memory buffer, and when the buffer reaches a threshold the data is written to disk.
- Merge stage (ReduceTask side): while the ReduceTask is copying data remotely, two background threads merge files from memory and from local disk.
- Sort stage: sorting is performed while the data is merged. Because the MapTask side has already partially sorted the data, the ReduceTask only needs to ensure that the copied data is globally ordered.
The size of the buffer used in shuffle affects the efficiency of the MapReduce program: in principle, the larger the buffer, the fewer disk I/O operations and the faster the execution.
The buffer size can be adjusted with the parameter mapreduce.task.io.sort.mb, which defaults to 100 MB. (A small tuning sketch follows.)
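A hedged tuning sketch: the property names below are the standard Hadoop 2+/3 names for the map-side sort buffer and its spill threshold; the values chosen (200 MB, 0.80) are only illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SortBufferTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 200);            // ring buffer size in MB (default 100)
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spill threshold (default 0.8)
        Job job = Job.getInstance(conf, "tuned-job");
        // ... set mapper/reducer/input/output as usual, then submit the job ...
    }
}
```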
13. What is the data compression mechanism in Shuffle phase
The shuffle phase involves a great deal of data copying: all the data output by the Map phase is sent over the network to the Reduce phase, which means a large amount of network I/O.
Hadoop supports compression algorithms such as gzip, bzip2, LZO, LZ4, and Snappy. Among these, Google's Snappy offers the fastest compression and decompression speeds, which is why it is the usual choice. (A configuration sketch follows.)
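A sketch of enabling Snappy compression for the intermediate map output (the data that is shuffled across the network). Note that the Snappy codec requires the native libraries to be installed on the cluster nodes.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class MapOutputCompression {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Compress the intermediate map output that is shuffled to reducers.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);
        // Pass this Configuration to Job.getInstance(conf, ...) when building the job.
    }
}
```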
14. When can a Combiner be used in an MR job
A Combiner performs a local aggregation and must not change the final result of the job. It is suitable for operations like summation but not for computing averages. A Combiner can be used when the Reduce function's input and output key/value types are identical; in that case the Reducer class itself can be reused as the Combiner. (A driver sketch follows.)
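A driver sketch that reuses the Reducer from the earlier sketch as the Combiner; this is valid here because summation is associative and the Reducer's input and output types are both <Text, IntWritable>. The paths and job name are illustrative, and the sketch assumes the WordCountMapper and WordCountReducer classes shown above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount-with-combiner");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Summation is associative and the reducer's input/output types match,
        // so the reducer can double as the combiner for map-side aggregation.
        job.setCombinerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```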
15. How much do you know about the YARN cluster architecture and working principle
The basic design concept of YARN is to split JobTracker in MapReduce V1 into two independent services, ResourceManager and ApplicationMaster.
ResourceManager manages and allocates resources for the entire system, and ApplicationMaster manages individual applications.
- ResourceManager: the RM is a global resource manager responsible for resource management and allocation across the whole system. It consists of two components: the Scheduler and the Applications Manager.
The Scheduler allocates the system's resources to running applications subject to constraints such as capacity and queues; while guaranteeing capacity, fairness, and service levels, it optimizes cluster resource utilization so that all resources are fully used. The Applications Manager is responsible for managing all applications in the system, including accepting application submissions, negotiating resources with the Scheduler to start the ApplicationMaster, monitoring the ApplicationMaster's health, and restarting it on failure.
- ApplicationMaster: each application submitted by a user corresponds to one ApplicationMaster, whose main functions are:
  - Negotiating with the RM scheduler to obtain resources, which are represented as Containers.
  - Further allocating the obtained resources to its internal tasks.
  - Communicating with the NM to start or stop tasks.
  - Monitoring the status of all internal tasks and re-applying for resources to restart a task when it fails.
- NodeManager: the NodeManager is the resource and task manager on each node. On the one hand, it periodically reports the node's resource usage and the running status of each Container to the RM; on the other hand, it receives and handles Container start/stop requests from the AM.
- Container: the Container is YARN's resource abstraction, encapsulating resources such as memory and CPU. An application is allocated Containers and can only use the resources described in them. Unlike the fixed slots in MapReduce V1, a Container is a dynamically divided unit of resources, which allows resources to be used more fully.
16. What is the task submission process of YARN
After the JobClient submits an application to YARN, YARN runs it in two stages: first, it starts the ApplicationMaster; second, the ApplicationMaster creates the application, requests resources for it, and monitors its execution until it finishes. The specific steps are as follows:
- The user submits an application to YARN, specifying the ApplicationMaster program, the command to start it, and the user program.
- The RM allocates the first Container for the application and asks the corresponding NM to start the application's ApplicationMaster in this Container.
- The ApplicationMaster registers with the RM, splits the application into internal subtasks, requests resources for each of them, and monitors their execution until they finish.
- The AM requests and obtains resources from the RM by polling.
- The RM allocates resources to the AM in the form of Containers.
- After obtaining the resources, the AM communicates with the NMs and asks them to start the tasks.
- Each NodeManager sets up the running environment for its task, writes the task start command into a script, and launches the task by running that script.
- Each task reports its status and progress to the AM so that a failed task can be restarted.
- After the application finishes, the ApplicationMaster deregisters from the ResourceManager and shuts itself down. (A minimal client-side sketch using the YarnClient API follows this list.)
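A partial, hedged sketch of the client side of step 1 using the YarnClient API: it only asks the RM for a new application ID. A real client would go on to build an ApplicationSubmissionContext (AM launch command, resources, queue) and call submitApplication, after which the remaining steps unfold as described above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnSubmitSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Step 1 of the flow above: ask the ResourceManager for a new application.
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationId appId = app.getApplicationSubmissionContext().getApplicationId();
        System.out.println("Got application id from RM: " + appId);

        // A full client would now populate the ApplicationSubmissionContext
        // (AM launch command, resource request, queue) and call
        // yarnClient.submitApplication(...); the RM then launches the AM
        // in the first Container, as described in the steps above.
        yarnClient.stop();
    }
}
```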
17. Do you know the three resource scheduling models of YARN
Three schedulers are available in YARN: the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler.
Apache Hadoop uses the Capacity Scheduler by default, while the CDH distribution defaults to the Fair Scheduler.
FIFO Scheduler (first come, first served):
The FIFO Scheduler arranges applications in a queue in the order they are submitted, a first-in, first-out queue. When allocating resources, the application at the head of the queue gets resources first; once its demands are satisfied, the next application is served, and so on.
The FIFO Scheduler is the simplest and easiest scheduler to understand and requires no configuration, but it is not suitable for shared clusters: a large application can occupy all cluster resources and block every other application. For example, if a large job is already using the whole cluster, a newly submitted small job will stay blocked until the large one finishes.
Capacity Scheduler:
The Capacity Scheduler maintains a dedicated queue for running small tasks. However, setting aside a queue for small tasks reserves part of the cluster's resources in advance, so large jobs may finish later than they would under the FIFO Scheduler.
Fair Scheduler:
With the Fair Scheduler, there is no need to reserve resources up front; the scheduler dynamically rebalances resources among all running jobs.
For example, when the first big job is submitted, only this job is running and it obtains all cluster resources. When the second small task is submitted, the Fair scheduler allocates half of the resources to the small task so that the two tasks share cluster resources equally.
Note that with the Fair Scheduler there is a delay between submitting the second job and it obtaining resources, because it must wait for the first job to release some of its Containers. After the small job finishes, it releases its resources and the large job again gets all of the cluster's resources. The end result is that the Fair Scheduler achieves both high resource utilization and timely completion of small jobs.