Article by Foochane
Original link: foochane.cn/article/201…
Flume log collection framework: the Flume running mechanism, installation and deployment, collecting static files to HDFS, and collecting dynamic log files to HDFS.
Flume log collection framework
A complete offline big data processing system needs more than HDFS, MapReduce and Hive, which form the core of the analysis system; it also relies on indispensable auxiliary systems for data collection, result data export and task scheduling. For each of these auxiliary tasks the Hadoop ecosystem offers convenient open-source frameworks, as shown in the figure:
1 Introduction to Flume
Flume is a distributed, reliable and highly available system for collecting, aggregating and transferring massive logs. Flume can collect source data in many forms, such as files, folders, socket packets and Kafka, and sink the collected data to many external storage systems such as HDFS, HBase, Hive and Kafka.
General collection requirements can be met through simple Flume configuration, and Flume can also be customized for special scenarios, so it is suitable for most day-to-day data collection needs.
2 Flume operating mechanism
The core role in the Flume distributed system is the agent. A Flume collection system is formed by connecting agents, and each agent acts as a data courier with three internal components:
- Source: collection component, interfaces with the data source to obtain the data
- Sink: sinking component, passes the data to the next-level agent or to the final storage system
- Channel: transmission channel component, passes the data from the source to the sink
Data collection by a single agent:
Multiple agents connected in series:
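Whatever the topology, each agent is described in the same way in its configuration file: name the three components, configure each one, and bind them together. A minimal sketch of that skeleton (the agent name agent1 and the component names are arbitrary placeholders; the sections below fill in real settings):
# name the three components of this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# ... configure agent1.sources.source1.*, agent1.sinks.sink1.* and agent1.channels.channel1.* here ...
# bind the source and the sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1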
3 Flume Installation and Deployment
Procedure: 1 Download the installation package apache-flume-1.9.0-bin.tar.gz and decompress it
2 Add JAVA_HOME to flume-env.sh in the conf folder
export JAVA_HOME=/usr/local/bigdata/java/jdk1.8.0_211
3 Add a collection configuration file according to the collection requirements; the file name can be anything
See the following example for details
4 Start Flume
Start command in a test environment:
$ bin/flume-ng agent -c conf/ -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console
Command description:
- -c: specifies the directory containing the Flume configuration files
- -f: specifies your own collection configuration file, here dir-hdfs.conf in the current directory
- -n: specifies which agent from the configuration file to use; the name must match the one defined in the file
- -Dflume.root.logger=INFO,console: prints INFO-level logs to the console; this is only for testing, later the logs are written to the log file
Flume should be started in the background:
nohup bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 1>/dev/null 2>&1 &
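Since a background start prints nothing to the console, it is worth confirming the agent is actually running; a minimal check, assuming a standard Linux shell and Flume's default log location (logs/flume.log under the Flume home directory):
# check that the agent process is alive
ps -ef | grep flume-ng | grep -v grep
# follow the agent's own log file (default location; adjust if you changed the log4j settings)
tail -f logs/flume.log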
4 Collect static files and upload them to HDFS
4.1 Collection Requirements
New files are constantly generated in a specific directory on a server; whenever a new file appears, it needs to be collected into HDFS
4.2 Adding a Configuration File
Create the configuration file (here dir-hdfs.conf) in the conf directory under the installation directory, and add the configuration information to it.
First, name the agent; here it is called agent1, so the configuration keys below all start with agent1. Any other name, such as agt1, works as well.
Based on the requirements, the following three elements are defined:
- Data source component, i.e. the source — here a directory-monitoring source: spooldir. The spooldir source has the following features:
  - It monitors a directory and collects the contents of any new file that appears in it
  - Once a file has been collected, the agent automatically appends a suffix to it: COMPLETED by default (configurable)
  - Files with the same name must not appear repeatedly in the monitored directory
- Sinking component, i.e. the sink — here the HDFS file system: HDFS sink
- Transmission channel component, i.e. the channel — either a file channel or a memory channel can be used (a file channel variant is sketched after the channel parameter description below)
# Define the names of the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Configure the source component
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /root/log/
agent1.sources.source1.fileSuffix = .FINISHED
## If a line exceeds this length, it is truncated and the excess data is lost
agent1.sources.source1.deserializer.maxLineLength = 5120

# Configure the sink component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://Master:9000/access_log/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = app_log
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text

# roll: controls the rules for rolling (switching) the file being written
## Roll by file size (bytes)
agent1.sinks.sink1.hdfs.rollSize = 512000
## Roll by number of events
agent1.sinks.sink1.hdfs.rollCount = 1000000
## Roll by time interval (seconds)
agent1.sinks.sink1.hdfs.rollInterval = 60

## Control the rules for generating directories
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Configure the channel component
agent1.channels.channel1.type = memory
## Maximum number of events the channel can hold
agent1.channels.channel1.capacity = 500000
## Cache capacity required by Flume transaction control: 600 events
agent1.channels.channel1.transactionCapacity = 600

# Bind source, channel and sink together
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Channel parameter description:
- capacity: the maximum number of events that can be stored in the channel
- transactionCapacity: the maximum number of events that can be fetched from the source or delivered to the sink in a single transaction
- keep-alive: the time allowed for adding an event to the channel or removing one from it
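The configuration above uses a memory channel, which is fast but loses any buffered events if the agent stops. If durability matters, the file channel mentioned earlier can be swapped in; a minimal sketch, where the checkpoint and data directories are illustrative paths you should adapt:
# File channel variant: events are buffered on disk and survive an agent restart
agent1.channels.channel1.type = file
## directory for the channel's checkpoint (example path)
agent1.channels.channel1.checkpointDir = /root/flume/checkpoint
## comma-separated directories for the channel's data files (example path)
agent1.channels.channel1.dataDirs = /root/flume/data
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600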
4.3 Start Flume
$ bin/flume-ng agent -c conf/ -f dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console
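To verify the pipeline, drop a file into the monitored directory and check both the local rename and the HDFS output; a quick test, where the file name and content are arbitrary examples:
# create a test file in the monitored directory
echo "hello flume" > /root/log/test_$(date +%s).log
# after a moment the file should be renamed with the .FINISHED suffix
ls /root/log/
# and the collected data should appear under the configured HDFS path
hdfs dfs -ls /access_log/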
5 Collect dynamic log files and upload them to HDFS
5.1 Collection Requirements
For example, a business system uses log4j to generate logs. As the log file keeps growing, the data appended to it needs to be collected into HDFS in real time
5.2 Configuration File
Configuration file name: tail-hdfs.conf. Define the following three elements based on the requirements:
- The collection source, i.e. the source — here a source that monitors file content updates: exec, running tail -F on the log file
- The sinking target, i.e. the sink — here the HDFS file system: HDFS sink
- The transmission channel between source and sink, i.e. the channel — either a file channel or a memory channel can be used
Configuration file contents:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://Master:9000/app_weichat_login_log/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = weichat_log
a1.sinks.k1.hdfs.fileSuffix = .dat
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 100
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.3 Start Flume
Start command:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
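To generate test traffic, keep appending lines to the tailed log file; a simple loop (the message text is arbitrary, the path matches the source configuration above):
# append a timestamped test record every half second
while true
do
  echo "test_log_$(date +%H-%M-%S)" >> /root/app_weichat_login.log
  sleep 0.5
done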
6 Two agents are cascaded
The first agent obtains data with the tail command and sends it to an Avro port; the other node is configured with an Avro source to receive the data and relay it on to external storage. Configuration for the sending agent:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/log/access.log
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdp-05
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The second agent receives data from the Avro port and sinks it to HDFS. Configuration file for the receiving agent, avro-hdfs.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
## The avro component in the source is a receiver service
a1.sources.r1.type = avro
a1.sources.r1.bind = hdp-05
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/taildata/%y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = tail-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 24
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 50
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
## The generated file type is SequenceFile by default; set DataStream to write plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
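When starting the cascade, bring up the receiving agent on hdp-05 first so that the Avro sink on the sending side has something to connect to. A sketch of the two start commands; the sending agent's file name tail-avro.conf is an assumption, use whatever name you saved its configuration under:
# on hdp-05: start the receiver first (avro source -> HDFS sink)
bin/flume-ng agent -c conf -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
# on the collecting node: then start the sender (exec tail source -> avro sink)
bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1 -Dflume.root.logger=INFO,console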