“This is the third day of my participation in the November Gwen Challenge. See details of the event: The last Gwen Challenge 2021”.
A summary of HADOOP
1.1 What is Hadoop
- The concept of Hadoop
It’s an open source,reliableAnd can beextensionDistributed computing framework- Allows distributed processing of large data across clusters of computers using a simple programming model
- Scalable: Scales from a single server to thousands of computers, each providing local computing and storage
- Reliable: Does not rely on hardware to provide high availability, but detects and handles failures at the application layer to provide high availability services on top of a cluster of computers
- What can Hadoop do
- Build a large data warehouse
- Pb-level data storage, analysis, processing and statistics
- Search engine
- Log analysis
- Data mining
- Business intelligence
1.2 Hadoop Core Components
-
Hadoop is a cheap solution to a problem common to all search engines
- How to store the ever-increasing volume of Web pages: single-node V.S. distributed storage
- How to rank the ever-growing web: supercomputing V.S. distributed computing
- HDFS solves distributed storage problems
- MapReduce solves distributed computing problems
-
Hadoop Common: The Common utilities that support The other Hadoop modules.
-
HDFS :(distributed file system)
- Derived from Google’s GFS paper, published in October 2003
- HDFS is an open source implementation of GFS
- HDFS features: scalability, fault tolerance, and mass storage
- Divide files into blocks of a specified size and keep multiple copies on multiple machines
- Operations such as data sharding, multiple copies, and fault tolerance are transparent to the user
-
The following figure shows the storage diagram of multiple data blocks
-
For the file /users/sameerp/data/part-0, the number of copies is set to 2 and the BlockID stored is set to 1 and 3 respectively.
-
The two backups of Block1 are stored on DataNode0 and DataNode2 servers
-
The two backups of Block3 are stored on DataNode4 and DataNode6 servers
-
-
MapReduce
-
Distributed computing framework
-
Derived from Google’s MapReduce paper, published in December 2004
-
MapReduce is the open source implementation of GoogleMapReduce
-
MapReduce features include scalability, fault tolerance, and offline processing of massive data
-
-
YARN: resource scheduling system
-
YARN: Yet Another Resource Negotiator
-
Manages and schedules cluster resources
-
YARN features: scalability, fault tolerance, and unified resource scheduling of multiple frameworks
-
1.3 the advantage of Hadoop
- Highly reliable
- Data storage: Multiple copies of data blocks
- Data calculation: When a node crashes, job calculation is automatically rescheduled
- High scalability
- When storage/computing resources are insufficient, the machine can be linearly scaled horizontally
- A cluster can contain thousands of nodes
- Clusters can use cheap machines at low cost
- The Hadoop ecosystem matures
Distributed file system HDFS
2.1 HDFS Shell Operations
File system (FS)Shell commands should be called in the bin/ Hadoop FS format
-
ls
Hadoop Fs-ls hadoop Fs-ls
If it is a file, the file information is returned in the following format: File name < number of copies > File size Modified Date Modified Time Permission User ID Group ID If it is a directory, a list of its immediate child files is returned, as in Unix. Directory The following information is returned: Directory name
Date Modified Time Permission User ID Group ID Example: Hadoop fs – ls/user/hadoop/file1 / user/hadoop/file2 HDFS: / / host: port/user/hadoop/dir1-name nonexistentfile return values: Returns 0 on success, -1 on failure.
-
text
Hadoop fs-text hadoop fs-text
Output the source file to text format. The allowed formats are ZIP and TextRecordInputStream.
-
mv
Hadoop fs -mv URI [URI…
Move the file from the source path to the destination path. This command allows multiple source paths, in which case the destination path must be a directory. It is not allowed to move files between different file systems. Example:
- hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
- hadoop fs -mv hdfs://host:port/file1 hdfs://host:port/file2 hdfs://host:port/file3 hdfs://host:port/dir1
The return value:
Returns 0 on success, -1 on failure.
-
put
Hadoop fs -put… hadoop fs -put…
Copies one or more source paths from the local file system to the target file system. It also supports reading input from standard input and writing it to the target file system.
- hadoop fs -put localfile /user/hadoop/hadoopfile
- hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
- hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
- Hadoop fs – put – HDFS: / / host: port/hadoop/hadoopfile read input from standard input.
The return value:
Returns 0 on success, -1 on failure.
-
rm
Hadoop fs -rm URI [URI…
Deletes the specified file. Delete only non-empty directories and files. Refer to the RMR command for recursive deletion. Example:
- hadoop fs -rm hdfs://host:port/file /user/hadoop/emptydir
The return value:
Returns 0 on success, -1 on failure.
2.2 HDFS Design Roadmap
-
Design idea of distributed file system:
-
Design objectives of HDFS
- Suitable for distributed file systems running on Commodity Hardware
- Highly fault tolerant systems suitable for deployment on inexpensive machines
- HDFS provides high-throughput data access and is suitable for applications on large-scale data sets
- Easy to expand, to provide users with good performance file storage services
2.3 the HDFS architecture
- 1 NameNode/NN(Master) with DataNode/DN(Slaves) (master-slave structure)
- A file is split into multiple blocks
- NameNode(NN)
- Responsible for responding to client requests
- Responsible for metadata management (file name, copy coefficient, Block storage DN)
- MetaData MetaData Describes data
- Monitor the Health status of datanodes. No report is received from Datanodes indicating that datanodes are dead for 10 minutes
- DataNode(DN)
- Data blocks corresponding to user files
- To send heartbeat message to NN regularly, report itself and all block information, health status
- Distributed clusters NameNode and DataNode are deployed on different machines
HDFS pros and cons
- advantages
- Data redundancy hardware fault tolerance
- Good for storing large files
- Processing streaming data
- Can be built on cheap machines
- disadvantages
- Low latency data access
- Small file storage
3. YARN&MapReduce
3.1 Resource Scheduling Framework YARN
3.1.1 What is YARN
- Yet Another Resource Negotiator
- Universal resource management system
- It provides unified resource management and scheduling for upper-layer applications, bringing great benefits to cluster utilization, resource management, and data sharing
3.1.2 Background of YARN Generation
-
Universal resource management system
- Hadoop Distributed data storage (data block, redundant storage)
- If multiple MapReduce jobs need to use the same HDFS data, resource scheduling management is required
- In hadoop1. x mode, MapReduce performs computing and manages cluster resources
-
Problems arising from the coupling of server cluster resource scheduling management and MapReduce execution
-
In the early days of Hadoop, when the technology was just Hadoop, this problem wasn’t obvious
-
With the development of big data technology, Spark Storm… All computing frameworks use server cluster resources
-
Without a common resource management system, data can only be provided separately for multiple clusters
- Low resource utilization and high o&M costs
-
Yarn (Yet Another Resource Negitiator) Another Resource manager
- Mesos big data resource management product
-
-
Different computing frameworks can share data in the same HDFS cluster and enjoy overall resource scheduling
3.1.3 YARN Architecture and Implementation process
- ResourceManager: RM Resource Manager There is only one RM that provides services at the same time in the whole cluster, which is responsible for the unified management of cluster resources and scheduling and processing client requests: Submit, kill Monitor our NM. Once a NM fails, the tasks running on this NM need to tell our AM how to process them
- NodeManager: multiple NM node managers manage and use node resources in the cluster. They report resource usage to RM periodically. They receive and process commands from RM: Start Container to process commands from AM
- ApplicationMaster: AM Each application has one: MR and Spark, which manage applications. Applications apply for core and memory resources from RM and allocate them to internal tasks. Start/stop tasks. Tasks run in containers, and AM runs in Containers
- Container: A Container that encapsulates CPU and Memory resources. It is an abstraction of a task running environment
- Client: submits a job. Queries the running progress of a job and kills a job
- The Client submits a job request
- ResourceManager communicates with NodeManager, allocates the first Container for user programs based on cluster resources, and distributes ApplicationMaster to the Container
- Create ApplicationMaster in the started Container
- ApplicationMaster registers with ResourceManager to apply for resources
- After ApplicationMaster obtains the resources, it requests the corresponding NodeManager to start the Container, and the program to be executed is distributed to NodeManager
- After the Container is started, perform related tasks
- When Tast completes, results are returned to ApplicationMaster
- ApplicationMaster requests the kill from ResourceManager
3.2 Distributed processing framework MapReduce
3.2.1 What is MapReduce
- MapReduce Paper from Google (December 2004)
- Hadoop’s MapReduce is an open source implementation of Google papers
- Advantages of MapReduce: Massive data is processed offline and easy to develop
- Disadvantages of MapReduce: Real-time streaming computing
3.2.2 MapReduce programming model
-
The idea of divide and conquer in MapReduce
- Example of counting money: a pile of banknotes, the different denominations of how many
- A single point of strategy
- Count all the bills one by one and count how many there are in various denominations
- Divide and conquer strategy
- Each person got a pile of banknotes and counted how many there were in various denominations
- Aggregate, each person is responsible for counting one denomination
- Solve applications where data can be sliced for computation
- A single point of strategy
- Example of counting money: a pile of banknotes, the different denominations of how many
-
MapReduce programming consists of Map and Reduce stages
- Split the job into Map phase and Reduce phase
- Divide complex problems into “simple Tasks”.
- Reduce phase: Reduce Tasks Range: Reduce
-
Procedure for executing MapReduce programming
- Prepare MapReduce input data
- Prepare Mapper data
- Shuffle
- The Reduce processing
- Results output
-
Programming model
-
Learn from functional programming
-
The user only needs to implement two function interfaces:
-
Map(in_key, in_value) list
—>(out_key,intermediate_value) list)
-
Reduce(out_key, intermediate_value) list
—>out_value list
-
-
3.3 graphs application
3.3.1 Compile and Run MapReduce Code using MRJob
Mrjob introduction:
- Mrjob is the easiest way to use Python to develop programs that run on Hadoop
- The MRJob program can be tested locally or deployed to run on a Hadoop cluster
- If you don’t want to be a Hadoop expert, but need to write MapReduce code using Hadoop,mrJob is a great choice
Mrjob installation:
- Install using PIP
- pip install mrjob
Mrjob implements WordCount:
from mrjob.job import MRJob
class MRWordCount(MRJob) :
Enter each line from line
def mapper(self, _, line) :
for word in line.split():
yield word,1
# word will go to the same reduce
def reducer(self, word, counts) :
yield word, sum(counts)
if __name__ == '__main__':
MRWordCount.run()
Copy the code
3.3.2 Different modes of running MRJOB
-
Inline (-r inline) mode
The default (-r inline) can be omitted. The output file is > output-file or -o output-file. For example, the following two running modes are equivalent
python word_count.py -r inline input.txt > output.txt python word_count.py input.txt > output.txt
-
Local (-r local) mode
Used to simulate Hadoop debugging locally, the difference from inline mode is that multiple processes are started to execute each task. Such as:
python word_count.py -r local input.txt > output1.txt
-
Hadoop (-r Hadoop)
Used in Hadoop environment, supporting Hadoop scheduling control parameters, such as:
1) specify the Hadoop task scheduling priority (VERY_HIGH | HIGH), such as: – jobconf graphs. The job. The priority = VERY_HIGH.
2) Limit the number of Map and Reduce tasks, for example, –jobconf mapreduce.map.tasks=2 –jobconf mapreduce.reduce.tasks=5
python word_count.py -r hadoop hdfs:///test.txt -o hdfs:///output
3.3.3 Realizing topN statistics by MRJob (experiment)
The top n items that occur most frequently in statistics
from mrjob.job import MRJob
class MRWordCount(MRJob) :
Enter each line from line
def mapper(self, _, line) :
for word in line.split():
yield word,1
# word will go to the same reduce
def reducer(self, word, counts) :
yield word, sum(counts)
if __name__ == '__main__'Run () Run the WordCount code to open the command line, find a text document, and type the following command: python mr_word_count.py myfile.txt3.32.Different ways to run MRJOB1, inline (-r inline) mode features easy debugging, start a single process to simulate the execution status and result of the task, default (-r inline) can be omitted, output file > output-file or -o output-file, For example, the following two runs are the equivalent python word_count.py -r inlineinput.txt > output.txt python word_count.py input.txt > output.txt
2The -r local mode is used to simulate Hadoop debugging locally. The difference from inline mode is that multiple processes are started to execute each task. For example, python word_count.py -r localinput.txt > output1.txt
3Hadoop(-r Hadoop) is used in the Hadoop environment to support Hadoop scheduling control parameters, such as:1) specifies the Hadoop task scheduling priority (VERY_HIGH | HIGH), such as: - jobconf graphs. The job. The priority = VERY_HIGH.2Limit the number of Map and Reduce jobs, for example: --jobconf mapReduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5
python word_count.py -r hadoop hdfs:///test.txt -o hdfs:///output
3.33.Mrjob implements the first n data that occur most frequently in topN statistics (experimental) statisticsimport sys
from mrjob.job import MRJob,MRStep
import heapq
class TopNWords(MRJob) :
def mapper(self, _, line) :
ifline.strip() ! ="":
for word in line.strip().split():
yield word,1
# is between mapper and reducer, which is used to make statistics of data output by mapper temporarily
def combiner(self, word, counts) :
yield word,sum(counts)
def reducer_sum(self, word, counts) :
yield None, (sum(counts),word)
# Use HEAPQ to sort the data and take out the largest 2
def top_n_reducer(self,_,word_cnts) :
for cnt,word in heapq.nlargest(2,word_cnts):
yield word,cnt
The steps method is used to specify custom mapper, comnbiner and Reducer methods
def steps(self) :
# Passing in two steps defines the order of execution
return [
MRStep(mapper=self.mapper,
combiner=self.combiner,
reducer=self.reducer_sum),
MRStep(reducer=self.top_n_reducer)
]
def main() :
TopNWords.run()
if __name__=='__main__':
main()
Copy the code
3.4 MapReduce Principles
Single machine program calculation process
Input data –> read data –> process data –> write data –> output data
Hadoop Computing Process
Input data: indicates the input data
InputFormat: Divides and formats data
Map: Map the previously segmented data (classify the data and output (k, V) key-value pair data)
Shuffle&sort: Put the same data together and sort the data
Reduce: Computes the output map data using hash and calculates statistics for each map data
OutputFormat: Formats output data
Map: Data is processed
Buffer in memory: When 80% of the data is reached, the data is locked in memory and output to disk
Partitions: There are a lot of “small data” on disks that are merged and sorted.
Merge on disk: Merge all “small data”.
Reduce: Different Reduce tasks copy data from the corresponding tasks in the map
Merge operations are also performed in reduce
Graphs architecture
-
Graphs architecture 1. X
-
JobTracker: responsible for receiving job submissions from customers, running tasks on job nodes, and checking job status
-
TaskTracker: Tasks assigned by JobTracker are reported to JobTracker on a regular basis. There is always only one TaskTracker per job node
-
-
Graphs architecture 2. X
-
ResourceManager: Manages resources, submits tasks to the node where NodeManager resides, and checks the node status
-
NodeManager: Assigns tasks by ResourceManager and periodically reports the status to ResourceManager
-
Hadoop concept expansion
4.1Hadoop ecosystem
Hadoop ecosystem in a broad sense refers to the Hadoop ecosystem. Hadoop ecosystem is a very large concept in which Hadoop is the most important and basic part. Each subsystem of the ecosystem only solves a specific problem domain (or even narrower), rather than a unified all-powerful system, but a number of small and refined systems.
Hive: data warehouse
R: Data analysis
Mahout: Machine learning library
Pig: Scripting language, similar to Hive
Oozie: workflow engine that manages job execution sequence
Zookeeper: The user is unaware of the failure of the active node and selects the secondary node as the active node
Flume: log collection framework
Sqoop: data exchange framework, for example, data exchange between relational databases and HDFS
Hbase: queries massive data. Hbase is similar to a database in a distributed file system
Spark: Distributed computing framework based on memory
- spark core
- spark sql
- Spark Streaming quasi-real-time is not a standard streaming computing
- spark ML spark MLlib
Kafka: message queue
Storm: Distributed streaming computing framework python operates Storm
Flink: distributed streaming computing framework
Features of the Hadoop ecosystem
- Open source, active community
- It covers all aspects of big data processing
- Mature ecosphere
4.2HDFS Read and write Process & High availability
-
HDFS Read and write Process
-
The client sends a file write request to NameNode.
-
Check whether files exist and check permissions. If it passes, the operation is written directly to the EditLog and the output stream object is returned. Write ahead log WAL (write ahead log) logs all write operations performed by the latest HDFS client. If the subsequent real write operation fails, the operation will be written into the EditLog before the actual write operation, so there will still be records in the EditLog. We do not need to worry that the subsequent client cannot read the corresponding data block, because in step 5, DataNode will return a confirmation message after receiving the block. If the write fails, The sender does not receive the confirmation message and will retry until it succeeds.
-
The client splits files into 128MB blocks.
-
Client sends the list of allocated writable Datanodes returned by NameNode together with Data Data to the first nearest DataNode. After that, multiple Datanodes allocated by client and NameNode constitute the pipeline. The client writes data to the output stream object. Each time the client writes a packet to the first DataNode, the packet will be directly transmitted to the second and third DataNode in the pipeline. The DataNode. (Note: Not a block or an entire file is written and then distributed backwards.)
-
After each DataNode writes a block, a confirmation message is returned. (Note: confirmation information is not returned after each packet is written. Personally, since each chunk in packet carries verification information, there is no need to report each packet written, which is too slow. The correct way is to write a block, the check information is summarized and analyzed, can determine whether there is a block error.)
-
After writing the data, close the output stream.
-
Send completion signal to NameNode.
(Note: The timing of sending the completion signal depends on whether the cluster is strongly or ultimately consistent. Strongly consistent data needs to be written by all datanodes before reporting to NameNode. Final consistency: Any DataNode can report to NameNode independently after writing. HDFS generally emphasizes strong consistency.
-
-
How to implement high availability of HDFS
- Data store fault tolerance
- Disk media may be affected by environment or aging during storage, and data may be corrupted
- For data blocks stored on datanodes, calculate and store checksums.
- When data is read, the system recalculates the checksum of the data. If the checksum is incorrect, an exception is thrown and backup data is read from other Datanodes
- Disk fault tolerance
- The DataNode detects that a disk on the host is damaged
- Report all BlockID stored on this disk to NameNode
- NameNode checks which datanodes these data blocks are backed up on.
- Inform datanodes to copy data to other servers
- DataNode fault tolerance
- Communicate with the NameNode via heartbeat
- If the heartbeat is not sent due to timeout, NameNode considers the DataNode to be down
- The NameNode looks up what blocks of data exist on this DataNode and how the data is stored on other DataNode servers
- Copy data from other DataNode servers
- NameNode fault tolerance
- Primary/secondary hot standby Secondary Namenode
- Zookeeper works with master node elections
- Data store fault tolerance
4.3 Integration of big data products with Internet products
-
Task execution bottleneck in distributed system: High MapReduce delay several minutes Spark seconds
-
Internet Product Requirements
- Millisecond response (completed in less than 1 second)
- Big data is needed to achieve statistical analysis data mining associated recommendation user portrait
-
Big data Platform
- Integrate the differences between website applications and big data systems, import the data generated by applications into the big data system, and then export it to applications for use after processing and calculation
-
The data collection
- Data & logs generated by App/Web are synchronized to big data system
- Database synchronization :Sqoop Log synchronization :Flume log synchronization: Kafka
- The quality of data produced by different data sources can vary widely
- Maybe you can use the database directly
- Log crawler extensive cleaning, conversion processing
-
The data processing
- The core of big data storage and computing
- After data synchronization, import it to the HDFS
- MapReduce Hive Spark reads data and saves the calculation result to the HDFS
- MapReduce Hive Spark offline computing, HDFS offline storage
- Offline computing is usually done for aggregate data, such as all orders in history
- Offline computing features: Large data scale and long running time
- Flow calculation
- Taobao double 11 second order monitoring propaganda
- Storm(ms) SparkStreaming(SEC)
-
Data output and presentation
- HDFS needs to export the data to the application so that users can display ECharts in real time
- Taobao seller quantum Rubik’s cube
- Provide various statistical reports to the operations and decision level, data needs to be written to the database
- Many operation management personnel, after going to work, will log in the background data system
- HDFS needs to export the data to the application so that users can display ECharts in real time
-
Task scheduling system
- Put the above three parts together