Introduction
This document describes how to deploy the data warehouse and briefly explains how and when to use it.
Environment
| Software | Version |
| -------- | ------- |
| CentOS | 7.2 |
| CDH | 5.15.0 |
| Hadoop | server |
| Airflow | 1.10.9 |
| Python | 2.7.5 (shipped with the system), 3.7.0 (to be installed) |
| MySQL | 5.7.28 |
| Redis | 4.0.14 |
| Sqoop | 1.4.6 |
Module installation
Airflow
Airflow is a platform for orchestrating, scheduling, and monitoring workflows. It was open-sourced by Airbnb and is now incubated at the Apache Software Foundation. In Airflow, a workflow is programmed as a DAG (directed acyclic graph) of tasks, and the scheduler executes the tasks on a set of workers according to the specified dependencies. Airflow also provides extensive command-line tools and an easy-to-use web interface for viewing and operating workflows, as well as monitoring and alerting.

A full Airflow cluster is not necessary for this deployment, and configuring one adds extra operational and learning costs, so this article deploys Airflow on only one host in the CDH cluster.
Upgrade Python
Compile environment
yum install gcc-c++ gcc make cmake zlib-devel bzip2-devel openssl-devel ncurses-devel libffi-devel -y
Compile and install Python3
Have Python-3.7.0.tar.xz ready.
# Confirm the current version
python -V
# Decompress
tar -Jxvf Python-3.7.0.tar.xz
# Enter the Python-3.7.0 directory
cd Python-3.7.0
# Create the installation directory
mkdir -p /usr/local/python3
# Configure (specify the installation directory)
./configure --prefix=/usr/local/python3 --enable-optimizations
make && make install
Modify the system Python path
# Back up the original default Python
mv /usr/bin/python /usr/bin/python.bak
# Link python to the new python3
ln -s /usr/local/python3/bin/python3.7 /usr/bin/python
ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
# Upgrade pip
pip install --upgrade pip
# If the link command fails, force it with the following
ln -sf /usr/local/python3/bin/python3.7 /usr/bin/python
# Confirm the versions
python -V
pip -V
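Since pip downloads packages over HTTPS, it is worth confirming that the new interpreter was built with OpenSSL support (openssl-devel was installed above for exactly this reason); a small optional check in Python:
# Run with the new /usr/bin/python; an ImportError here means Python was
# compiled without OpenSSL support and pip installs will fail.
import ssl
print(ssl.OPENSSL_VERSION)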
Modifying the Yum Path
Modify the following two files
vi /usr/bin/yum
vi /usr/libexec/urlgrabber-ext-down
Change the first line of each from "#!/usr/bin/python" to:
#!/usr/bin/python2.7
Redis installation
Installation
cd /tmp
tar zxvf redis-4.0.14.tar.gz
cd redis-4.0.14
make
make install PREFIX=/usr/local/redis
cd /usr/local/redis
mkdir /usr/local/redis/etc
cp /tmp/redis-4.0.14/redis.conf /usr/local/redis/etc/
Configuration
Edit the configuration file /usr/local/redis/etc/redis.conf and modify the following settings:
bind 0.0.0.0
daemonize yes
protected-mode no
# Change the persistence scheme from RDB to AOF
appendonly yes
# If the line 'appendfilename "appendonly.aof"' below it is commented out, uncomment it
# Change the password as needed: find the commented line
# requirepass foobared
# and write the following on the next line
requirepass your-password
# You can also change the port here if necessary
Start
Add Redis to startup:
vi /etc/rc.local
# Add this line
/usr/local/redis/bin/redis-server /usr/local/redis/etc/redis.conf
# Start Redis now
/usr/local/redis/bin/redis-server /usr/local/redis/etc/redis.conf
Check
# Look up the port and the process
netstat -tunpl | grep 6379
ps -aux | grep redis
# Test
redis-cli
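As an optional sanity check from Python, the server can also be pinged with the redis client library; a minimal sketch, assuming `pip install redis` has been run and substituting the password configured above:
# Ping the local Redis server; requires the third-party 'redis' package.
import redis

r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, password='your-password')
print(r.ping())   # True if the server is reachable and the password is accepted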
Airflow installation
Creating a Linux User
# Modify as needed
adduser airflow
passwd airflow
# Enter a custom password
# Change the user's permissions so that sudo does not ask for a password
vi /etc/sudoers
root    ALL=(ALL)       ALL
airflow ALL=(ALL)       NOPASSWD: ALL
Modify the configuration
vi /etc/profile
export AIRFLOW_HOME=/home/airflow # Modify as needed
export PYTHON_HOME=/usr/local/python3 # Modify according to the actual situation
export SLUGIFY_USES_TEXT_UNIDECODE=yes # Temporary Settings
export PATH=$PATH:$PYTHON_HOME/bin
source /etc/profile
Installation
pip install --upgrade setuptools -i http://pypi.douban.com/simple/ --trusted-host=pypi.douban.com
cd /tmp
wget http://repo.mysql.com/mysql57-community-release-el7-8.noarch.rpm
rpm -ivh mysql57-community-release-el7-8.noarch.rpm
yum install mysql-devel
pip install apache-airflow -i http://pypi.douban.com/simple/ --trusted-host=pypi.douban.com
Airflow is installed as a third-party Python package under ${PYTHON_HOME}/lib/python3.7/site-packages/airflow. Confirm with:
pip show apache-airflow
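The same information can also be confirmed from the interpreter itself (a quick optional check):
# Print the installed Airflow version and package location.
import airflow
print(airflow.__version__)   # e.g. 1.10.9
print(airflow.__file__)      # .../site-packages/airflow/__init__.py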
Continue with the installation
pip install apache-airflow[mysql] -i http://pypi.douban.com/simple/ --trusted-host=pypi.douban.com
pip install apache-airflow[celery] -i http://pypi.douban.com/simple/ --trusted-host=pypi.douban.com
pip install apache-airflow[redis] -i http://pypi.douban.com/simple/ --trusted-host=pypi.douban.com
# test
airflow version
MySQL
By default, the initial MySQL setup is completed when CDH is deployed.
mysql -u root -p
Create a database for Airflow, add a user, and grant privileges:
CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
# Relax the password strength requirements
set global validate_password_policy=0;
set global validate_password_length=4;
CREATE USER af@localhost IDENTIFIED BY 'xxxxx';
grant all privileges on airflow.* to 'af'@'%' identified by 'xxxxx' with grant option;
flush privileges;
# Enable explicit_defaults_for_timestamp so the current time is added when data is updated (recommended by Airflow)
set @@global.explicit_defaults_for_timestamp=on;
exit
# Restart the server
service mysqld restart
# Test the new account
mysql -u af -p
show databases;
Airflow configuration
Core configuration
Edit /home/airflow/airflow.cfg and modify the following, filling in the IP addresses and ports according to your environment:
# Format: mysql://account:password@ip:port/db
sql_alchemy_conn = mysql://af:af123@10.0.10.xx:3306/airflow
executor = CeleryExecutor
# web_server_host = 10.0.10.xx
base_url = http://10.0.10.xx:8080

[celery]
broker_url = redis://10.0.10.xx:6379/0
result_backend = db+mysql://af:xxxxx@10.0.10.xx:3306/airflow

# Render the dependency graph directly when entering the DAG detail page
dag_default_view = graph
# Change the time zone
default_timezone = Asia/Shanghai
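Before starting any services, it can be worth confirming that the sql_alchemy_conn string actually connects; a minimal sketch, assuming the placeholder host and credentials above and the MySQL driver pulled in by apache-airflow[mysql]:
# Open a connection with the same URI as sql_alchemy_conn and run a trivial query.
from sqlalchemy import create_engine

engine = create_engine('mysql://af:xxxxx@10.0.10.xx:3306/airflow')
with engine.connect() as conn:
    print(conn.execute('SELECT 1').scalar())   # prints 1 when the connection works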
Change the time on the webUI
Modify the following files. Note that
${PYTHON_HOME}/lib/python3.7/site-packages/airflow
is the Airflow installation path.
/home/airflow/local/lib/python3.7/site-packages/airflow/utils/timezone.py
/home/airflow/local/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py
/home/airflow/local/lib/python3.7/site-packages/airflow/www/templates/admin/master.html
Other configuration
# Do not load the example DAGs
load_examples = False
# Run tasks as the airflow user
default_impersonation = airflow
# For each DAG, run only the latest scheduled instance (no catchup/backfill)
catchup_by_default = False
# The maximum number of threads the scheduler uses
max_threads =
Start
Initialize the database the first time you use it, or whenever it needs to be re-initialized:
airflow initdb
Log in as the airflow user of the operating system and run:
airflow webserver -D
airflow scheduler -D
airflow flower -D
airflow worker -D
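Once the daemons are up, the webserver in Airflow 1.10 exposes a /health endpoint reporting the status of the metadata database and the scheduler; a small optional check, assuming the default port 8080 on the local host:
# Query the Airflow webserver health endpoint and pretty-print the JSON reply.
import json
from urllib.request import urlopen

with urlopen('http://localhost:8080/health') as resp:
    print(json.dumps(json.loads(resp.read().decode('utf-8')), indent=2))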
webUI
| Service | Address |
| ------- | ------- |
| airflow | host:8080 |
| flower | host:5555 |
Test
Create a directory for the DAG scripts. In a cluster deployment, the DAGs must be present on every node.
mkdir /home/airflow/dags
Place the hello_world.py file below in this folder.
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

# ----------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['langai818@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# ----------------------------------------------------------------------
# dag
dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    description='test DAG',
    schedule_interval=timedelta(days=1))


def print_hello():
    return 'Hello world!'


# ----------------------------------------------------------------------
# first operator
date_operator = BashOperator(
    task_id='date_task',
    bash_command='date',
    dag=dag)

# ----------------------------------------------------------------------
# second operator
sleep_operator = BashOperator(
    task_id='sleep_task',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

# ----------------------------------------------------------------------
# third operator
hello_operator = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag)

# ----------------------------------------------------------------------
# dependencies
sleep_operator.set_upstream(date_operator)
hello_operator.set_upstream(date_operator)
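The same dependencies can also be written with Airflow's bitshift operators, which is the style used by the integration DAG later in this document:
# Equivalent to the two set_upstream() calls above.
date_operator >> [sleep_operator, hello_operator]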
Python code validation
python hello_world.py
Test from the Airflow command line:
airflow list_dags
airflow list_tasks dag_name
airflow test dag_name task_name date
# Clear task instances
airflow clear dag_id
Sqoop
Test Environment Configuration
Static gateway
For testing purposes, set the gateway on the local machine (Windows) to static.
Disabling the Firewall
MySQL setup
The following changes are made to the MySQL database on the local Windows 10 machine.
Add a user for Sqoop and grant privileges:
[mysql]
CREATE USER sqoop@localhost IDENTIFIED BY 'xxxxxx';
grant all privileges on *.* to 'sqoop'@'%' identified by 'xxxxxx' with grant option;
flush privileges;
exit
[bash]
# Restart the server
service mysqld restart
# Test the new account
mysql -u sqoop -p
show databases;
Test
Connection test:
# List databases
sqoop list-databases --connect jdbc:mysql://10.0.11.99/ --username sqoop -P
# List tables
sqoop list-tables --connect jdbc:mysql://10.0.11.xx/database_name --username sqoop -P
# Import a table
sqoop import --connect jdbc:mysql://10.0.11.xx/database_name --username sqoop --password xxxxxx --table table_name
MySQL and some other databases provide a fast transfer mode of their own, which can greatly speed up extraction; enable it with --direct:
sqoop import --connect jdbc:mysql://10.0.11.xx/database_name --username sqoop --password xxxxxx --table table_name --direct
Importing with a script
Create the Hive database first:
create database if not exists shell_db comment "data import with script" location '/user/hive/warehouse/shell_db';
Write a full-extraction overwrite script. Hive and Sqoop are case-insensitive with respect to table and field names.
#!/bin/bash
local_db_address='10.0.11.xx'
db_name='shell_db'
tables=(cjd jgtzd rkd scjh scjld zcjh)
for table_name in ${tables[@]};
do
    sqoop import \
    --connect jdbc:mysql://$local_db_address/gnbz \
    --direct \
    --username sqoop \
    --password xxxxxx \
    --table $table_name \
    --m 2 \
    --hive-import \
    --hive-overwrite \
    --delete-target-dir \
    --hive-database $db_name;
done
Scheduled tasks
Note that if you run a DAG with a schedule_interval of one day, the run stamped 2016-01-01 is triggered soon after 2016-01-01T23:59. In other words, a job instance is started once the period it covers has ended.
So when does Airflow actually start running a DAG? The first run is triggered at start_date + schedule_interval; with a @daily schedule_interval, that is the day after start_date.
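As a concrete illustration, using the start_date that appears in the integration DAG below:
# With start_date = 2020-04-12 18:00 and schedule_interval = 1 day, the first
# run covers 2020-04-12 18:00 -> 2020-04-13 18:00, so it is triggered shortly
# after 2020-04-13 18:00 while being stamped with execution_date 2020-04-12 18:00.
from datetime import datetime, timedelta

start_date = datetime(2020, 4, 12, 18)
schedule_interval = timedelta(days=1)

print(start_date + schedule_interval)   # 2020-04-13 18:00:00 -- when the first run starts
print(start_date)                       # 2020-04-12 18:00:00 -- its execution_date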
Integration
Use Airflow to perform a scheduled daily full update of two tables and then combine some of their fields into a single table.
Three scripts are used, scheduled by a DAG defined in a Python file.
The scripts
qw_import.sh
#!/bin/bash
db_address='192.168.200.xx'
db_name='dw1'
tables=(BIStoreFGReceive)
for table_name in ${tables[@]};
do
    sqoop import \
    --connect jdbc:mysql://$db_address/db_qw \
    --direct \
    --username ljw1 \
    --password xxx \
    --table $table_name \
    --m 1 \
    --hive-import \
    --hive-overwrite \
    --delete-target-dir \
    --hive-table $table_name \
    --hive-database $db_name;
done
kfq_import.sh
#!/bin/bash
db_address='192.168.200.xx4'
db_name='dw1'
tables=(RKD)
for table_name in ${tables[@]};
do
    sqoop import \
    --connect jdbc:mysql://$db_address/db_kaifaqu \
    --direct \
    --username ljw1 \
    --password xxxxxx \
    --table $table_name \
    --m 1 \
    --hive-import \
    --hive-overwrite \
    --delete-target-dir \
    --hive-table $table_name \
    --hive-database $db_name;
done
hive_comb.sql
create table if not exists dw1.d_output (
    storage_date timestamp,
    unit_area DOUBLE,
    storage_qty DOUBLE,
    base_no INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

truncate table dw1.d_output;

insert into dw1.d_output
select cast(date_format(substr(ReceiveDate,1,10),'yyyy-MM-dd') as timestamp) as storage_date,
       UnitArea as unit_area,
       ReceiveQty as storage_qty,
       6 as base_no
from dw1.bistorefgreceive;

insert into dw1.d_output
select cast(date_format(substr(RKRQ,1,10),'yyyy-MM-dd') as timestamp) as storage_date,
       cpmj as unit_area,
       rksl as storage_qty,
       8 as base_no
from dw1.rkd;
DAG
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from datetime import timedelta, datetime
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'jin',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 12, 18),
    'email': ['jwli@aiit.org.cn'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_sqool_import = DAG(
    dag_id="daily_task",
    default_args=default_args,
    description='import and combine tasks',
    schedule_interval=timedelta(days=1))

# ----------------------------------------------------------------------
kfq_script = "/home/script/kfq_import_all.sh"
qw_script = "/home/script/qw_import_all.sh"
# jd_script = "/home/script/jindie_import.sh"
hive_script = "/home/script/hive_comb.sql"

task_import_kfq = BashOperator(
    task_id='import_kfq',
    bash_command=". %s " % kfq_script,
    dag=dag_sqool_import)

task_import_qw = BashOperator(
    task_id='import_qw',
    bash_command=". %s " % qw_script,
    dag=dag_sqool_import)

task_combine = BashOperator(
    task_id='hive_combine',
    bash_command="hive -f %s " % hive_script,
    dag=dag_sqool_import)

[task_import_kfq, task_import_qw] >> task_combine
Operations
CDH
# Master node
/opt/cm-5.15.0/etc/init.d/cloudera-scm-server start
# All nodes
/opt/cm-5.15.0/etc/init.d/cloudera-scm-agent start
# Query status
/opt/cm-5.15.0/etc/init.d/cloudera-scm-agent status
/opt/cm-5.15.0/etc/init.d/cloudera-scm-server status
# Query the logs
tail -f /opt/cm-5.15.0/log/cloudera-scm-agent/cloudera-scm-agent.log
tail -f /opt/cm-5.15.0/log/cloudera-scm-server/cloudera-scm-server.log
# Stop
/opt/cm-5.15.0/etc/init.d/cloudera-scm-agent stop
/opt/cm-5.15.0/etc/init.d/cloudera-scm-server stop
# Exit safe mode
sudo -u hdfs hdfs dfsadmin -safemode leave
# Datanode health status
hdfs datanode
# Empty the recycle bin
hdfs dfs -expunge
airflow
cd /home/airflow/
rm -rf *.pid
Delete all PID files, then restart:
airflow webserver -D
airflow scheduler -D
airflow worker -D
airflow flower -D
Linux
Find files
find / -name file_name
Hive
Query the last modification time of the table
Find the transient_lastDdlTime property of the table:
SHOW CREATE TABLE table_name;
or
show TBLPROPERTIES table_name ('transient_lastDdlTime');
Then substitute the value into the following query:
SELECT CAST(from_unixtime(your_transient_lastDdlTime_value) AS timestamp);
Alternatively, use a web tool for the time conversion.
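The conversion can also be done locally in Python; a small sketch with a made-up timestamp value (substitute the one returned by Hive):
# transient_lastDdlTime is a Unix epoch in seconds; 1586692800 is a hypothetical value.
from datetime import datetime

print(datetime.fromtimestamp(1586692800))      # local time
print(datetime.utcfromtimestamp(1586692800))   # UTC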
Troubleshooting
Environment/Configuration
bash: airflow: command not found
You need to add the Airflow binary directory to the system PATH:
vi /etc/profile
# Append the following to the end
export PATH=$PATH:$PYTHON_HOME/bin
mysql_config: command not found
This error appears while installing apache-airflow[mysql]: building the MySQL client bindings needs mysql_config from the mysql-devel package, which is missing (or the MySQL repo files were not installed). Install it:
wget http://repo.mysql.com/mysql57-community-release-el7-8.noarch.rpm
rpm -ivh mysql57-community-release-el7-8.noarch.rpm
yum install mysql-devel
Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
vi /etc/my.cnf
# append at the end
explicit_defaults_for_timestamp=1
# restart MySQL
systemctl restart mysqld.service
or
set @@global.explicit_defaults_for_timestamp=on;
# restart MySQL
service mysqld restart
bash: netstat: command not found
yum install net-tools
Login prompt shows '-bash-4.2$'
Cause: when useradd is used to add a normal user, the environment files in the home directory are sometimes missing:
- .bash_profile
- .bashrc
These files are required for every user.
Solution: copy the default configuration files from /etc/skel/ to the user's home directory.
cp /etc/skel/.bashrc /home/airflow
cp /etc/skel/.bash_profile /home/airflow
ssh: connect to host localhost port 22: Connection refused
yum install openssh-server
Airflow
Internal Server Error 500
Cause: The server is not started properly.
Solution: Clear all out, err, log, and PID files under AIRFLOW_HOME, kill all airflow-related processes, and restart the Airflow services.
lockfile.AlreadyLocked: /home/airflow/airflow-webserver-monitor.pid is already locked
Cause: The process ID is occupied.
Solution: Clear all err and log files under AIRFLOW_HOME and kill all airflow-related processes.
Sqoop
Hive
Job Submission failed with exception 'org.apache.hadoop.security.AccessControlException (Permission denied: user=root, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x)'
Solution: Create an HDFS home directory for root:
# su - hdfs
$ hdfs dfs -mkdir /user/root
$ hdfs dfs -chown root:root /user/root
If you already have /user/root, check the file permissions
[root@dn001 ~]# hdfs dfs -ls /user
Change the owner and permissions to root:
# hdfs dfs -chown root:root /user/root
hdfs dfs -chmod 777 /user/root
Alternatively, restart the HDFS service.
Insufficient log space
Modify the configuration
- On the CM management console, many alarms are generated because the log space is insufficient. Enter the configuration page of each service, filter on "Log", and change the maximum log file backup count from 10 to 2.
- On the CM management console, go to the HDFS configuration page, search for dfs.replication, and change the replication factor to 2.
- Log in to the cluster and run hadoop fs -setrep -r 2 / to apply the new replication factor to existing files, then restart the cluster.
- After the restart, the host storage may be somewhat unbalanced. On the CM management console, open the HDFS page, click Operation, and choose Rebalance. After rebalancing, the storage of each host is roughly even.
Modified components:
HDFS, YARN
Query HDFS storage information:
hadoop dfsadmin -report
Clear the monitor file in TMP
Clean out the recycle bin
hadoop fs -expunge
References
Airflow[V1.10] Installation tutorial for the Task Scheduling platform
Centos7 installation of mysql-devel package dependency problem
Command Line Interface Reference
Sqoop imports relational databases to Hive
Importing Data Into Hive
Sqoop User Guide (v1.4.6)
Hive Tutorial
How can I find last modified timestamp for a table in Hive?
Solution after CDH disk space is about to run out