Airflow is an open-source platform for orchestrating, scheduling, and monitoring workflows, originally developed at Airbnb and later incubated at the Apache Software Foundation. In Airflow, workflows are programmed as DAGs composed of tasks, and the scheduler executes those tasks on a set of workers while respecting the specified dependencies. In addition, Airflow provides extensive command-line tools and an easy-to-use web interface for viewing and operating workflows, as well as a monitoring and alerting system.

Airflow uses DAGs (directed acyclic graphs) to define workflows and configure job dependencies easily, far outperforming other task-scheduling tools in ease of administration and ease of use.

Airflow’s natural advantages

Easy to use. AirFlow itself is written in Python, and workflows are defined in Python. With Python as the glue, there is hardly a task that cannot be scheduled, and with the source code open, there is hardly a problem that cannot be debugged. More importantly, the code is human-readable.

You can also customize Operators for shell scripts, Python, MySQL, Oracle, Hive, and so on. Whether you are on a traditional database platform or a big data platform, you can write your own Operator if the officially provided ones do not meet your needs, as sketched below.
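As a rough sketch of what that looks like (the class name and its behaviour are hypothetical, not from the original article), a custom operator simply subclasses BaseOperator and implements execute():

```python
# A minimal sketch of a custom operator (Airflow 1.10-style imports assumed);
# the HelloOperator name and its "hello" behaviour are purely illustrative.
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HelloOperator(BaseOperator):
    """Toy operator that only logs a greeting; replace execute() with real work."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # `context` carries the execution date, task instance, and so on.
        self.log.info('Hello, %s!', self.name)
        return 'Hello, {}!'.format(self.name)
```

An operator defined this way is then instantiated in a DAG file just like the built-in ones.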

Elegant: job definitions are straightforward, script commands are easily parameterized with the Jinja template engine, and the web interface is readable to anyone who uses it; a small templating example follows.
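For example, a templated command might look like the following sketch (the dag_id, task id, and parameter are assumptions for illustration); `{{ ds }}` expands to the execution date and `params` values are filled in when the command is rendered:

```python
# A minimal sketch of Jinja templating in a task definition
# (Airflow 1.10-style imports assumed; dag_id and params are hypothetical).
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG('templating_example', start_date=days_ago(1), schedule_interval=None) as dag:
    templated = BashOperator(
        task_id='echo_run_info',
        # {{ ds }} is the execution date; params values are filled in at render time
        bash_command='echo "run date: {{ ds }}, limit: {{ params.limit }}"',
        params={'limit': 100},
    )
```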

Easy to scale: it provides base classes for extension and a choice of executors. CeleryExecutor uses message queues to orchestrate multiple workers distributed across machines, allowing the cluster to scale almost without limit.

The command line is so comprehensive that you can test, deploy, run, clear, backfill, and so on from the terminal without even opening the browser. It is nice to think of all the times we used to have to click through a screen just to deploy a small job.

And it is free. We can manage routine inspection tasks, scheduled scripts (such as crontab jobs), ETL processing, and monitoring on Airflow without writing any monitoring scripts; any errors are emailed to the designated person, which makes production both cost-effective and efficient. However, because the Chinese documentation is scarce and mostly incomplete, it is not very easy to get started quickly. The first step is to gain some knowledge of Python, read the official documentation repeatedly, and understand the scheduling principles. This series is intended to demystify AirFlow for you.

AirFlow’s structure and composition

The AirFlow architecture contains the following core components:

**Metadata database:** This database stores information about the status of tasks.

**Scheduler:** The scheduler is a process that uses DAG definitions in combination with task states in the metadata database to determine which tasks need to be executed and with what priority. The scheduler typically runs as a service.

**Executor:** The Executor is a message-queue process bound to the scheduler that determines which worker process actually executes each scheduled task. There are different types of executors, each of which uses a specific class of worker processes to perform tasks. For example, LocalExecutor executes tasks using parallel processes running on the same machine as the scheduler process, while executors like CeleryExecutor perform tasks using worker processes that live in a separate cluster of worker machines.

**Workers:** These are the processes that actually execute the task logic, as determined by the executor in use.

The main components are described as follows:

Scheduler

The scheduler. As the core hub of the whole AirFlow system, the scheduler is responsible for discovering user-defined DAG files, converting directed acyclic graphs into concrete DagRuns according to their schedules, and monitoring task status.

Dag

Directed acyclic graph. A DAG defines the dependencies between tasks. Each task is defined by an operator, and BaseOperator is the parent class of all operators; a minimal definition is sketched below.
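Under those definitions, a minimal DAG file might look like the sketch below (the dag_id, task ids, and schedule are hypothetical); it declares two tasks and a single dependency edge, which is essentially all a DAG file states:

```python
# A minimal sketch of a DAG definition (Airflow 1.10-style imports assumed).
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG('minimal_example',              # hypothetical dag_id
         start_date=days_ago(1),
         schedule_interval='@daily') as dag:
    start = DummyOperator(task_id='start')
    finish = DummyOperator(task_id='finish')
    start >> finish                       # finish runs only after start succeeds
```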

Dagrun

A run instance of a directed acyclic graph. Driven by the scheduler, each directed acyclic graph is turned into run instances over time. Different DagRuns are distinguished by dag id / execution date.

Taskinstance

A task instance within a DagRun. Specifically, for each DagRun, every operator is converted into a corresponding TaskInstance. Because a task may fail, the scheduler decides whether to retry it according to the task's retry settings (see the sketch below). Different task instances are distinguished by dag id / execution date / operator / execution time / retry number.
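As an illustration of those retry settings (the dag_id, task id, and command below are hypothetical), an operator declares how many times a failed TaskInstance may be retried and how long to wait between attempts:

```python
# A minimal sketch of per-task retry settings (Airflow 1.10-style imports assumed;
# dag_id, task id, and command are hypothetical).
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG('retry_example', start_date=days_ago(1), schedule_interval=None) as dag:
    flaky = BashOperator(
        task_id='flaky_call',
        bash_command='curl -sf https://example.com/health',
        retries=3,                          # the scheduler re-queues up to 3 more attempts
        retry_delay=timedelta(minutes=5),   # wait between attempts
    )
```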

Executor

Task executor. Each task needs to be completed by the task executor. BaseExecutor is the parent class of all task executors.

LocalTaskJob

Responsible for monitoring the execution of a task instance; it contains an important attribute called TaskRunner.

TaskRunner

Start subprocesses to execute tasks.

AirFlow installation and initial experience

Installing AirFlow requires a Python environment; please make sure Python is installed before proceeding. Here we use Python's pip tool directly to install AirFlow:

# airflow needs a home directory; ~/airflow is the default,
# but you can point it elsewhere if you want (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example DAG on the home page

SQLite is used as the database by default, and running the database initialization command directly creates a new database file under the path set by the environment variable. You can also have AirFlow use MySQL as its metadata database by modifying airflow.cfg:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = mysql://root:xxxxxx@localhost:3306/airflow

After installation, start AirFlow and open the web UI:

Of course we can also switch to tree view mode:

In addition, it also supports graph view, Gantt chart, and other modes. Isn't it nice?

Hello AirFlow!

So far we have installed a standalone version of AirFlow locally, and we can run the Demo from the official website to see the power of AirFlow. First, let's introduce a few concepts and principles:

What is AirFlow actually doing when we write an AirFlow task?

First, the user writes a DAG file.

Second, the SchedulerJob discovers the new DAG file and converts the DAG into DagRuns based on its start time, end time, and schedule_interval. Because the DAG file merely declares dependencies, the scheduler needs to turn it into concrete tasks. At a finer granularity, a DAG is converted into several DagRuns, each consisting of several task instances; specifically, each operator is converted into a corresponding TaskInstance. A TaskInstance decides whether it can execute based on the task dependencies and the context of those dependencies.

The tasks are then sent to the executor for execution. Specifically, they can be executed locally, on a cluster, or sent remotely to Celery workers.

Finally, during execution, the task is wrapped in a LocalTaskJob, which calls the TaskRunner to start a subprocess that executes the task.

Next we need to add our own DAG file. Let's use the example from the official website directly; it is a typical ETL task:

"""
### ETL DAG Tutorial Documentation
This ETL DAG is compatible with Airflow 1.10.x (specifically tested with 1.10.12)
and is referenced as part of the documentation that goes along with the
Airflow Functional DAG tutorial located
[here](https://airflow.apache.org/tutorial_decorated_flows.html)
"""
# [START tutorial]
# [START import_module]
import json

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}
# [END default_args]

# [START instantiate_dag]
with DAG(
    'tutorial_etl_dag',
    default_args=default_args,
    description='ETL DAG tutorial',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # [END instantiate_dag]

    # [START documentation]
    dag.doc_md = __doc__
    # [END documentation]

    # [START extract_function]
    def extract(**kwargs):
        # Simulate reading source data from a hardcoded JSON string,
        # then push it to XCom for the next task.
        ti = kwargs['ti']
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push('order_data', data_string)
    # [END extract_function]

    # [START transform_function]
    def transform(**kwargs):
        ti = kwargs['ti']
        extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push('total_order_value', total_value_json_string)
    # [END transform_function]

    # [START load_function]
    def load(**kwargs):
        ti = kwargs['ti']
        total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value')
        total_order_value = json.loads(total_value_string)

        print(total_order_value)
    # [END load_function]

    # [START main_flow]
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )
    extract_task.doc_md = """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""

    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )
    transform_task.doc_md = """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""

    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
    )
    load_task.doc_md = """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""

    extract_task >> transform_task >> load_task
    # [END main_flow]
# [END tutorial]

tutorial.py needs to be placed in the dags folder specified in airflow.cfg; the default location is ~/airflow/dags.

Then execute the following command:

python ~/airflow/dags/tutorial.py

If this script returns no errors, then nothing is badly wrong with your code or your Airflow environment. We can inspect the new DAG with a few simple commands:

# print the list of active DAGs
airflow list_dags
# print the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial
# print the hierarchy of tasks in the "tutorial" DAG
airflow list_tasks tutorial --tree

Then we can see the task in action in the UI we mentioned above!

AirFlow's command line includes the following common commands:

backfill          Run subsections of a DAG for a specified date range
list_tasks        List the tasks within a DAG
clear             Clear a set of task instances, as if they never ran
pause             Pause a DAG
unpause           Resume a paused DAG
trigger_dag       Trigger a DAG run
pool              CRUD operations on pools
variables         CRUD operations on variables
kerberos          Start a kerberos ticket renewer
render            Render a task instance's template(s)
run               Run a single task instance
initdb            Initialize the metadata database
list_dags         List all the DAGs
dag_state         Get the status of a dag run
task_failed_deps  Return the unmet dependencies for a task instance from the perspective of the scheduler (in other words, why a task instance doesn't get scheduled, queued, and then run by an executor)
task_state        Get the status of a task instance
serve_logs        Serve logs generated by worker
test              Test a task instance. This will run a task without checking for dependencies or recording its state in the database
webserver         Start an Airflow webserver instance
resetdb           Burn down and rebuild the metadata database
upgradedb         Upgrade the metadata database to the latest version
scheduler         Start a scheduler instance
worker            Start a Celery worker node
flower            Start a Celery Flower
version           Show the version
connections       List/Add/Delete connections

Overall, AirFlow is fairly easy to get started with, its features are reasonably complete, and it scales decently. If you are familiar with Python and can do some custom development, it gets even better!

Airflow is already widely used in commercial companies such as Adobe, Airbnb, Google, and Lyft. In China, Alibaba also uses a similar system, Maat, so the industry has large-scale production experience with this approach.

Come and have a try!