Why did I take up Airflow?
I originally did back-end development in the business development department. Because of business needs, one developer from each department was pulled in for a new project, and I received the data-report task: a data platform that lets executives and operations staff check data conveniently. I told myself to take it on, since in the new project team I could learn Python, Airflow, and task-scheduling tools, which would be a big step up for me.
Starting the Airflow learning journey
Preparation
- Set up the Airflow UI in a test environment
- Airflow best practices
- The official Airflow documentation in English
- ETL technical documentation
- ETL example best practices (code)
- The Airflow documentation in Chinese
What is Airflow?
- Airflow is a platform to programmatically author, schedule and monitor workflows.
Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
So, by defining a directed acyclic graph (DAG), we can describe a set of dependent tasks to be executed in order. Airflow provides extensive command-line tools for system management, and its Web interface makes it easy to schedule tasks and monitor their status in real time, simplifying operation and administration.
- Airflow overview:
Airflow is Airbnb's open-source Python workflow management platform, with a built-in Web UI and scheduler, now developed under the Apache Software Foundation.
What is Airflow used for?
In real projects, we often encounter the following scenarios:
- The simplest approach is to add some cron jobs, but what if you want to trace the result of each run?
- In big-data scenarios, a pipeline periodically needs several sub-jobs, such as exporting online data, importing it into the big-data platform, and triggering data processing, where each sub-job depends on others.
- When managing a large number of hosts, you may want a unified job-management platform on which you can define tasks for the machines it manages.
Airflow, through DAG configuration files, makes it easy to define tasks and their dependencies, schedule their execution, and operate everything through a visual Web interface.
What are Airflow's advantages?
- A built-in web management interface that is easy to use;
- Business code and scheduling code are completely decoupled;
- Subtasks are defined in Python code, with many Operator types to choose from, flexible enough to meet a wide range of needs;
- An open-source Python project that supports custom Operators and other plug-ins, making secondary development easy (see the sketch after this list);
- Similar tools include Azkaban, Quartz, etc.
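For example, extending Airflow with a custom Operator only requires subclassing BaseOperator. A minimal sketch, assuming Airflow 1.x import paths (the HelloOperator name and its behaviour are made up for illustration):

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):
    """A hypothetical custom operator that just logs a greeting."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # Called when the task runs; the return value is pushed to XCom.
        message = 'Hello, %s' % self.name
        self.log.info(message)
        return message
```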
DAGs and Tasks in Airflow
- DAG
Summary: A DAG (Directed Acyclic Graph) defines a complete job in Airflow. All tasks in the same DAG share the same schedule.
Parameters:
dag_id: identifies the DAG, for ease of future management
default_args: default parameters; if a task in this DAG does not configure a given parameter, the corresponding value from the DAG's default_args is used
schedule_interval: the DAG's execution schedule; crontab syntax can be used
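As a minimal sketch of how these three parameters fit together (the dag_id and schedule below are made up for illustration):

```python
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='report_daily',          # identifies this DAG
    default_args=default_args,      # fallback parameters for every task in the DAG
    start_date=datetime(2019, 7, 24),
    schedule_interval='0 2 * * *',  # crontab syntax: run at 02:00 every day
)
```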
- Task
Summary: A Task is a concrete job step in a DAG. It depends on, and must belong to, a DAG. Dependencies between Tasks are configured within a DAG (cross-DAG dependencies are also possible, but not recommended: they make the DAG graph less intuitive and complicate dependency management).
Parameters:
dag: the DAG instance the task belongs to
task_id: an identifier (name) for the task, for ease of future management
owner: the owner of the task, for ease of future management
start_date: the time from which the task starts being scheduled
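A minimal sketch of these Task parameters, using a BashOperator (the task_id and command below are made up for illustration):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='task_demo', schedule_interval=None)

t = BashOperator(
    task_id='print_date',              # identifier (name) for the task
    bash_command='date',               # what this example task does
    owner='airflow',                   # owner of the task
    start_date=datetime(2019, 7, 24),  # when the task starts being scheduled
    dag=dag,                           # the DAG instance the task belongs to
)
```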
Airflow's scheduling times
start_date: in the configuration, the time from which the job starts being scheduled; when discussing a particular execution, the start of that run's schedule interval.
schedule_interval: the interval at which the job is scheduled.
execution_date: what Airflow calls the execution time, which is not the actual time the job runs.
Therefore, the first run fires at start_date plus schedule_interval, while the execution_date recorded for it is the first time on or after start_date that matches schedule_interval.
[An example]
Suppose we configure a job with a start_date of June 2, 2019 and a schedule_interval of 00 12 * * *. The first run fires at 12:00 on June 3, 2019, and its execution_date is recorded as 12:00 on June 2, 2019. So execution_date is not the literal run time: the actual run happens one schedule_interval after the time execution_date displays.
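Expressed in code, the example above looks like the following sketch (the dag_id is made up); the comments trace the timeline:

```python
from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='schedule_demo',           # hypothetical name
    start_date=datetime(2019, 6, 2),
    schedule_interval='00 12 * * *',  # every day at 12:00
)

# Timeline, assuming the scheduler is running:
#   2019-06-03 12:00  the first run actually fires,
#   but its execution_date is recorded as 2019-06-02 12:00,
#   the start of the schedule interval that has just closed.
```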
Code sample
```python
# coding: utf-8
# DAG object; we need it to instantiate a DAG
from airflow import DAG
# Operators; we need them to define the tasks to execute
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Define the default parameters
default_args = {
    'owner': 'airflow',                          # owner name
    'depends_on_past': False,                    # whether to depend on the state of the previous run
    'start_date': datetime(2019, 7, 24, 16, 45), # first execution time, in UTC; for testing, set it to the current time minus the schedule interval
    'retries': 3,                                # number of retries on failure
    'retry_delay': timedelta(seconds=5),         # interval between retries
}

# Define and instantiate the DAG
dag = DAG(
    dag_id='hello_world_dag',                # dag_id
    default_args=default_args,               # use the default parameters above
    # schedule_interval="00 * * * *",        # crontab syntax is also supported
    schedule_interval=timedelta(minutes=1),  # run once every minute
)

# Define the first Python function to execute
def hello_world_1():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_1.txt', 'a') as f:
        f.write('%s\n' % current_time)
    assert 1 == 1  # an assert (or an exception raised) inside the function marks the task as failed

# Define the second Python function to execute
def hello_world_2():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_2.txt', 'a') as f:
        f.write('%s\n' % current_time)

# Define task 1
t1 = PythonOperator(
    task_id='hello_world_1',        # task_id
    python_callable=hello_world_1,  # the function to execute
    dag=dag,                        # the DAG this task belongs to
    retries=2,                      # overrides the retry count from the DAG's default_args
)

# Define task 2
t2 = PythonOperator(
    task_id='hello_world_2',        # task_id
    python_callable=hello_world_2,  # the function to execute
    dag=dag,                        # the DAG this task belongs to
)

# t2 depends on t1: t2 runs only after t1 succeeds. Equivalent forms:
#   t1.set_downstream(t2)
#   dag.set_dependency('hello_world_1', 'hello_world_2')
#   t1 >> t2
t2.set_upstream(t1)
```
As you can see, an entire DAG configuration is just a complete piece of Python code: it instantiates the DAG, instantiates the appropriate Operators, and wires up upstream and downstream dependencies through methods such as set_downstream.
Web UI
The Airflow UI makes it easy to monitor and troubleshoot your data pipelines. Here’s a quick overview of some of the features and visualizations you can find in the Airflow UI.
- DAGs View
List of the DAGs in your environment, and a set of shortcuts to useful pages. You can see exactly how many tasks succeeded, failed, or are currently running at a glance.
- Tree View
A tree representation of the DAG that spans across time. If a pipeline is late, you can quickly see where the different steps are and identify the blocking ones.
- Graph View
The graph view is perhaps the most comprehensive. Visualize your DAG's dependencies and their current status for a specific run.
- Variable View
The variable view allows you to list, create, edit or delete the key-value pairs of variables used during jobs. The value of a variable is hidden by default if the key contains any of the words in ('password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'), but it can be configured to show in clear text. A usage sketch follows this list.
- Gantt Chart
The Gantt chart lets you analyse task duration and overlap. You can quickly identify bottlenecks and where the bulk of the time is spent for specific DAG runs.
- Task Duration
The duration of your different tasks over the past N runs. This view lets you find outliers and quickly understand where the time is spent in your DAG over many runs.
- Code View
Transparency is everything. While the code for your pipeline is in source control, this is a quick way to get to the code that generates the DAG and provide yet more context.
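As mentioned under the Variable View above, values stored there can be read from DAG code. A minimal sketch using Airflow's Variable model (the keys below are made up for illustration):

```python
from airflow.models import Variable

# Read values previously created under Admin -> Variables in the Web UI.
api_endpoint = Variable.get('my_api_endpoint')                  # returned as a plain string
etl_config = Variable.get('etl_config', deserialize_json=True)  # JSON value parsed into a dict
```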
Review of learning:
- Studying the official Airflow documentation took too long: I planned to finish it in one day, but it actually took two.
- Time spent studying the ETL technical documentation should be budgeted by the hour.
- Whether my searches for learning materials were accurate
- Practices I learned
Learning from colleagues
- Project communication skills
- Learning from the project manager (previously worked on SAP)
- Getting familiar with the business
- Learning about the health care system
Results
Data was migrated from the source repository to the target repository.
Learning materials
Airflow's official website
Airflow documentation in Chinese
Example GitHub project
Community sharing
The purpose of 007 is to create a platform where ordinary people grow through sustained writing. The community fosters an atmosphere of companionship, supervision, and mutual help, an environment of self-discipline plus accountability, so that more members can gain a new lease on life.
Friends from all over the country can take part, and offline activities are organized every month. Professionals from across the country are in the "007 or You're Out" community, and partners from all walks of life are welcome to exchange ideas.
- Why I write
- Why comrades joined the 007 writing community
- To join "007 or You're Out", scan the QR code on QQ.
- Every morning I like to listen to a business-thinking course and share it with everyone.
Tutor introduction:
Ms. Min: "The World's Richest Men Thinking Institute", led by Ms. Min, a top mentor of Chinese entrepreneurship, is about to open; it bills itself as the first school in China to study the thinking of the world's richest men.
Ms. Min is the founder of Weishi Management Consulting, a coach of Chinese entrepreneurs, and an architect of new business ecosystems, serving Fortune Global 500 companies such as Bank of China, China Mobile, and BYD. She trains 10,000 entrepreneurs every year and has given one-to-one guidance to enterprises for over 10 years.
- Service areas: new social retail, clothing stores, the beauty industry, finance, and manufacturing
- Specialties: business model design, strategy consulting, sales growth, business school systems, and new business design
- Known as: a walking money-printing machine, the female Einstein, the goddess of wealth
Team management | How a supervisor should give instructions to improve team execution:
1. Express instructions clearly and unambiguously, describing both the process and the expected result.
2. Do not give vague instructions; after giving an instruction, also provide the method.