I. Setup of Hadoop cluster
You need at least three machines to build a Hadoop cluster locally, because you will also want components such as ZooKeeper and Kafka, which rely on leader election, so the smallest useful cluster size other than 1 is 3. If you also plan to install Hive, HBase, Kylin/Presto/Druid, or Solr/Elasticsearch as a distributed search engine, you will need plenty of memory. Around 6-8 GB per machine is fine; the key is to give each machine enough disk, ideally 40 to 50 GB (regularly cleaning up logs and unused installation packages also helps). If the inodes or disk fill up later, the virtual machine may fail to start. There are remedies, but it is better not to have to bother.
1. Xsync Cluster distribution script
(a) In the /home/xxx directory, create a bin directory, then inside bin create the xsync script with vim xsync
#!/bin/bash
#1 Get the number of input parameters; exit if none are given
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi
#2 Get the file name
p1=$1
fname=`basename $p1`
echo fname=$fname
#3 Get the absolute path of the parent directory
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 Get the name of the current user
user=`whoami`
#5 Loop over the target hosts and rsync the file to each of them
for((host=103; host<105; host++)); do
echo ------------------- hadoop$host --------------
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done
(b) Modify the execution permission of script xsync
[xxx@hadoop102 bin]$ chmod 777 xsync
(c) Call the script in the form: xsync file-name
[xxx@hadoop102 bin]$ xsync /home/atguigu/bin
If xsync is not available globally after being placed in /home/xxx/bin, you can move it to /usr/local/bin
2. Configure the cluster
- Cluster Deployment Planning
(1) Core configuration file
Configure core-site.xml
[xxx@hadoop102 hadoop]$ vi core-site.xml
Write the following configuration in this file
<!-- Specify the address of the NameNode in HDFS -->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop102:9000</value>
</property>
<!-- Specify the directory where files generated while Hadoop runs are stored -->
<property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>
(2) HDFS configuration file
Configure hadoop-env.sh
[xxx@hadoop102 hadoop]$ vi hadoop-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
Configure hdfs-site.xml
[xxx@hadoop102 hadoop]$ vi hdfs-site.xml
Write the following configuration in this file
<property>
    <name>dfs.replication</name>
    <value>3</value>
</property>
<!-- Specify the host for the Hadoop secondary NameNode -->
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop104:50090</value>
</property>
(3) YARN configuration file
Configure yarn-env.sh
[xxx@hadoop102 hadoop]$ vi yarn-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
Configure yarn-site.xml
[xxx@hadoop102 hadoop]$ vi yarn-site.xml
Add the following configuration to the file
<!-- How the Reducer obtains data -->
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<!-- Specify the address of the YARN ResourceManager -->
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>hadoop103</value>
</property>
(4) MapReduce configuration file
Configure mapred-env.sh
[xxx@hadoop102 hadoop]$ vi mapred-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
Configure mapred-site.xml
[xxx@hadoop102 hadoop]$ cp mapred-site.xml.template mapred-site.xml
[xxx@hadoop102 hadoop]$ vi mapred-site.xml
Add the following configuration to the file
<!-- Specify that MR runs on YARN -->
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>
3. Distribute the configured Hadoop configuration file on the cluster
[xxx@hadoop102 hadoop]$ xsync /opt/module/hadoop-2.7.2/
4. View the file distribution status
[xxx@hadoop103 hadoop]$ cat /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml
5. If the cluster is started for the first time, format NameNode (note: all data will be cleared after formatting).
[xxx@hadoop102 hadoop-2.7.2]$ hadoop namenode -format
3. Configure SSH password-free login
(2) Generate public and private keys:
[xxx@hadoop102 .ssh]$ ssh-keygen -t rsa
Then press Enter three times; two files will be generated: id_rsa (private key) and id_rsa.pub (public key).
(3) Copy the public key to the target machine on which you want to log in without password
[xxx@hadoop102 .ssh]$ ssh-copy-id hadoop102
[xxx@hadoop102 .ssh]$ ssh-copy-id hadoop103
[xxx@hadoop102 .ssh]$ ssh-copy-id hadoop104
Note:
You also need to configure password-free login from the root account on hadoop102 to hadoop102, hadoop103, and hadoop104.
You also need to configure password-free login from the atguigu account on hadoop103 to hadoop102, hadoop103, and hadoop104.
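If you prefer not to repeat the command for every host, a minimal loop does the same job. This is only a convenience sketch using the hosts above; run it once for each account mentioned in the notes (e.g. the normal user and root on hadoop102, atguigu on hadoop103):
# distribute the current user's public key to every node in the cluster
for host in hadoop102 hadoop103 hadoop104; do
ssh-copy-id $host
done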
4. Start the whole cluster
- Configuration of slaves
/opt/module/hadoop-2.7.2/etc/hadoop/slaves
[xxx@hadoop102 hadoop]$ vi slaves
Add the following to the file:
hadoop102
hadoop103
hadoop104
Note: no spaces are allowed at the end of the lines added to this file, and no empty lines are allowed in the file.
Synchronize the configuration file to all nodes
[atguigu@hadoop102 hadoop]$ xsync slaves
- Start the cluster
(1) If the cluster is started for the first time, format the NameNode (before formatting, stop any NameNode and DataNode processes started previously, and then delete the data and log directories).
[xxx@hadoop102 hadoop-2.7.2]$ bin/hdfs namenode -format
(2) Start the HDFS
[xxx@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[xxx@hadoop102 hadoop-2.7.2]$ jps
4166 NameNode
4482 Jps
4263 DataNode
[xxx@hadoop103 hadoop-2.7.2]$ jps
3218 DataNode
3288 Jps
[xxx@hadoop104 hadoop-2.7.2]$ jps
3221 DataNode
3283 SecondaryNameNode
3364 Jps
(3) Start YARN
[xxx@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
Note: If NameNode and ResourceManager are not on the same machine, do not start YARN on the NameNode machine; start it on the machine where ResourceManager resides.
(4) View SecondaryNameNode on the Web
(a) Enter in the browser: http://hadoop104:50090/status.html
(b) View the SecondaryNameNode information.
5. Summary of cluster start and stop modes
- Each service component starts/stops one by one
(1) Start and stop the HDFS component
hadoop-daemon.sh start / stop namenode / datanode / secondarynamenode
(2) Start/stop YARN
yarn-daemon.sh start / stop resourcemanager / nodemanager
- Each module is started and stopped separately (SSH is the prerequisite)
(1) Start or stop the HDFS as a whole
start-dfs.sh / stop-dfs.sh
(2) Start/stop YARN as a whole
start-yarn.sh / stop-yarn.sh
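Since start-dfs.sh must run on hadoop102 and start-yarn.sh on hadoop103 (see the note above), a small wrapper script saves some typing. This is only a sketch; it assumes the layout used in this article (HDFS managed from hadoop102, YARN from hadoop103, Hadoop installed under /opt/module/hadoop-2.7.2) and that password-free SSH is already configured:
#!/bin/bash
# hadoop-cluster.sh (hypothetical name) -- start or stop HDFS and YARN from one machine
case $1 in
start)
ssh hadoop102 "/opt/module/hadoop-2.7.2/sbin/start-dfs.sh"
ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"
;;
stop)
ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
ssh hadoop102 "/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh"
;;
*)
echo "Usage: $0 start|stop"
;;
esac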
6. Synchronize cluster time
- Time server configuration (root user is required)
(1) Check whether NTP is installed
[root@hadoop102 desktop]# rpm -qa | grep ntp
ntp-4.2.6p5-10.el6.centos.x86_64
fontpackages-filesystem-1.41-1.1.el6.noarch
ntpdate-4.2.6p5-10.el6.centos.x86_64
(2) Modify the NTP configuration file
[root@hadoop102 desktop]# vi /etc/ntp.conf
Modify the following information
A) Modify 1 (authorizes all machines in network segment 192.168.1.0-192.168.1.255 to query and synchronize time from this machine)
Change
#restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap
to
restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap
B) Modify 2 (cluster on LAN, not using other Internet time)
Change
server 0.centos.pool.ntp.org iburst
server 1.centos.pool.ntp.org iburst
server 2.centos.pool.ntp.org iburst
server 3.centos.pool.ntp.org iburst
to
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
C) Add 3 (when the node loses its network connection, it can still use the local time as a time server to provide time synchronization for other nodes in the cluster)
server 127.127.1.0
fudge 127.127.1.0 stratum 10
(3) Modify the /etc/sysconfig/ntpd file
[root@hadoop102 desktop]# vim /etc/sysconfig/ntpd
Add the following content (to synchronize hardware time with system time)
SYNC_HWCLOCK=yes
(4) Restart NTPD service
[root@hadoop102 desktop]# service ntpd status
ntpd is stopped
[root@hadoop102 desktop]# service ntpd start
Starting ntpd: [ OK ]
(5) Set the ntpd service to start on boot
[root@hadoop102 desktop]# chkconfig ntpd on
- Other machine configurations (must be root)
(1) Configure synchronization with the time server every 10 minutes on other machines
[root@hadoop103 desktop]# crontab -e
Write scheduled tasks as follows:
*/10 * * * * /usr/sbin/ntpdate hadoop102
(2) Modify any machine time
[root@hadoop103 desktop]# date -s "2017-9-11 11:11:11"
(3) Check whether the machine is synchronized with the time server ten minutes later
[root@hadoop103 desktop]# date
Note: When testing, you can adjust 10 minutes to 1 minute to save time.
II. HDFS architecture and high availability
1. HDFS overview
Added: HDFS file block size
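A couple of commands to check the block size in practice; this is a sketch assuming the standard dfs.blocksize property (128 MB by default in Hadoop 2.x) and an example file path:
# print the configured block size in bytes (134217728 = 128 MB)
hdfs getconf -confKey dfs.blocksize
# show the block size (%o) and replication factor (%r) of an existing file
hdfs dfs -stat "%o %r" /user/xxx/input/wc.input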
2. HDFS writing process
1) The client requests the NameNode to upload files through the Distributed FileSystem module. The NameNode checks whether the target file exists and whether the parent directory exists.
2) NameNode returns whether it can be uploaded.
3) Which DataNode servers the client requests the first Block to be uploaded to.
4) NameNode returns three DataNodes: dn1, dn2, and dn3.
5) The client requests dn1 to upload data through the FSDataOutputStream module; after receiving the request, dn1 calls dn2, and dn2 in turn calls dn3, establishing the communication pipeline.
6) dn1, dn2, and dn3 respond to the client step by step.
7) The client starts uploading the first Block to dn1 (reading data from disk into a local memory cache) in units of Packet; each Packet dn1 receives is passed on to dn2, and dn2 passes it to dn3. For every Packet it sends, dn1 places the Packet into a queue waiting for acknowledgement.
8) When the transfer of one Block is complete, the client requests NameNode again to upload the second Block to the server.
Repeat steps 3-7.
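The pipeline described above is what happens behind an ordinary upload. As an illustration with example paths, upload a file and then let fsck show which DataNodes received the replicas:
hadoop fs -mkdir -p /user/xxx/input
hadoop fs -put wc.input /user/xxx/input
# list the blocks of the file and the DataNodes holding each replica
hdfs fsck /user/xxx/input/wc.input -files -blocks -locations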
3. Network topology – Node distance calculation
Node distance: the sum of the distances from each of the two nodes to their nearest common ancestor.
For example, suppose node n1 is in rack r1 of data center d1. This node can be represented as /d1/r1/n1. Using this notation, four example distances are given here.
4. Rack Awareness (Replica storage node selection)
1. Official address
Rack awareness documentation: hadoop.apache.org/docs/r2.7.2…
5.HDFS data read process
1) The client uses the Distributed FileSystem to request the NameNode to download files. The NameNode queries metadata and finds the address of the DataNode where the file block resides.
2) Select a DataNode (nearby principle, and then random) server and request to read data.
3) The DataNode starts transferring data to the client (reading the data from disk as an input stream and verifying it in units of Packet).
4) The client receives the Packets, caches them locally first, and then writes them to the target file.
6. The NameNode and SecondaryNameNode
- Phase 1: NameNode starts
(1) After the NameNode is formatted and started for the first time, the Fsimage and Edits files are created. If it is not the first startup, the edit log and image file are loaded directly into memory.
(2) The client requests additions, deletions, or modifications of metadata.
(3) The NameNode records the operation in the log and rolls the log.
(4) The NameNode applies the additions, deletions, and modifications to the data in memory.
- Phase 2: the SecondaryNameNode at work.
(1) The SecondaryNameNode asks the NameNode whether a CheckPoint is needed and gets the NameNode's answer directly.
(2) The SecondaryNameNode requests execution of the CheckPoint.
(3) The NameNode rolls the Edits log that is currently being written.
(4) The edit log and image file from before the roll are copied to the SecondaryNameNode.
(5) The SecondaryNameNode loads the edit log and image file into memory and merges them.
(6) A new image file, fsimage.chkpoint, is generated.
(7) fsimage.chkpoint is copied to the NameNode.
(8) The NameNode renames fsimage.chkpoint to fsimage.
Fsimage: the NameNode's in-memory metadata serialized into a file.
Edits: records every client operation that updates metadata (the metadata can be reconstructed by replaying the Edits).
When the NameNode starts, it first rolls the Edits and generates an empty edits.inprogress, then loads the Edits and Fsimage into memory, so the NameNode holds the latest metadata in memory. The Client then starts sending requests to the NameNode to add, delete, or modify metadata; these requests are first recorded in edits.inprogress (metadata queries are not recorded in Edits, because queries do not change metadata), so if the NameNode fails, the metadata can be recovered from the Edits after a restart. The NameNode then applies the additions, deletions, and modifications to the metadata in memory.
As more and more operations are recorded in Edits, the Edits file grows larger and larger, which makes the NameNode slow to load it at startup. Therefore Edits and Fsimage need to be merged: the operations in Edits are replayed on the Fsimage to form a new Fsimage. The SecondaryNameNode helps the NameNode merge Edits and Fsimage.
The SecondaryNameNode first asks the NameNode whether a CheckPoint is needed (a CheckPoint is triggered when either of two conditions is met: the timer expires or the Edits data is full) and gets the NameNode's answer directly. To perform the CheckPoint, the SecondaryNameNode first asks the NameNode to roll the Edits and generate an empty edits.inprogress; the purpose of the roll is to mark a cut-off point in the Edits, so that all new operations are written to edits.inprogress while the other, unmerged Edits and the Fsimage are copied to the local SecondaryNameNode. The copied Edits and Fsimage are loaded into memory and merged to generate fsimage.chkpoint, which is copied back to the NameNode, renamed to fsimage, and replaces the original Fsimage. When the NameNode starts, it then only needs to load the previously unmerged Edits and the Fsimage, because the metadata from the merged Edits has already been recorded in the Fsimage.
Fsimage and Edits concepts
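Both files can be inspected offline with the image and edits viewers; this is a sketch in which the file names (found under data/tmp/dfs/name/current) are examples:
# dump an Fsimage file to XML
hdfs oiv -p XML -i fsimage_0000000000000000025 -o /tmp/fsimage.xml
# dump an Edits file to XML
hdfs oev -p XML -i edits_0000000000000000012-0000000000000000025 -o /tmp/edits.xml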
7. Cluster safe mode
8. Working mechanism of DataNode
1) A data block is stored on the disk in the form of a file on the DataNode. There are two files, one is data itself, the other is metadata, including the length of the data block, the checksum of the block data, and the timestamp.
2) DataNode registers with NameNode after it starts. After passing, DataNode reports all block information to NameNode periodically (within 1 hour).
3) The heartbeat occurs every 3 seconds. The heartbeat returns a command from NameNode to the DataNode, such as replicating a data block to another machine or deleting a data block. If no heartbeat message is received from a DataNode within 10 minutes, the node is considered unavailable.
4) Some machines can be added and quit safely during the cluster operation.
1) When the DataNode reads a Block, it calculates the CheckSum.
2) If the calculated CheckSum is different from the value when the Block was created, the Block is damaged.
3) The Client reads blocks from other Datanodes.
4) DataNode periodically verifies CheckSum after its file is created.
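The checksum mechanism described in the list above can also be exercised from the command line; a sketch with example paths:
# print the end-to-end checksum of a file stored in HDFS
hadoop fs -checksum /user/xxx/input/wc.input
# list any files with corrupt blocks under a path
hdfs fsck /user/xxx/input -list-corruptfileblocks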
9. Working mechanism of HDFS-HA
Eliminate single point of failure with dual Namenodes
- The way metadata is managed needs to change
Each NameNode stores a copy of the metadata in memory;
Only NameNode nodes in the Active state can write Edits logs.
Both Namenodes can read Edits;
Shared Edits are managed in a shared storage (qJournal and NFS are two mainstream implementations);
- A status management function module is required
Each ZKFailoverController (ZKFC) monitors its own NameNode and records its state in ZK. When the state needs to be switched, ZKFC performs the switchover, and split-brain must be prevented during the switch.
- You must ensure that the two NameNodes can log in to each other using SSH without a password
- A Fence mechanism, i.e. only one NameNode provides services at any given time
1) Fault detection: Each NameNode in the cluster maintains a persistent session in ZooKeeper. If the machine crashes, the ZooKeeper session is terminated and ZooKeeper notifies another NameNode that a failover is required.
2) Active NameNode selection: ZooKeeper provides a simple mechanism for uniquely selecting a node as active. If the current active NameNode crashes, another node may obtain a special exclusive lock from ZooKeeper to indicate that it should be an active NameNode.
ZKFC, another new component in automatic failover, is a ZooKeeper client that also monitors and manages the NameNode status. Each host running a NameNode also runs a ZKFC process. ZKFC is responsible for:
1) Health monitoring: ZKFC uses a health check command to ping the NameNode on the same host periodically. As long as the NameNode returns to health status in time, ZKFC considers the node to be healthy. If the node crashes, freezes, or enters an unhealthy state, the health monitor identifies the node as unhealthy.
2) ZooKeeper session management: ZKFC keeps a session open in ZooKeeper when the local NameNode is healthy. If the local NameNode is active, ZKFC also maintains a special ZNode lock that uses ZooKeeper support for ephemeral nodes and is automatically removed if the session terminates.
3) ZooKeeper-based election: If the local NameNode is healthy and ZKFC finds that no other node currently holds the ZNode lock, it acquires the lock for itself. If successful, it has won the election and is responsible for running the failover process to make its local NameNode Active. The failover process is similar to the manual failover described earlier: if necessary, the previously Active NameNode is fenced first, and then the local NameNode is transitioned to the Active state.
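When HA is configured, the state of each NameNode can be checked and switched with hdfs haadmin; a sketch that assumes the two NameNodes were given the IDs nn1 and nn2 in hdfs-site.xml (the IDs are examples):
# query which NameNode is active and which is standby
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
# manual failover (only for clusters without automatic failover via ZKFC)
hdfs haadmin -failover nn1 nn2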
III. Principles of YARN and MapReduce
1.MapTask parallelism mechanism
Data Block: A Block is a physical partition of data in the HDFS. Data slicing: Data slicing merely slices the input logically and does not slice it on disk for storage.
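Because the split is only logical, its size can be changed per job without touching the blocks on disk. A sketch using the bundled wordcount example, where the split property name assumes the standard FileInputFormat settings and the paths are examples:
# cap each split at 64 MB so that one 128 MB block yields two MapTasks
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount \
-D mapreduce.input.fileinputformat.split.maxsize=67108864 \
/user/xxx/input /user/xxx/output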
2. MapReduce workflow
The preceding figures show the complete MapReduce workflow; the Shuffle part of the process runs from step 7 to step 16. The detailed Shuffle process is as follows:
1) MapTask collects the kv pairs output by our map() method and puts them in the memory buffer
2) Data continually spills from the memory buffer to local disk files; multiple spill files may be produced
3) Multiple spill files are merged into one large spill file
4) During both the spill and the merge, the Partitioner is called to partition the data and sort it by key
5) Each ReduceTask fetches, according to its own partition number, the corresponding partition of result data from each MapTask machine
6) A ReduceTask fetches the result files of the same partition from different MapTasks and merges these files again (merge sort)
7) After merging into one large file, the Shuffle process is finished; the ReduceTask then enters its logical processing phase (taking key/value pairs out of the file one by one and calling the user-defined reduce() method)
The size of the buffer in Shuffle affects the execution efficiency of the MapReduce program. In principle, the larger the buffer is, the fewer disk I/OS are required and the faster the execution speed is
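The buffer and spill threshold can be tuned at submission time. A sketch using the bundled wordcount example with the standard property names (defaults are 100 MB and 0.80); the paths are examples:
# enlarge the spill buffer to 200 MB and spill at 90% instead of 80%
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount \
-D mapreduce.task.io.sort.mb=200 \
-D mapreduce.map.sort.spill.percent=0.90 \
/user/xxx/input /user/xxx/output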
3. MapTask working mechanism
(1) Read phase: MapTask parsed each key/value from the input InputSplit through the user-compiled RecordReader.
(2) Map phase: This node mainly gives the parsed key/value to the user to write a Map () function for processing, and generates a series of new key/value.
(3) Collect Phase: In the map() function written by the user, when the data processing is complete, outputCollector.collect () is usually called to output the results. Inside the function, it partitions the generated key/value (called Partitioner) and writes it to a ring memory buffer.
(4) Spill: When the ring buffer is full, MapReduce writes data to the local disk and generates a temporary file. Note that before data is written to the local disk, the data must be sorted locally and merged or compressed if necessary.
Spill phase details:
Step 1: Use the quick sort algorithm to sort the data in the cache by Partition number and then by key. In this way, the sorted data is aggregated by partition, and all data in the same partition is ordered by key.
Step 2: Write the data in each partition to the temporary file output/spillN.out in the task's working directory in ascending order of partition number (N is the current spill count). If a Combiner is configured, the data in each partition is aggregated once before being written to the file.
Step 3: Write the metadata of the partitioned data to the in-memory index data structure SpillRecord, where the metadata of each partition includes its offset in the temporary file, the data size before compression, and the data size after compression. If the current in-memory index size exceeds 1 MB, the index is written to the file output/spillN.out.index.
(5) Combine phase: When all data processing is complete, MapTask merges all temporary files once to ensure that only one data file will be generated at last.
When all data is processed, MapTask merges all temporary files into a large file and saves it to file output/file.out and generates corresponding index file output/file.out.index.
During file merging, MapTask merges files by partition. For a partition, it takes multiple rounds of recursive merges. Merge io.sort.factor (10 by default) files each round, add the resulting files back to the list to be merged, sort the files, and repeat the process until you finally get a large file.
Having each MapTask end up generating only one data file avoids the overhead of random reads caused by opening a large number of files simultaneously and reading a large number of small files simultaneously.
4. ReduceTask mechanism
(1) Copy phase: ReduceTask copies a piece of data from each MapTask. If the size of a piece of data exceeds a certain threshold, the data is written to the disk. Otherwise, the data is directly stored in the memory.
(2) Merge phase: While remotely copying data, the ReduceTask starts two background threads to Merge files on memory and disk to prevent excessive memory usage or excessive files on disk.
(3) Sort stage: According to MapReduce semantics, the input to the user-written reduce() function is a set of data grouped by key. To bring data with the same key together, Hadoop uses a sort-based strategy. Since each MapTask has already locally sorted its own results, the ReduceTask only needs to perform one merge sort over all the data.
(4) Reduce phase: The Reduce () function writes the calculation results to HDFS.
5.Yarn scheduling principle
YARN consists of ResourceManager, NodeManager, ApplicationMaster, and Container
Detailed description of the working mechanism
(1) The MR program is submitted to the node where the client resides.
(2) The YarnRunner applies for an Application from ResourceManager.
(3) RM returns the resource path of the application to YarnRunner.
(4) The program submits the resources required for running to HDFS
(5) After the application resources are submitted, apply for running mrAppMaster.
(6) RM initializes the user’s request into a Task.
(7) One of the NodeManager receives the Task.
(8) The NodeManager creates Container and generates MRAppmaster.
(9) Container Copies resources from the HDFS to the local.
(10) MRAppmaster applies to RM for running MapTask resources.
(11) RM allocates the running MapTask task to the other two NodeManagers, and the other two NodeManagers receive the task and create the container respectively.
(12) MR sends the program startup script to the two NodeManagers that received the tasks; each of the two NodeManagers starts a MapTask, and the MapTasks partition and sort the data.
(13) MrAppMaster waits for the completion of all MapTasks, applies for containers from RM and runs ReduceTask.
(14) ReduceTask obtains data of the corresponding partition from MapTask.
(15) After the program has finished running, MR applies to RM to unregister itself.
Yarn job submission process
(1) Job submission
Step 1: The Client calls the job.waitForCompletion method to submit a MapReduce job to the whole cluster.
Step 2: The Client applies for a job ID from RM.
Step 3: RM returns the submission path and job ID of the job resource to the Client.
Step 4: Client submits the JAR package, slice information and configuration file to the specified resource submission path.
Step 5: After submitting resources, the Client applies to RM for running MrAppMaster.
(2) Job initialization
Step 6: When RM receives the request from the Client, add the job to the capacity scheduler.
Step 7: An idle NM gets the Job.
Step 8: The NM creates the Container and generates the MRAppmaster.
Step 9: Download the resources submitted by the Client to the local.
(3) Task assignment
Step 10: MrAppMaster applies to RM to run multiple MapTask resources.
Step 11: RM assigns the running MapTask task to the other two NodeManagers, and the other two NodeManagers take the task and create the container respectively.
(4) Task running
Step 12: MR sends the program startup script to the two NodeManagers that received the tasks; each of the two NodeManagers starts a MapTask, and the MapTasks partition and sort the data.
Step 13: MrAppMaster waits for all MapTasks to run, applies for a container from RM, and runs ReduceTask.
Step 14: ReduceTask obtains data of corresponding partition from MapTask.
Step 15: After the program has finished running, MR applies to RM to unregister itself.
(5) Progress and status update
In YARN, progress and status (including counters) are returned to the application master. The client requests a progress update from the application master every second (set via mapreduce.client.progressmonitor.pollinterval) and displays it to the user.
(6) Job completion
In addition to requesting the job progress from the application master, the client checks whether the job has completed by calling waitForCompletion() every 5 seconds. The interval is set via mapreduce.client.completion.pollinterval. After the job completes, the application master and the Containers clean up their working state, and the job information is stored by the job history server for later inspection by the user.
6. Resource scheduler
At present, there are three kinds of Hadoop job schedulers: FIFO, Capacity Scheduler, and Fair Scheduler. The default resource scheduler in Hadoop 2.7.2 is the Capacity Scheduler.
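With the Capacity Scheduler, a job can be directed to a specific queue when it is submitted; a sketch assuming only the out-of-the-box default queue exists:
# inspect the state and capacity of a queue
yarn queue -status default
# submit the bundled pi example to that queue
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar pi \
-D mapreduce.job.queuename=default 10 10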
IV. MapReduce tuning
1) Merge small files. Before running the MR job, merge small files: a large number of small files generates a large number of Map tasks, which increases the number of Map task launches, and since launching tasks is time-consuming, the MR job runs slowly.
2) For IO transmission, adopt data compression to reduce IO time, and install the Snappy and LZO compression libraries (see the example after this list).
3) Address data skew
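For the compression point above, a sketch of enabling map-output compression when submitting a job; it assumes native Snappy support has been compiled in and uses the standard property names, with example paths:
# check which native compression codecs are available (snappy, lz4, zlib, ...)
hadoop checknative -a
# compress intermediate map output with Snappy
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount \
-D mapreduce.map.output.compress=true \
-D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
/user/xxx/input /user/xxx/output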
Small files can be optimized in the following ways:
(1) During data collection, small files or small batches of data are synthesized into large files and then uploaded to HDFS.
(2) Before business processing, use a MapReduce program on HDFS to merge small files (see the sketch below).
(3) In MapReduce processing, CombineTextInputFormat can be used to improve efficiency.
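For item (2) in the list above, one common MapReduce-based way to pack existing small files is a Hadoop archive (HAR); this is only one option, sketched with example paths:
# pack everything under /user/xxx/small-files into one archive
hadoop archive -archiveName input.har -p /user/xxx/small-files /user/xxx/archived
# the archived files are still readable through the har:// scheme
hadoop fs -ls har:///user/xxx/archived/input.har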
Thanks to the video course.