• Understanding Apache Airflow’s Key Concepts
  • By Dustin Stansbury
  • Translation: the Gold Translation Project
  • Permalink to this translation: github.com/xitu/gold-m…
  • Translator: Starrier
  • Proofreader: Yqian1991

The third in a four-part series

In parts 1 and 2 of Quizlet's search for an optimal workflow management system, we motivated the need for workflow management systems (WMS) in modern business practices and provided a wish list of the features and capabilities that led us to choose Apache Airflow as our WMS of choice. This article is intended to give curious readers a detailed overview of Airflow's components and operation. We introduce Airflow's key concepts by implementing the example workflow described in part 1 of this series (see Figure 3.1).

Figure 3.1: Example of a data processing workflow.

Airflow is a WMS that defines tasks and their dependencies as code, executes those tasks on a regular schedule, and distributes task execution across worker processes. Airflow offers an excellent UI that displays the states of currently active and past tasks, and allows you to manually manage the execution and state of tasks.

Workflows are directed acyclic graphs

Airflow's workflows are collections of tasks that have directional dependencies. Specifically, Airflow uses a directed acyclic graph, or DAG for short, to represent a workflow. Each node in the graph is a task, and the edges of the graph represent dependencies between tasks (the graph is enforced to be acyclic so that there are no circular dependencies, which would lead to infinite execution loops).

The top of Figure 3.2 illustrates how our example workflow is realized as a DAG in Airflow. Notice that the structure of the execution plan for our example workflow tasks in Figure 1.1 is similar to the structure of the DAG in Figure 3.2.

Figure 3.2: Screenshots from the Airflow UI representing the example workflow DAG. Top subpanel: the graph view of the DagRun for January 25th. Dark green nodes indicate TaskInstances with a "success" state; the light green node depicts a TaskInstance in the "running" state. Bottom subpanel: the tree view of the example_workflow DAG. Airflow's main components are highlighted in the screenshots, including Sensors, Operators, Tasks, DagRuns, and TaskInstances. DagRuns are shown as columns in the tree view; the DagRun for January 25th is highlighted in cyan. Each square in the tree view represents a TaskInstance; the ("running") TaskInstance for the perform_currency_conversions task on January 25th is highlighted in blue.

At a high level, you can think of a DAG as a container that holds tasks, their dependencies, and the context of when and how those tasks should be executed. Each DAG has a set of attributes, the most important being its dag_id, a unique identifier among all DAGs, its start_date, the point in time at which the DAG's tasks should begin executing, and its schedule_interval, which specifies how often the tasks are to be executed. In addition to the dag_id, start_date, and schedule_interval, each DAG can be initialized with a set of default_arguments. These default arguments are inherited by all tasks in the DAG.

In the following code block, we have defined a DAG in Airflow to implement our game company sample workflow.

# (Assumed imports for this snippet)
from datetime import datetime, timedelta
from airflow import DAG

# Each workflow/DAG must have a unique text identifier
WORKFLOW_DAG_ID = 'example_workflow_dag'

# Start/end times are datetime objects
# Here we start execution on January 1st, 2017
WORKFLOW_START_DATE = datetime(2017, 1, 1)

# Schedule/retry intervals are timedelta objects
# Here we execute the DAG's tasks every day
WORKFLOW_SCHEDULE_INTERVAL = timedelta(1)

# Default arguments are applied by default to all tasks
# in the DAG
WORKFLOW_DEFAULT_ARGS = {
    'owner': 'example',
    'depends_on_past': False,
    'start_date': WORKFLOW_START_DATE,
    'email': ['example@example_company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

# Initialize the DAG
dag = DAG(
    dag_id=WORKFLOW_DAG_ID,
    start_date=WORKFLOW_START_DATE,
    schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
    default_args=WORKFLOW_DEFAULT_ARGS,
)

Operators, Sensors, and Tasks

Although the DAG is used to organize tasks and set their execution context, DAGs do not perform any actual computation. Instead, tasks are the elements of Airflow that actually "do the work" we want performed. Tasks come in two flavors: they can either execute some explicit operation, in which case they are an Operator, or they can pause the execution of dependent tasks until some criterion has been met, in which case they are a Sensor. In principle, Operators can perform any function that can be executed in Python. Similarly, Sensors can check the state of any process or data structure.

The following code block shows how to define some (hypothetical) Operator and Sensor classes to implement our workflow example.

################################################
# Custom example Sensors / Operators (NoOps)   #
################################################

# (Assumed imports for this snippet; module paths vary by Airflow version)
from airflow.models import BaseOperator
from airflow.operators.sensors import BaseSensorOperator


class ConversionRatesSensor(BaseSensorOperator):
    """
    An example of a custom Sensor. Custom Sensors generally overload
    the `poke` method inherited from `BaseSensorOperator`.
    """
    def __init__(self, *args, **kwargs):
        super(ConversionRatesSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        print('poking {}'.format(self.__class__.__name__))

        # poke functions should return a boolean
        return check_conversion_rates_api_for_valid_data(context)


class ExtractAppStoreRevenueOperator(BaseOperator):
    """
    An example of a custom Operator that takes non-default BaseOperator
    arguments. Extracts data for a particular app store identified by
    `app_store_name`.
    """
    def __init__(self, app_store_name, *args, **kwargs):
        self.app_store_name = app_store_name
        super(ExtractAppStoreRevenueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print('executing {}'.format(self.__class__.__name__))

        # pull data from specific app store
        json_revenue_data = extract_app_store_data(self.app_store_name, context)

        # upload app store json data to filestore; can use the context variable
        # for date-specific storage metadata
        upload_appstore_json_data(json_revenue_data, self.app_store_name, context)


class TransformAppStoreJSONDataOperator(BaseOperator):
    """
    An example of a custom Operator that takes non-default BaseOperator
    arguments. Extracts, transforms, and loads data for an array of app
    stores identified by `app_store_names`.
    """
    def __init__(self, app_store_names, *args, **kwargs):
        self.app_store_names = app_store_names
        super(TransformAppStoreJSONDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print('executing {}'.format(self.__class__.__name__))

        # load all app store data from filestores; the context variable can be
        # used to retrieve particular date-specific data artifacts
        all_app_stores_extracted_data = []
        for app_store in self.app_store_names:
            all_app_stores_extracted_data.append(extract_app_store_data(app_store, context))

        # combine all app store data, transform to proper format, and upload to filestore
        all_app_stores_json_data = combine_json_data(all_app_stores_extracted_data)
        app_stores_transformed_data = transform_json_data(all_app_stores_json_data)
        upload_data(app_stores_transformed_data, context)

This code defines a subclass of BaseSensorOperator, the ConversionRatesSensor. This class implements the poke method that all BaseSensorOperator objects must provide. The poke method must return True if downstream tasks are to proceed, and False otherwise. In our example, this sensor will be used to determine when exchange rates have become available from the external API.

The ExtractAppStoreRevenueOperator and TransformAppStoreJSONDataOperator classes both inherit from Airflow's BaseOperator class and implement the execute method. In our example, the execute methods of these two classes pull data from the app store APIs and transform it into the company's preferred storage format. Notice that ExtractAppStoreRevenueOperator also accepts a custom parameter, app_store_name, which tells the class which app store to request data from.

Note that Operators and Sensors are usually defined in separate files and imported into the same namespace in which we define the DAG, though we could also add these class definitions to the DAG definition file itself.
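For instance, the DAG definition file might simply import the custom classes. This is a minimal sketch, assuming the classes above live in a hypothetical custom_operators.py module on the PYTHONPATH:

# Hypothetical import of the custom Sensor/Operator classes defined above,
# assuming they are kept in a separate module named custom_operators.py
from custom_operators import (
    ConversionRatesSensor,
    ExtractAppStoreRevenueOperator,
    TransformAppStoreJSONDataOperator,
)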

Formally, Airflow defines a task as an instantiation of either a Sensor or an Operator class. Instantiating a task requires providing a unique task_id and the DAG container to which the task is added (note: in versions 1.8 and later, the DAG object is no longer required). The following code block shows how to instantiate all of the tasks needed to perform our example workflow. (Note: we assume that all Operators being referenced have been defined in or imported into the namespace.)

###########################
# Instantiate the tasks   #
###########################

# Instantiate the task to extract ad network revenue
extract_ad_revenue = ExtractAdRevenueOperator(
    task_id='extract_ad_revenue',
    dag=dag)

# Dynamically instantiate the tasks to extract app store data
APP_STORES = ['app_store_a', 'app_store_b', 'app_store_c']
app_store_tasks = []
for app_store in APP_STORES:
    task = ExtractAppStoreRevenueOperator(
        task_id='extract_{}_revenue'.format(app_store),
        dag=dag,
        app_store_name=app_store,
        )
    app_store_tasks.append(task)

# Instantiate the task to wait for conversion rate data availability
wait_for_conversion_rates = ConversionRatesSensor(
    task_id='wait_for_conversion_rates',
    dag=dag)

# Instantiate the task to extract conversion rates from the API
extract_conversion_rates = ExtractConversionRatesOperator(
    task_id='get_conversion_rates',
    dag=dag)

# Instantiate the task to transform the spreadsheet data
transform_spreadsheet_data = TransformAdsSpreadsheetDataOperator(
    task_id='transform_spreadsheet_data',
    dag=dag)

# Instantiate the task to transform JSON data from all app stores
transform_json_data = TransformAppStoreJSONDataOperator(
    task_id='transform_json_data',
    dag=dag,
    app_store_names=APP_STORES)

# Instantiate the task to apply currency conversions
perform_currency_conversions = CurrencyConversionsOperator(
    task_id='perform_currency_conversions',
    dag=dag)

# Instantiate the task to combine all data sources
combine_revenue_data = CombineDataRevenueDataOperator(
    task_id='combine_revenue_data',
    dag=dag)

# Instantiate the task to check for historical data
check_historical_data = CheckHistoricalDataOperator(
    task_id='check_historical_data',
    dag=dag)

# Instantiate the task to make predictions from historical data
predict_revenue = RevenuePredictionOperator(
    task_id='predict_revenue',
    dag=dag)

This task instantiation code is executed in the same file/namespace as the DAG definition. We can see that the code for adding tasks is concise and allows for inline documentation via comments. The dynamic instantiation of the app store extraction tasks (the for loop over APP_STORES) highlights one of the advantages of defining workflows in code: we define three distinct tasks, one per app store, with a single loop. This approach may not buy us much in this small example, but the benefits grow as the number of app stores increases.

Define task dependencies

A key strength of Airflow is the simplicity and intuitiveness with which dependencies between tasks are defined. The following code shows how we define the task dependency graph for our example workflow:

###################################
# Define the task dependencies    #
###################################

# Dependencies are set using the `.set_upstream` and/or
# `.set_downstream` methods
# (in version >= 1.8.1, the
# `extract_ad_revenue << transform_spreadsheet_data`
# bitshift syntax can also be used)

transform_spreadsheet_data.set_upstream(extract_ad_revenue)

# Dynamically define application storage dependencies
# Dynamically define the app store dependencies
for task in app_store_tasks:
    transform_json_data.set_upstream(task)

extract_conversion_rates.set_upstream(wait_for_conversion_rates)

perform_currency_conversions.set_upstream(transform_json_data)
perform_currency_conversions.set_upstream(extract_conversion_rates)

combine_revenue_data.set_upstream(transform_spreadsheet_data)
combine_revenue_data.set_upstream(perform_currency_conversions)

check_historical_data.set_upstream(combine_revenue_data)

predict_revenue.set_upstream(check_historical_data)

Again, this code runs in the same file/namespace as the DAG definition. Task dependencies are set using the set_upstream and set_downstream methods (in versions 1.8 and later, the bitshift operators << and >> can also be used to perform the same operations more concisely). A task can have multiple dependencies at once (for example, combine_revenue_data), or none at all (for example, all of the extract_* tasks).
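For readers on Airflow 1.8.1 or later, the same dependency graph can be written with the bitshift operators instead. The sketch below is equivalent to the set_upstream calls above, using the task variables instantiated earlier:

# Equivalent dependencies expressed with the bitshift operators (Airflow >= 1.8.1);
# `a >> b` means "a runs upstream of b"
extract_ad_revenue >> transform_spreadsheet_data

for task in app_store_tasks:
    task >> transform_json_data

wait_for_conversion_rates >> extract_conversion_rates

transform_json_data >> perform_currency_conversions
extract_conversion_rates >> perform_currency_conversions

transform_spreadsheet_data >> combine_revenue_data
perform_currency_conversions >> combine_revenue_data

combine_revenue_data >> check_historical_data >> predict_revenue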

The top subpanel of Figure 3.2 shows the DAG created by this code, rendered in Airflow's UI (we'll get to the UI in more detail later). The DAG's dependency structure is very similar to the execution plan we proposed for our example workflow in Figure 1.1. When the DAG is executed, Airflow uses this dependency structure to automatically determine which tasks can be run simultaneously at any point in time (for example, all of the extract_* tasks).

DagRuns and TaskInstances

Once we have defined a DAG, that is, once we have instantiated its tasks and defined their dependencies, we can execute the tasks based on the DAG's parameters. A key concept in Airflow is the execution time. When the Airflow scheduler is running, it defines a regularly-spaced schedule of dates for which to execute a DAG's associated tasks. The execution times begin at the DAG's start_date and repeat every schedule_interval. For our example, the scheduled execution times would be ('2017-01-01 00:00:00', '2017-01-02 00:00:00', ...). For each execution time, a DagRun is created and operates within the context of that execution time. A DagRun is therefore simply a DAG with a particular execution time (see the bottom subpanel of Figure 3.2).
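To make the schedule concrete, the sketch below (plain Python, not Airflow internals) enumerates the first few execution times implied by the start_date and schedule_interval we chose for our example DAG:

from datetime import datetime, timedelta

start_date = datetime(2017, 1, 1)
schedule_interval = timedelta(days=1)

# Enumerate the first few execution times implied by these settings:
# 2017-01-01 00:00:00, 2017-01-02 00:00:00, 2017-01-03 00:00:00, ...
execution_time = start_date
for _ in range(3):
    print(execution_time)
    execution_time += schedule_interval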

The tasks associated with a DagRun are called TaskInstances. In other words, a TaskInstance is a task that has been instantiated and has an execution_date context (see the bottom subpanel of Figure 3.2). DagRuns and TaskInstances are central concepts in Airflow. Each DagRun and TaskInstance is associated with an entry in Airflow's metadata database that records its state (for example, "queued", "running", "failed", "skipped", "up for retry"). Reading and updating these states is key to Airflow's scheduling and execution processes.
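As a rough illustration (not something you would normally write in a workflow file), a TaskInstance can be thought of as a task paired with an execution date, with its state living in the metadata database; the sketch below assumes the tasks from our example and an initialized metadata database:

from datetime import datetime
from airflow.models import TaskInstance

# Pair a task with an execution date to get a TaskInstance, then look up
# its state in the metadata database (None if it has never been scheduled)
ti = TaskInstance(task=extract_ad_revenue, execution_date=datetime(2017, 1, 25))
print(ti.task_id, ti.execution_date, ti.current_state())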

Architecture of Airflow

At its core, Airflow is simply a queuing system built on top of a metadata database. The database stores the states of queued tasks, and the scheduler uses these states to determine how to prioritize the other tasks it adds to the queue. This functionality is orchestrated by four primary components (refer to the left subpanel of Figure 3.3):

  1. Metadata database: This database stores information about task status. The database performs updates using an abstraction layer implemented in SQLAlchemy. This abstraction layer cleanly separates the rest of the Airflow component functionality from the database.
  2. Scheduler: A scheduler is a process that uses DAG definitions in conjunction with task states in metadata to determine which tasks need to be executed and the priority of their execution. The scheduler usually runs as a service.
  3. Executor: The Executor is a message queuing process that is bound to the scheduler and determines which worker processes actually execute each scheduled task. There are different types of executors, each of which uses a specific class of worker processes to execute tasks. For example, the LocalExecutor executes tasks with parallel processes that run on the same machine as the scheduler process. Other executors, like the CeleryExecutor, execute tasks using worker processes that live on a separate cluster of worker machines. (The executor is chosen in Airflow's configuration; see the sketch after the figure caption below.)
  4. Workers: These are the processes that actually perform the task logic, determined by the executor being used.

Figure 3.3: Airflow's general architecture. Airflow's operation is built atop a metadata database that stores the state of tasks and workflows (i.e. DAGs). The scheduler and executor send tasks to a queue for worker processes to execute. The webserver runs (often on the same machine as the scheduler) and communicates with the database to render task states and task execution logs in the Web UI. Each colored box indicates that the component can exist in isolation from the others, depending on the type of deployment configuration.
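As a rough sketch of how these components are wired together in practice, the executor type and the metadata database connection are both chosen in Airflow's configuration file, airflow.cfg. The paths and connection string below are hypothetical placeholders, not a recommended setup:

[core]
# Where the scheduler looks for DAG definition files (placeholder path)
dags_folder = /path/to/our/dags

# SQLAlchemy connection string for the metadata database (placeholder)
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

# Executor class that dispatches TaskInstances to workers, e.g.
# SequentialExecutor, LocalExecutor, or CeleryExecutor
executor = LocalExecutor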

Scheduler operation

At first, the operation of Airflow's scheduler can look more like black magic than a logical computer program. That said, understanding how the scheduler works can save you a ton of time if you ever find yourself debugging its execution. To spare readers from having to dig through Airflow's source code (though we do highly recommend it!), we outline the basic operation of the scheduler in pseudocode:

Step 0. Load the available DAG definitions from disk (fill the DagBag)

While the scheduler is running:
    Step 1. The scheduler uses the DAG definitions to identify and/or initialize
            any DagRuns in the metadata DB.
    Step 2. The scheduler checks the states of the TaskInstances associated with
            active DagRuns, resolves any dependencies among TaskInstances,
            identifies TaskInstances that need to be executed, and adds them to a
            worker queue, updating the state of newly-queued TaskInstances to
            "queued" in the database.
    Step 3. Each available worker pulls a TaskInstance from the queue and starts
            executing it, updating the TaskInstance's database record from
            "queued" to "running".
    Step 4. Once a TaskInstance has finished running, the associated worker
            reports back to the queue and updates the TaskInstance's state in the
            database (e.g. "finished", "failed", etc.).
    Step 5. The scheduler updates the states of all active DagRuns ("running",
            "failed", "finished") according to the states of all completed
            associated TaskInstances.
    Step 6. Repeat Steps 1-5.

Web UI

In addition to the primary scheduling and execution components, Airflow also includes components that support a full-featured Web UI (refer to Figure 3.2 for some UI examples), including:

  1. Webserver: This process runs a simple Flask application that reads the state of all tasks from the metadata database and renders these states for the Web UI.
  2. Web UI: This component allows a client-side user to view and edit the state of tasks in the metadata database. Because of the coupling between the scheduler and the database, the Web UI allows users to manipulate the behavior of the scheduler.
  3. Execution logs: These logs are written by the worker processes and stored either on disk or in a remote file store (such as GCS or S3). The webserver accesses the logs and makes them available to the Web UI.

While none of these additional components are necessary for Airflow's basic operation, they do set Airflow apart functionally from other workflow managers currently available. Specifically, the UI and the integrated execution logs allow users to inspect and diagnose task execution, as well as view and manipulate task state.

Command line interface

In addition to the scheduler and Web UI, Airflow provides robust functionality through a command line interface (CLI). In particular, we found the following commands to be helpful when developing with Airflow (a rough sketch of typical invocations follows the list):

  • airflow test DAG_ID TASK_ID EXECUTION_DATE. Allows a user to run a task in isolation, without affecting the metadata database or worrying about task dependencies. This command is great for testing the basic behavior of custom Operator classes in isolation.
  • airflow backfill DAG_ID TASK_ID -s START_DATE -e END_DATE. Performs backfills of historical data between START_DATE and END_DATE without having to run the scheduler. This is great when you need to change some business logic of an existing workflow and need to update historical data. (Note that backfills do not create DagRun entries in the database, as they are not run by the SchedulerJob class: https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L471.)
  • airflow clear DAG_ID. Removes the TaskInstance records for DAG_ID from the metadata database. This can be useful when you're iterating on the functionality of a workflow/DAG.
  • airflow resetdb: Though you generally don't want to run this command often, it can be very helpful if you ever need to create a "clean slate," a situation that may arise when initially setting up Airflow (note: this command only affects the database and does not remove logs).
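For concreteness, here is roughly how these commands might be invoked against our hypothetical example_workflow_dag (dates and task ids below are placeholders):

# Run a single task in isolation for a given execution date
airflow test example_workflow_dag extract_ad_revenue 2017-01-25

# Backfill a week of history without running the scheduler
airflow backfill example_workflow_dag -s 2017-01-01 -e 2017-01-07

# Remove the DAG's TaskInstance records from the metadata database
airflow clear example_workflow_dag

# Wipe and reinitialize the metadata database (use with care)
airflow resetdb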

In summary, we have covered a number of the more abstract concepts underlying Airflow. In the fourth and final installment of this series, we'll discuss some more practical considerations for deploying Airflow in production.

Thanks to Laura Oppenheimer.
