1. HDFS

1. Working mechanism of NameNode and SecondaryNameNode

Phase 1: NameNode starts

(1) When the NameNode starts for the first time after being formatted, it creates the Fsimage and Edits files. If this is not the first start, it loads the edit log and image file directly into memory.

(2) The client sends requests to add, delete, or modify metadata.

(3) The NameNode records the operation in the edit log and rolls the log.

(4) NameNode adds, deletes, and modifies metadata in memory.

Phase 2: Secondary NameNode operation

(1) The Secondary NameNode asks the NameNode whether a CheckPoint is needed, and the NameNode returns its answer directly.

(2) The Secondary NameNode requests CheckPoint execution.

(3) The NameNode rolls the Edits log it is currently writing.

(4) The edit log and image file from before the roll are copied to the Secondary NameNode.

(5) The Secondary NameNode loads the edit log and image file into memory and merges them.

(6) Generate the image file fsimage.chkpoint.

(7) Copy fsimage.chkpoint to NameNode.

(8) The NameNode renames fsimage.chkpoint to fsimage.
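The merge in steps (5) and (6) can be pictured as replaying the edit log on top of the image. The following is a minimal sketch only: the dictionary layout, the operation names, and the `apply_edits` helper are illustrative assumptions, not the real Fsimage or Edits formats.

```python
def apply_edits(fsimage, edits):
    """Replay edit-log operations on a copy of the namespace image.

    fsimage: dict mapping path -> metadata (hypothetical layout)
    edits:   list of (operation, path, metadata) tuples (hypothetical layout)
    """
    namespace = dict(fsimage)  # the original image is left untouched
    for op, path, meta in edits:
        if op in ("add", "modify"):
            namespace[path] = meta
        elif op == "delete":
            namespace.pop(path, None)
    return namespace


# State held by the NameNode before the checkpoint
fsimage = {"/a.txt": {"size": 10}, "/b.txt": {"size": 20}}
edits = [
    ("add", "/c.txt", {"size": 5}),
    ("delete", "/a.txt", None),
    ("modify", "/b.txt", {"size": 25}),
]

# What the Secondary NameNode would send back as fsimage.chkpoint
fsimage_chkpoint = apply_edits(fsimage, edits)
print(fsimage_chkpoint)
```

After the merge, the rolled edit log is no longer needed to reconstruct the namespace, which is why the checkpoint keeps NameNode restart time short.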

2. NameNode high availability principle

– Principle

In a typical HA cluster, each NameNode runs on an independent server. Only one NameNode is active at any time, and the others are standby. The active NameNode handles all client operations, while the standby NameNode maintains the data state and is ready to take over at any time.

For data synchronization, the NameNodes communicate through a set of separate processes called JournalNodes (JNs). Every change to the namespace made by the active NameNode is logged to a majority of the JournalNode processes. The standby NameNode reads the changes from the JNs, continuously monitors the edit log, and applies the changes to its own namespace. This ensures that the standby's namespace state is fully synchronized when a failover occurs.

To ensure fast switchover, a standby NameNode also needs to know the location of all data blocks in the cluster. For this reason, all DataNodes must be configured with the addresses of the NameNodes and send block location information and heartbeats to each of them. It is critical for an HA cluster that only one NameNode is active at any one time. Otherwise, the namespace states of the NameNodes diverge, and data may be lost or incorrect results may be produced. To prevent this, the JNs allow only one NameNode to write to them at any one time.

– HDFS-HA core problems

(1) How to ensure data consistency across the three NameNodes?

A. Fsimage: one NameNode generates the image, and the other NameNodes synchronize it.

B. Edits: a new module, JournalNode, is introduced to keep the Edits files consistent.

(2) How to make only one NameNode active and all others standby

A. Manual assignment

B. Automatic allocation

(3) The 2NN does not exist in the HA architecture, so who merges Fsimage and Edits periodically?

The standby NameNode does the job

(4) If there is a problem with NameNode, how do I get another NameNode to do the work

A. Manual failover

B. Automatic failover

  • Automatic failover mechanism

– HDFS-HA cluster planning for automatic failover

3. Working mechanism of DataNode

(1) A data block is stored on the DataNode's disk as two files: the data itself, and a metadata file containing the block length, the block checksum, and the timestamp.

(2) After a DataNode starts, it registers with the NameNode and then periodically (every 1 hour) reports all of its block information to the NameNode.

(3) A heartbeat is sent every 3 seconds. The heartbeat response carries NameNode commands for the DataNode, such as copying block data to another machine or deleting a data block. If no heartbeat is received from a DataNode for more than 10 minutes, the node is considered unavailable.

(4) Machines can be safely added to or removed from the cluster while it is running.
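The "10 minutes" in step (3) is a rounded figure. A small sketch of how the timeout is commonly derived, assuming the usual formula of twice `dfs.namenode.heartbeat.recheck-interval` plus ten times `dfs.heartbeat.interval`, with assumed default values of 300000 ms and 3 s (the function name is hypothetical):

```python
def datanode_dead_timeout_seconds(recheck_interval_ms=300_000, heartbeat_interval_s=3):
    """Seconds without a heartbeat before a DataNode is treated as dead.

    Assumes: timeout = 2 * recheck interval + 10 * heartbeat interval.
    """
    return 2 * recheck_interval_ms / 1000 + 10 * heartbeat_interval_s


timeout = datanode_dead_timeout_seconds()
print(f"{timeout:.0f} s = {timeout // 60:.0f} min {timeout % 60:.0f} s")
```

Under these assumptions the result is 630 seconds, that is, 10 minutes plus 30 seconds. Shortening the recheck interval makes failure detection faster at the cost of more false positives on a busy network.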

4. Data compression

– gzip compression

Application scenario: When each compressed file is within 130 MB (about one block), gzip is a good choice. For example, one day's or one hour's worth of logs can be compressed into a single gzip file, and MapReduce achieves concurrency by processing multiple gzip files in parallel. Hive programs, Streaming programs, and MapReduce programs written in Java handle the files exactly as they handle text, so the original programs need no modification after compression.

Advantages: High compression ratio and relatively fast compression/decompression; Hadoop supports it directly, so applications process gzip files as if they were text; a Hadoop native library is available; most Linux systems ship with the gzip command, so it is easy to use.

Disadvantages: No split support.

– snappy compression

Application scenario: When the map output of a MapReduce job is large, Snappy can compress the intermediate data between Map and Reduce. It can also be used for the output of one MapReduce job that serves as the input of another.

Advantages: High compression speed and a reasonable compression ratio; a Hadoop native library is available.

Disadvantages: No split support; lower compression ratio than gzip; not bundled with Hadoop, so it must be installed separately; Linux has no corresponding command.

– lzo compression

Application scenario: Consider LZO for a large text file that is still larger than 200 MB after compression. The larger the single file, the more obvious the advantages of LZO.

Advantages: Relatively fast compression/decompression and a reasonable compression ratio; supports split, which makes it the most popular compression format in Hadoop; a Hadoop native library is available; the lzop command can be installed on Linux.

Disadvantages: Lower compression ratio than gzip; not bundled with Hadoop, so it must be installed separately; applications need special handling for LZO files (an index must be built to support split, and the InputFormat must be set to the LZO format).

– bzip2 compression

Application scenario: Suitable when speed matters little but a high compression ratio is required, for example as the output format of MapReduce jobs; when the output data is large and must be compressed and archived to save disk space, and will rarely be used afterwards; or when a single very large text file must be compressed to save storage while still supporting split and remaining compatible with existing applications (that is, the applications need no modification).

Advantages: Supports split; high compression ratio, higher than gzip; supported by Hadoop itself, although no native library is available; Linux systems ship with the bzip2 command, so it is easy to use.

Disadvantages: Slow compression/decompression.
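The ratio and speed trade-off between gzip and bzip2 can be observed outside Hadoop with Python's standard library. This is only a rough illustration on synthetic log lines (the sample data is made up); Snappy and LZO are left out because they need third-party packages.

```python
import bz2
import gzip
import time

# Synthetic log data standing in for a real log file (assumption)
raw = b"".join(
    b"2024-01-01 12:00:00 INFO request id=%d served\n" % i for i in range(20000)
)

results = {}
for name, compress in (("gzip", gzip.compress), ("bzip2", bz2.compress)):
    start = time.perf_counter()
    blob = compress(raw)
    elapsed = time.perf_counter() - start
    results[name] = blob
    print(f"{name}: {len(raw)} -> {len(blob)} bytes in {elapsed * 1000:.1f} ms")

# Both formats are lossless: decompression restores the original bytes
restored_gzip = gzip.decompress(results["gzip"])
restored_bzip2 = bz2.decompress(results["bzip2"])
```

The exact numbers depend on the data and the machine, so measure on a sample of the real input before fixing a codec for a job.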

5. Read process of HDFS storage mechanism

(1) The client asks the NameNode to download a file through DistributedFileSystem. The NameNode queries its metadata to find the addresses of the DataNodes holding the file's blocks.

(2) The client selects a DataNode (nearest first, then random) and requests to read the data.

(3) The DataNode starts transmitting data to the client (it reads the data from disk as an input stream and verifies it packet by packet).

(4) The client receives the data packet by packet, caches it locally first, and then writes it to the target file.

6. Write process of HDFS storage mechanism

(1) The client asks the NameNode to upload a file through DistributedFileSystem. The NameNode checks whether the target file already exists and whether the parent directory exists.

(2) The NameNode replies whether the upload is allowed.

(3) The client asks which DataNode servers the first block should be uploaded to.

(4) The NameNode returns three DataNodes: DN1, DN2, and DN3.

(5) The client asks DN1 to accept the upload through the FSDataOutputStream module. On receiving the request, DN1 calls DN2, and DN2 calls DN3, which completes the communication pipeline.

(6) DN1, DN2, and DN3 acknowledge the client step by step.

(7) The client starts uploading the first block to DN1 (reading data from disk into a local memory cache first), in units of packets. DN1 receives a packet and passes it to DN2, and DN2 passes it to DN3. Each time DN1 sends a packet, it places the packet in an acknowledgement queue to wait for the reply.

(8) When one block has been transferred, the client asks the NameNode again for the servers to upload the second block to. Steps 3-7 are repeated.
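The pipeline in steps (5) to (7) can be sketched as a toy, single-threaded simulation. The `DataNode` class, `upload_block` function, and 4-byte packet size are hypothetical stand-ins chosen for readability; the real client streams much larger packets asynchronously.

```python
from collections import deque


class DataNode:
    """Toy DataNode that stores packets and forwards them downstream."""

    def __init__(self, name, downstream=None):
        self.name = name
        self.downstream = downstream
        self.stored = []

    def receive(self, packet):
        """Store the packet, forward it, and return acks (last node first)."""
        self.stored.append(packet)
        acks = [self.name]
        if self.downstream is not None:
            acks = self.downstream.receive(packet) + acks
        return acks


def pipeline_length(head):
    """Count the nodes in the pipeline starting at head."""
    count = 0
    while head is not None:
        count += 1
        head = head.downstream
    return count


def upload_block(block, head, packet_size=4):
    """Send a block packet by packet; return the number of unacknowledged packets."""
    expected_acks = pipeline_length(head)
    ack_queue = deque()
    for start in range(0, len(block), packet_size):
        packet = block[start:start + packet_size]
        ack_queue.append(packet)          # wait for acknowledgement
        acks = head.receive(packet)
        if len(acks) == expected_acks:    # every node confirmed the packet
            ack_queue.popleft()
    return len(ack_queue)


# Build the pipeline DN1 -> DN2 -> DN3 and upload one small block
dn3 = DataNode("dn3")
dn2 = DataNode("dn2", dn3)
dn1 = DataNode("dn1", dn2)
pending = upload_block(b"hello hdfs!", dn1)
print(pending, dn3.stored)
```

The client only talks to the first node, so the upload bandwidth it needs does not grow with the replication factor.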

7. HDFS small file processing

– Disadvantages of HDFS small files

Every file in HDFS needs an index entry on the NameNode, and each entry is about 150 bytes. A large number of small files therefore produces a large number of index entries. On the one hand, this occupies a large amount of NameNode memory; on the other hand, the oversized index slows down lookups.
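A quick back-of-the-envelope calculation shows the effect. It uses only the 150-byte figure quoted above; the function names and the 1 TiB data set are illustrative assumptions.

```python
BYTES_PER_ENTRY = 150  # approximate index size per file, as quoted in the text


def files_needed(total_bytes, file_size_bytes):
    """Number of files needed to hold total_bytes at a fixed file size."""
    return -(-total_bytes // file_size_bytes)  # ceiling division


def namenode_index_bytes(num_files):
    """Estimated NameNode memory used by the index entries."""
    return num_files * BYTES_PER_ENTRY


TIB = 1024 ** 4
MIB = 1024 ** 2

for label, size in (("128 MiB files", 128 * MIB), ("1 MiB files", 1 * MIB)):
    count = files_needed(TIB, size)
    print(f"1 TiB as {label}: {count} files, ~{namenode_index_bytes(count) / MIB:.1f} MiB of index")
```

The same amount of data costs 128 times more NameNode memory when it is stored as 1 MiB files instead of 128 MiB files.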

– HDFS small file solutions

(1) Hadoop Archive (HAR): a file archiving tool that efficiently packs small files into HDFS blocks. It packages multiple small files into one HAR file, which reduces NameNode memory usage while still allowing transparent access to the files.

(2) Sequence file: a sequence file consists of a series of binary key/value pairs. With the small file name as the key and the file content as the value, a large number of small files can be combined into one large file.

(3) CombineFileInputFormat: combines multiple files into a single split. In addition, it takes the storage location of the data into account.

(4) Enabling JVM reuse. Principle: one Map task runs in one JVM; with reuse enabled, the JVM goes on to run other Map tasks after the current one finishes (mapreduce.job.jvm.numtasks). For jobs with a large number of small files, this can reduce the running time by 45%.

2. MapReduce

1. MapReduce process

1. The MapTask parses key/value pairs from its InputSplit using the user-written RecordReader.

2. The parsed key/value pairs are passed to the user-written map() function, which produces a series of new key/value pairs.

3. OutputCollector.collect() is called to output the result. Inside this function, each generated key/value pair is assigned a partition (by calling the Partitioner) and written to a circular in-memory buffer, 100 MB by default, half of which holds data and half the index (metadata).

4. When the circular buffer is 80% full, it spills to disk, and new data is written in the reverse direction.

5. Before each spill, the data is partitioned, and within each partition the key indexes are sorted with quicksort in lexicographic order. If a Combiner is configured, the data in each partition is aggregated once.

6. The sorted temporary files are merge-sorted partition by partition, so that each MapTask ends up with only one data file. At this point a second Combiner pass and compression can be applied, depending on the configuration, to improve network transfer efficiency.

7. Each ReduceTask fetches the data of its own partition, identified by its partition number, from every MapTask machine. If a piece of data exceeds a certain threshold, it is written to disk; otherwise it is kept in memory.

8. While copying data remotely, the ReduceTask starts two background threads that merge-sort the files in memory and on disk, to prevent excessive memory use or too many files on disk.

9. Data is grouped by key, so that each call to the reduce method processes all the data for one key.

10. The aggregated result is written to HDFS by the RecordWriter of the OutputFormat component.
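The ten steps can be compressed into an in-memory word count that keeps the same order: map, partition, sort, combine, fetch by partition, merge, group, reduce. This is a simplified sketch with no buffer, spills, or network; all function names and the byte-sum partitioner are illustrative assumptions.

```python
from collections import defaultdict
from itertools import groupby


def map_fn(line):
    """User map(): emit (word, 1) for each word."""
    for word in line.split():
        yield word, 1


def partition(key, num_reducers):
    """Stand-in for the Partitioner; deterministic so runs are repeatable."""
    return sum(key.encode()) % num_reducers


def first(pair):
    return pair[0]


def run_map_task(lines, num_reducers):
    """Steps 1-6: map, partition, sort within each partition, combine."""
    buckets = defaultdict(list)
    for line in lines:
        for key, value in map_fn(line):
            buckets[partition(key, num_reducers)].append((key, value))
    output = {}
    for part, pairs in buckets.items():
        pairs.sort(key=first)
        output[part] = [
            (key, sum(v for _, v in group)) for key, group in groupby(pairs, key=first)
        ]
    return output


def run_reduce_task(part, map_outputs):
    """Steps 7-10: fetch one partition from every map output, merge, group, reduce."""
    merged = sorted(pair for out in map_outputs for pair in out.get(part, []))
    return {key: sum(v for _, v in group) for key, group in groupby(merged, key=first)}


NUM_REDUCERS = 2
splits = [["a b a", "c a"], ["b b c"]]  # two input splits
map_outputs = [run_map_task(split, NUM_REDUCERS) for split in splits]

result = {}
for part in range(NUM_REDUCERS):
    result.update(run_reduce_task(part, map_outputs))
print(result)
```

Because every occurrence of a key lands in the same partition, each reducer sees all values for its keys, which is the property step 9 relies on.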

2. Optimization

– Map phase optimization

(1) Increase the size of the circular buffer, for example from 100 MB to 200 MB.

(2) Raise the spill threshold of the circular buffer, for example from 80% to 90%.

(3) Reduce the number of merge passes over spill files, for example by merging 20 files at a time instead of 10.

(4) Provided the business logic is not affected, use a Combiner to merge data in advance and reduce I/O.
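The effect of items (1) and (2) can be estimated with simple arithmetic. The sketch below is a rough model that ignores the index half of the buffer; the function names are hypothetical, and the settings involved are assumed to be `mapreduce.task.io.sort.mb` and `mapreduce.map.sort.spill.percent` (item (3) would correspond to `mapreduce.task.io.sort.factor`).

```python
import math


def spill_threshold_mb(sort_mb=100, spill_percent=0.80):
    """Buffer fill level, in MB, at which a spill to disk starts."""
    return sort_mb * spill_percent


def estimated_spills(map_output_mb, sort_mb=100, spill_percent=0.80):
    """Rough number of spill files produced by one MapTask."""
    return math.ceil(map_output_mb / spill_threshold_mb(sort_mb, spill_percent))


map_output_mb = 1000  # hypothetical output size of one MapTask
print("default settings:", estimated_spills(map_output_mb), "spills")
print("tuned settings:  ", estimated_spills(map_output_mb, 200, 0.90), "spills")
```

Fewer spill files mean fewer merge passes and less disk I/O, at the cost of more memory per MapTask.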

– Reduce phase optimization

(1) Set the numbers of Map and Reduce tasks sensibly: neither too few nor too many. With too few, tasks wait and processing time grows. With too many, Map and Reduce tasks compete for resources, causing errors such as processing timeouts.

(2) Let Map and Reduce run concurrently: adjust the slowstart.completedmaps parameter so that Reduce starts once Map has progressed to a certain point, which reduces the time Reduce spends waiting.

(3) Avoid using Reduce where possible, because Reduce generates a large amount of network traffic when it is used to join data sets.

(4) Increase the number of parallel fetches each Reduce uses to pull data from Map.

(5) If cluster resources allow, increase the memory that the Reduce side uses for holding data.

– IO transfer

Data compression is used to reduce network I/O time. Install the Snappy and LZOP compression codecs.

Compression:

(1) For map input, the main considerations are data volume and splitting. Bzip2 and LZO support splitting. Note: LZO must have an index built in order to support splitting.

(2) For map output, the main consideration is speed. Snappy and LZO are the fast options.

(3) For reduce output, the choice depends on the requirements. For example, if the output serves as the input of the next MR job, splitting should be considered; for permanent storage, gzip with its relatively high compression ratio is a good fit.

3. Data skew

1. Definition

When a MapReduce program runs, most Reduce nodes finish, but one or a few Reduce nodes run slowly, which makes the whole program take a long time. This happens because one key has far more records than the others (sometimes hundreds or thousands of times more), so the Reduce node that handles this key processes much more data than the other nodes and finishes late.

2. Symptoms

1. Most tasks execute very fast, but a few tasks execute extremely slowly.

2. A Spark job that used to run normally throws an OOM (out of memory) exception. Inspecting the exception stack shows that it originates in the business code we wrote.

3. Solutions

1. If the data can be handled in the Map stage, handle it there first

Use a Combiner or MapJoin in the Map stage to reduce the amount of data transferred. Adding a Combiner to a Mapper amounts to reducing in advance: identical keys within one Mapper are aggregated, which reduces both the data transferred during shuffle and the computation on the Reducer side. This approach is not very effective if the keys that cause the skew are spread in large numbers across different Mappers.

2. The keys causing data skew are spread in large numbers across different Mappers

(1) Local aggregation plus global aggregation. In the first MapReduce job, the Map stage adds a random prefix from 1 to N to the keys that cause the skew, so that records with the same original key are spread over multiple Reducers for local aggregation, and the number of records per key drops sharply. In the second MapReduce job, the random prefix is removed and global aggregation is performed. The idea is to run MR twice: the first job hashes the keys randomly to different Reducers to balance the load, and the second job strips the random prefix and reduces by the original key. This method runs MapReduce twice, so its performance is slightly lower.

(2) Increase the number of Reducers to improve parallelism: JobConf.setNumReduceTasks(int)

(3) Custom partitioner: based on the data distribution, write a custom hash function that spreads the keys evenly over the Reducers.
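Method (1), local aggregation followed by global aggregation, can be sketched in plain Python as two counting passes. The `stage_one` and `stage_two` names, the "_" separator, and the sample data are illustrative assumptions; in a real job each stage would be a separate MapReduce job.

```python
import random
from collections import Counter


def stage_one(records, n, rng):
    """First job: add a random prefix 1..n to each key, then aggregate locally."""
    partial = Counter()
    for key, value in records:
        salted_key = f"{rng.randint(1, n)}_{key}"
        partial[salted_key] += value
    return partial


def stage_two(partial):
    """Second job: strip the prefix and aggregate globally."""
    totals = Counter()
    for salted_key, value in partial.items():
        _, key = salted_key.split("_", 1)
        totals[key] += value
    return totals


rng = random.Random(0)  # fixed seed so the run is repeatable
records = [("hot", 1)] * 1000 + [("cold", 1)] * 10  # "hot" is the skewed key

partial = stage_one(records, 4, rng)
totals = stage_two(partial)

hot_parts = {k: v for k, v in partial.items() if k.endswith("_hot")}
print(hot_parts)   # the hot key is spread over up to 4 salted keys
print(totals)
```

The second stage only sees at most N records per original key, so the skew is gone by the time the global aggregation runs.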

3. Increase Reduce memory

This applies when there are few unique values (fewer than a few thousand) and a small number of them have very many records. In such cases, tuning is often done at the hardware level, and increasing JVM memory can improve performance significantly.

4. Increase the number of Reduce tasks

This applies when there are many unique values and some values have far more records than others, but each still accounts for less than one percent or one thousandth of the total. In this case, the most likely problem is that many keys are partitioned to the same partition, so that one Reduce task does a large amount of work. Increasing the number of Reduce tasks relieves this: with more nodes sharing the computation, even an uneven workload becomes much smaller per node.

Yarn part

1. Yarn working mechanism

2. Yarn scheduler

1. FIFO Scheduler

Single queue, first come, first served, based on the order in which jobs are submitted. Advantages: Easy to understand; Disadvantages: Does not support multiple queues and is rarely used in production environments.

2. Capacity scheduler

Resources are divided by queue. Each queue can be given a minimum guaranteed share and an upper limit of resource usage. In addition, each user can be given an upper limit of resource usage to prevent resource abuse. If a queue has idle resources, they can be temporarily shared with other queues.

Features:

(1) A minimum guarantee and an upper limit of resource usage can be set for each queue, and all applications submitted to the queue share these resources.

(2) If a queue has surplus resources, they can be temporarily shared with queues that need them; once a new application is submitted to the lending queue, the resources freed by the other queues are returned to it.

(3) Support multi-user shared cluster and multi-application running at the same time. To prevent a single application, user, or queue from monopolizing resources in a cluster, you can add multiple constraints (such as the number of tasks a single application can run at the same time, etc.).

(4) Each queue has a strict ACL list specifying its access users, and each user can specify which users are allowed to view the running status of their application or control the application (such as killing the application).

3. Fair scheduler

The Fair scheduler allocates resources “evenly” and fairly among all applications. This fairness is configurable through weights: the allocation file can set a weight for each queue, and an unset weight defaults to 1 (because the default weights are equal, jobs or queues receive equal resources when nothing is configured). The design goal of the Fair scheduler is that, over time, all jobs receive a fair share of resources. The gap between the resources a job should have at a given moment and the resources it actually has is called its “shortfall”, and the scheduler allocates resources first to the job with the largest shortfall.
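The weight and shortfall rules can be illustrated with a few lines of arithmetic. This is a simplified sketch that ignores demand, minimum shares, and preemption; the function names, queue names, and numbers are hypothetical.

```python
def fair_shares(total_resources, weights):
    """Split the total in proportion to each queue's weight (unset weight = 1)."""
    weight_sum = sum(weights.values())
    return {queue: total_resources * w / weight_sum for queue, w in weights.items()}


def largest_shortfall(shares, usage):
    """Return the queue whose usage is furthest below its fair share."""
    return max(shares, key=lambda queue: shares[queue] - usage.get(queue, 0))


weights = {"a": 1, "b": 1, "c": 2}       # queue c is weighted twice as much
shares = fair_shares(100, weights)        # 100 resource units in the cluster
usage = {"a": 30, "b": 10, "c": 40}       # what each queue currently holds

print(shares)
print("next allocation goes to:", largest_shortfall(shares, usage))
```

Here queue b holds 10 units against a fair share of 25, so it has the largest shortfall and is served first.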

Features:

(1) Resource sharing is allowed: while an app is running, if other queues have no tasks to execute, the app can use the resources of those queues (queues it does not belong to); when apps in those queues need resources, the borrowed resources are released. All apps are allocated resources from resource queues.

(2) When a queue has tasks, it obtains at least its minimum resources. When a queue does not use all of its resources, other queues can use the surplus.

(3) When the queue cannot meet the minimum resources, it can preempt from other queues.