The Scheduler is the core of Airflow. Starting from the Scheduler's source code, I will walk through the scheduling logic. If you spot any mistakes, please leave a comment and let's discuss.
Airflow scheduler
- Scheduler starts execution: the `airflow scheduler` command hits `airflow/bin/cli.py`. In the `scheduler()` method, a `SchedulerJob` is instantiated from the parameters passed in, and then its `run()` method is executed.

```bash
# Scheduler start command; in production only the DAGs path is specified
airflow scheduler -sd /data/airflow_dags
```
- `SchedulerJob` is a subclass of `BaseJob`, and `run()` is a template method on the parent class (a minimal sketch follows this list):
  - Insert a `SchedulerJob` record into the job table with `job_type=SchedulerJob`;
  - Call the subclass's `_execute()` implementation to perform the scheduling;
  - If `_execute()` finishes without an exception, or exits via `SystemExit`, the job counts as a success; if any other exception occurs, the job fails;
  - Update the `SchedulerJob` record in the job table with the final state.
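As a hedged illustration of this template-method flow (simplified names, `print` calls standing in for the job-table writes; this is my own sketch, not the literal Airflow source):

```python
class BaseJob:
    def __init__(self):
        self.state = None

    def _execute(self):
        raise NotImplementedError  # subclasses implement the real work

    def run(self):
        self.state = "RUNNING"
        print(f"insert job: job_type={type(self).__name__}, state={self.state}")
        try:
            self._execute()          # subclass hook, e.g. SchedulerJob._execute()
            self.state = "SUCCESS"
        except SystemExit:
            self.state = "SUCCESS"   # a clean SystemExit also counts as success
        except Exception:
            self.state = "FAILED"
        finally:
            print(f"update job: state={self.state}")  # final state written back


class SchedulerJob(BaseJob):
    def _execute(self):
        print("scheduling loop runs here")


SchedulerJob().run()
```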
- The `_execute()` method is implemented by `SchedulerJob`:
  - Initialize the `DagFileProcessorAgent` object, the agent that handles all DAG-parsing work throughout the scheduling process. `DagFileProcessorAgent` creates a child process of the Scheduler, `DagFileProcessorManager`, and `DagFileProcessorManager` in turn creates a `DagFileProcessor` process for each DAG file, to parse the DAG files and collect the parsing results. During DAG file parsing, the processes communicate with one another and report file-processing results back to the main Scheduler process. A minimal sketch of this process hierarchy follows this list.
  - Call `_execute_helper()` to implement the scheduling logic.
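Below is a minimal sketch of that Agent → Manager → per-file Processor hierarchy using plain `multiprocessing`; the function names and structure are simplified assumptions of mine, not the actual Airflow classes:

```python
import multiprocessing as mp

def dag_file_processor(path, result_queue):
    # Stand-in for DagFileProcessor: parse one DAG file, report the result.
    result_queue.put((path, "parsed"))

def manager(dag_paths, result_queue):
    # Stand-in for DagFileProcessorManager: one child process per DAG file.
    procs = [mp.Process(target=dag_file_processor, args=(p, result_queue))
             for p in dag_paths]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

if __name__ == "__main__":
    # Stand-in for DagFileProcessorAgent: fork the manager as a child of the
    # scheduler process, then harvest the parsing results from the queue.
    results = mp.Queue()
    agent = mp.Process(target=manager, args=(["a.py", "b.py"], results))
    agent.start()
    agent.join()
    for _ in range(2):  # one result per DAG file
        print(results.get())
```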
- `_execute_helper()` is the core scheduling method and contains the main scheduling loop:

- `self.executor.start()`: the executor object was already initialized from the `airflow.cfg` configuration when `SchedulerJob` was instantiated, so this call is actually `KubernetesExecutor.start()`, which initializes the task queue and the result queue, creates the Kubernetes client, and so on.

```python
def start(self):
    self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid()
    KubeResourceVersion.reset_resource_version()
    # Initialize two queues: task_queue holds the tasks to run,
    # result_queue collects the results reported back
    self.task_queue = self._manager.Queue()
    self.result_queue = self._manager.Queue()
    self.kube_client = get_kube_client()
    # Initialize the K8s scheduler, which launches K8s pods to execute tasks
    self.kube_scheduler = AirflowKubernetesScheduler(
        self.kube_config, self.task_queue, self.result_queue,
        self.kube_client, self.worker_uuid, self.sync_state_buffer
    )
    self._inject_secrets()
    if self.sync_state_buffer:
        self.clear_not_launched_queued_tasks()
```
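One way to read this design (my inference from the code above, not a claim from the source): the two multiprocessing queues decouple the scheduling loop from the Kubernetes machinery, since `execute_async()` only needs to put work on `task_queue`, while pod state changes flow back through `result_queue` to be drained later during `sync()`.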
- `self.reset_state_for_orphaned_tasks()`: when the Scheduler starts, task instances in the SCHEDULED or QUEUED state whose DagRun is RUNNING are reset to None, so that the scheduler can schedule these task instances normally later (a minimal sketch follows).
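Here is a minimal sketch of that reset rule, using plain dictionaries as stand-ins for the ORM objects (my own illustration, not the Airflow query):

```python
RESETTABLE_STATES = {"SCHEDULED", "QUEUED"}

def reset_state_for_orphaned_tasks(dag_runs):
    # For RUNNING DagRuns, clear task instances stuck in SCHEDULED/QUEUED
    # so the scheduler can pick them up again on the next loop.
    for run in dag_runs:
        if run["state"] != "RUNNING":
            continue
        for ti in run["task_instances"]:
            if ti["state"] in RESETTABLE_STATES:
                ti["state"] = None

runs = [{"state": "RUNNING",
         "task_instances": [{"id": "t1", "state": "SCHEDULED"},
                            {"id": "t2", "state": "SUCCESS"}]}]
reset_state_for_orphaned_tasks(runs)
print(runs)  # t1 is reset to None, t2 is untouched
```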
- `self.processor_agent.start()`: starts the multi-process collection of DAGs and places the parsed DAGs in the result queue;
This is followed by an infinite loop in which DAGs are fetched and then scheduled (a runnable skeleton of this loop follows the list below):

- `self.processor_agent.harvest_simple_dags()`: fetches the parsed DAGs from the result queue;
- `self._change_state_for_tis_without_dagrun()`:
  - If a task instance is in the UP_FOR_RETRY state but its dag_run is not in the RUNNING state, the task instance's state is set to FAILED and it is not scheduled again;
  - If a task instance is in the SCHEDULED, QUEUED, or UP_FOR_RESCHEDULE state but its dag_run is not RUNNING, its state is set to None and it is not scheduled again;
- `_execute_task_instances()`: finds executable task instances in the SCHEDULED state and queues them for the executor;
- `self.executor.heartbeat()`:
  - Sends each task to the task queue via `execute_async()`, together with its dag_id, task_id, the command to run (`airflow run xxx`), the full DAG path, and so on;
  - After all tasks have been sent to the queue, the executor runs its `sync()` method, where `kube_scheduler.run_next(task)` launches a pod to execute the task;
- `self._process_executor_events()`: handles executor events, such as retries and state updates.
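Here is a runnable skeleton of that loop; the method names follow the steps above, but the bodies are stubs of my own, and the real loop is infinite, so this sketch runs a fixed number of iterations:

```python
import time

class MiniSchedulerLoop:
    # Stubs standing in for the SchedulerJob methods described above.
    def harvest_simple_dags(self):
        return ["example_dag"]

    def change_state_for_tis_without_dagrun(self, dags):
        print("reset TIs whose dag_run is not RUNNING")

    def execute_task_instances(self, dags):
        print("SCHEDULED -> QUEUED, hand TIs to the executor:", dags)

    def executor_heartbeat(self):
        print("executor: execute_async() then sync()")

    def process_executor_events(self):
        print("handle retries and state updates")

    def run(self, loops=2):
        for _ in range(loops):  # the real scheduler loops forever
            dags = self.harvest_simple_dags()
            self.change_state_for_tis_without_dagrun(dags)
            self.execute_task_instances(dags)
            self.executor_heartbeat()
            self.process_executor_events()
            time.sleep(0.1)

MiniSchedulerLoop().run()
```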
- `self.processor_agent.terminate()`: after the loop exits, terminates the DAG-collection processes;
- `self.executor.end()`: after the loop exits, shuts down the executor.
The Scheduler thus dispatches a number of pods to execute the tasks; inside each pod, `airflow run` is executed.

Airflow run

- The pod runs the Airflow image, is passed the parameters of the task to execute, and runs a command like the following:

```bash
airflow run CASE_STUDY_GUJING_MONTHLY select_monthly_data_from_mysql_task 2021-11-05T08:00:00+00:00 --local --pool default -sd /Users/user/Docker/airflow/dags/CASE_STUDY_GUJING_MONTHLY.py
```
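For illustration, here is how such a command line could be composed; the helper function is hypothetical, and only the `airflow run` arguments come from the example above:

```python
def build_run_command(dag_id, task_id, execution_date, dag_path, pool="default"):
    # Hypothetical helper mirroring the command shown above.
    return ["airflow", "run", dag_id, task_id, execution_date,
            "--local", "--pool", pool, "-sd", dag_path]

print(" ".join(build_run_command(
    "CASE_STUDY_GUJING_MONTHLY",
    "select_monthly_data_from_mysql_task",
    "2021-11-05T08:00:00+00:00",
    "/Users/user/Docker/airflow/dags/CASE_STUDY_GUJING_MONTHLY.py")))
```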
- `LocalTaskJob.run()`: calling the `run()` method of `BaseJob` inserts a job record with `job_type=LocalTaskJob` and then calls the subclass's `_execute()` implementation. If the method executes normally, the job succeeds; a manual exit (`SystemExit`) also counts as success; if an exception occurs, the job fails.