The APScheduler framework for scheduling tasks

Preface

Following up on the previous article, which briefly outlined how to use BackgroundScheduler to run scheduled tasks, this article analyzes the _real_add_job and _main_loop methods.

Although APScheduler offers several schedulers and many ways of using them, the core logic is much the same across all of them.

By the way, the APScheduler analyzed in this article is version 3.6.1, the latest at the time of writing.

Analyze the _real_add_job

Review the general logic for adding task objects from the previous article.

Instantiate BackgroundScheduler –> call the add_job method –> finally call _real_add_job

The _real_add_job source code is shown below, and the general logic of the code is commented accordingly.

# apscheduler/schedulers/base.py/BaseScheduler

    def _real_add_job(self, job, jobstore_alias, replace_existing):
        # Adds the job object to the specified job store (default: in memory)

        # Fill in undefined values using default values
        replacements = {}
        for key, value in self._job_defaults.items():
            if not hasattr(job, key):
                replacements[key] = value

        # If the next run time is not defined, calculate it
        if not hasattr(job, 'next_run_time'):
            now = datetime.now(self.timezone)
            replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)

        # Apply any substitutions
        job._modify(**replacements)

        # Add the job to the given job store
        store = self._lookup_jobstore(jobstore_alias)
        try:
            store.add_job(job)
        except ConflictingIdError:
            if replace_existing:
                store.update_job(job)
            else:
                raise

        # Mark the job as no longer pending
        job._jobstore_alias = jobstore_alias

        # Notify listeners that a new job has been added
        event = JobEvent(EVENT_JOB_ADDED, job.id, jobstore_alias)
        self._dispatch_event(event)

        self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias)

        # Notify the scheduler about the new job
        if self.state == STATE_RUNNING:
            self.wakeup()

The code is commented throughout; here is a brief analysis.

Start by defining the replacements dictionary, then loop through the _job_defaults dictionary and check whether each key is already an attribute of the job object. If it is not, the default value is added to the replacements dictionary.

In the same way, check whether the job object has the next_run_time attribute. If it does not, call the get_next_fire_time method on the job's trigger to calculate the job's next run time.

Then call the job object's _modify method to modify the attributes of the job object.

How are the attributes modified? _modify defines an approved dictionary, validates each value passed in through replacements, and places the values that pass into approved. Finally, it iterates over the approved dictionary and uses setattr to set each entry as an attribute of the job object.
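The default-filling and _modify steps described above can be sketched in a few lines. This is a simplified illustration, not APScheduler's actual code: the ToyJob class, the single validation rule, and the values in JOB_DEFAULTS are all assumptions made for the example.

```python
class ToyJob:
    """Hypothetical stand-in for a job object (not APScheduler's Job class)."""

    def __init__(self, **attrs):
        for key, value in attrs.items():
            setattr(self, key, value)

    def _modify(self, **changes):
        # Validate every change first; only validated values reach `approved`.
        approved = {}
        for key, value in changes.items():
            if key == 'max_instances' and (not isinstance(value, int) or value <= 0):
                raise ValueError('max_instances must be a positive integer')
            approved[key] = value
        # Apply the approved values as attributes of the job.
        for key, value in approved.items():
            setattr(self, key, value)


# Assumed defaults for the example.
JOB_DEFAULTS = {'coalesce': True, 'max_instances': 1}

job = ToyJob(max_instances=3)

# Collect a default only for attributes the job does NOT already have.
replacements = {}
for key, value in JOB_DEFAULTS.items():
    if not hasattr(job, key):
        replacements[key] = value

job._modify(**replacements)
print(job.coalesce, job.max_instances)  # prints: True 3
```

Because max_instances was set explicitly, only coalesce is taken from the defaults.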

Next, the _lookup_jobstore method is called to find the job store that will hold the current job object. jobstore_alias is a parameter of _real_add_job, which receives it from add_job. Looking back at add_job, jobstore_alias defaults to the string 'default'. In that case the in-memory job store is selected, and its add_job method is invoked to add the job object to the store.

The in-memory job store is the default job store.

See the start method of the BaseScheduler class. Part of the logic of this method is as follows.

# apscheduler/schedulers/base.py/BaseScheduler

        with self._jobstores_lock:
            # Create a default job store if nothing else is configured
            if 'default' not in self._jobstores:
                self.add_jobstore(self._create_default_jobstore(), 'default')

It creates the default job store by calling the _create_default_jobstore method, whose code is as follows.

# apscheduler/schedulers/base.py/BaseScheduler

    def _create_default_jobstore(self):
        """Creates a default job store, specific to the particular scheduler type."""
        return MemoryJobStore()  # use an in-memory job store

Looking at the __init__ method of the MemoryJobStore class, you can see that it keeps jobs in a list sorted by next run time, with a dict serving as a lookup table from job id to entry.

# apscheduler/jobstores/memory.py

class MemoryJobStore(BaseJobStore):
    """ Stores jobs in an array in RAM. Provides no persistence support. Plugin alias: ``memory`` """

    def __init__(self):
        super().__init__()
        # list of (job, timestamp), sorted by next_run_time and job id (ascending)
        self._jobs = []
        self._jobs_index = {}  # id -> (job, timestamp) lookup table

Back in the _real_add_job method: once _lookup_jobstore has found the job store and add_job has added the job to it, the job's _jobstore_alias attribute is set, which marks the job as no longer pending.
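The add-or-replace step in _real_add_job follows a common pattern: try to add, and fall back to an update only if the caller asked for it. A minimal sketch of that pattern is shown below. ToyJobStore, add_or_replace, and the local ConflictingIdError class are hypothetical names defined for this example only, and jobs are reduced to plain strings.

```python
class ConflictingIdError(KeyError):
    """Raised when a job with the same id is already stored (toy version)."""


class ToyJobStore:
    def __init__(self):
        self._jobs_index = {}  # id -> job payload

    def add_job(self, job_id, job):
        if job_id in self._jobs_index:
            raise ConflictingIdError(job_id)
        self._jobs_index[job_id] = job

    def update_job(self, job_id, job):
        self._jobs_index[job_id] = job


def add_or_replace(store, job_id, job, replace_existing):
    try:
        store.add_job(job_id, job)
    except ConflictingIdError:
        if replace_existing:
            store.update_job(job_id, job)  # overwrite the stored job
        else:
            raise  # let the caller deal with the duplicate id


store = ToyJobStore()
add_or_replace(store, 'report', 'v1', replace_existing=False)
add_or_replace(store, 'report', 'v2', replace_existing=True)
print(store._jobs_index['report'])  # prints: v2
```

With replace_existing=False, the second call would raise ConflictingIdError instead of overwriting the job.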

Next, the JobEvent class is instantiated and the _dispatch_event method is called to send out the event notification. The details are omitted here, but in short the scheduler keeps a list of registered listeners, and the ones subscribed to an event are called when that event is dispatched.
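One common way to implement this kind of notification is a list of callbacks, each filtered by a bit mask. The sketch below assumes that design. The constant values and function names are made up for illustration and are not taken from the APScheduler source.

```python
EVENT_JOB_ADDED = 1      # hypothetical bit flags for the example
EVENT_JOB_SUBMITTED = 2

listeners = []  # list of (callback, mask) pairs


def add_listener(callback, mask):
    listeners.append((callback, mask))


def dispatch_event(code, job_id):
    # Call every listener whose mask includes this event code.
    for callback, mask in listeners:
        if code & mask:
            callback(code, job_id)


seen = []
add_listener(lambda code, job_id: seen.append(('added', job_id)), EVENT_JOB_ADDED)
add_listener(lambda code, job_id: seen.append(('any', job_id)),
             EVENT_JOB_ADDED | EVENT_JOB_SUBMITTED)

dispatch_event(EVENT_JOB_ADDED, 'job-1')      # reaches both listeners
dispatch_event(EVENT_JOB_SUBMITTED, 'job-1')  # reaches only the second one
print(seen)
```

The first listener is called once and the second twice, because only the second subscribed to both event codes.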

Once that is done, and if the scheduler is running, the scheduler thread is woken up with the wakeup method. In BaseScheduler, wakeup is an empty method decorated with abstractmethod, so subclasses must override it. In BlockingScheduler, wakeup calls the event's set() method to wake up the thread.

# apscheduler/schedulers/blocking.py/BlockingScheduler

    def wakeup(self):
        # The set() method sets the event flag to true.
        self._event.set()  # wake up to avoid a long sleep
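The effect of abstractmethod can be seen in a small standalone example. The class names below are hypothetical stand-ins, not the real scheduler classes. The sketch only shows that a base class with an abstract wakeup cannot be instantiated, while a subclass that overrides it can.

```python
from abc import ABC, abstractmethod
from threading import Event


class ToyBaseScheduler(ABC):
    @abstractmethod
    def wakeup(self):
        """Notify the scheduler that there may be jobs due."""


class ToyBlockingScheduler(ToyBaseScheduler):
    def __init__(self):
        self._event = Event()

    def wakeup(self):
        # Setting the flag releases any thread blocked in self._event.wait().
        self._event.set()


try:
    ToyBaseScheduler()  # abstract method not overridden, so this fails
    instantiated = True
except TypeError:
    instantiated = False

scheduler = ToyBlockingScheduler()
scheduler.wakeup()
print(instantiated, scheduler._event.is_set())  # prints: False True
```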

Analyze the get_next_fire_time

Looking back at _real_add_job, one piece of the code looks like this.

        if not hasattr(job, 'next_run_time'):
            now = datetime.now(self.timezone)
            replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)

The key call here is get_next_fire_time, which receives None (there is no previous fire time yet) and the current time in the scheduler's timezone. For IntervalTrigger, it is implemented as follows.

# apscheduler/triggers/interval.py/IntervalTrigger

    def get_next_fire_time(self, previous_fire_time, now):
        if previous_fire_time:
            # The last run time plus the interval gives the next run time
            next_fire_time = previous_fire_time + self.interval
        elif self.start_date > now:
            next_fire_time = self.start_date
        else:
            timediff_seconds = timedelta_seconds(now - self.start_date)
            next_interval_num = int(ceil(timediff_seconds / self.interval_length))
            next_fire_time = self.start_date + self.interval * next_interval_num

        if self.jitter is not None:
            next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now)

        if not self.end_date or next_fire_time <= self.end_date:
            return self.timezone.normalize(next_fire_time)

The get_next_fire_time method basically calculates the next time the task object will run.

The get_next_fire_time method is different for different triggers.
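To make the arithmetic in the final branch of IntervalTrigger.get_next_fire_time concrete, here is a worked example using only the standard library. The dates and the 10-second interval are arbitrary values chosen for illustration, and naive datetimes are used so that timezone handling and jitter can be left out.

```python
from datetime import datetime, timedelta
from math import ceil

start_date = datetime(2019, 1, 1, 12, 0, 0)
interval = timedelta(seconds=10)
now = datetime(2019, 1, 1, 12, 0, 25)

# Seconds elapsed since the start date.
timediff_seconds = (now - start_date).total_seconds()
# Number of whole intervals needed to reach or pass `now`.
next_interval_num = int(ceil(timediff_seconds / interval.total_seconds()))
next_fire_time = start_date + interval * next_interval_num

print(next_interval_num, next_fire_time)  # prints: 3 2019-01-01 12:00:30
```

25 seconds have passed, which is 2.5 intervals, so rounding up gives the third interval boundary at 12:00:30.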

Analyze the _main_loop

Before dissecting _main_loop, let’s review the inheritance of the BackgroundScheduler class.

BackgroundScheduler –> BlockingScheduler –> BaseScheduler

BaseScheduler is an abstract base class.

BlockingScheduler is suitable when the scheduler is the only thing running in your process, because its start method blocks.

BackgroundScheduler is based on BlockingScheduler. Unlike BlockingScheduler, it creates a separate thread so that the scheduler runs in the background. Otherwise, BackgroundScheduler is no different from BlockingScheduler.

In BackgroundScheduler's start method, another thread is created to run the _main_loop method, whose logic lives in the BlockingScheduler class.

class BlockingScheduler(BaseScheduler):
    """ A scheduler that runs in the foreground (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block). """
    _event = None

    def start(self, *args, **kwargs):
        self._event = Event()
        super().start(*args, **kwargs)
        self._main_loop()

    def shutdown(self, wait=True):
        super().shutdown(wait)
        self._event.set()

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            # wait_seconds is the timeout for waiting on the event notification
            # The wait() method blocks the thread until the event flag is true.
            self._event.wait(wait_seconds)
            # The clear() method sets the event flag to false
            self._event.clear()
            wait_seconds = self._process_jobs()

    def wakeup(self):
        # The set() method sets the event flag to true, waking up the thread
        self._event.set()

_main_loop starts by setting wait_seconds to TIMEOUT_MAX. Then, as long as the scheduler is not in the stopped state, the thread blocks in the event's wait method. There are two ways to unblock it. One is a call to wakeup, for example from _real_add_job, which means there may be a new job for the scheduler to handle. The other is for the wait to time out.

When the thread wakes up, the clear method is called immediately to set the event flag back to false, so the thread blocks again the next time the loop reaches wait.
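The wait, clear, and set cycle can be tried out in isolation with threading.Event. The following is a rough sketch under simplifying assumptions, not the scheduler itself: the state dictionary, the log list, and the processed event exist only for the demo, a short timeout replaces TIMEOUT_MAX, and no jobs are processed.

```python
import threading

event = threading.Event()      # plays the role of self._event
processed = threading.Event()  # demo-only: lets the main thread wait for the loop
state = {'stopped': False}
log = []


def main_loop():
    wait_seconds = 5.0  # stand-in for TIMEOUT_MAX, kept small for the demo
    while not state['stopped']:
        # Block until set() is called or the timeout expires.
        woken = event.wait(wait_seconds)
        # Reset the flag so that the next wait() blocks again.
        event.clear()
        log.append('woken' if woken else 'timeout')
        processed.set()


thread = threading.Thread(target=main_loop, daemon=True)
thread.start()

event.set()          # what wakeup() does
processed.wait(2)    # wait until the loop has handled the wakeup
processed.clear()

state['stopped'] = True
event.set()          # what shutdown() does after the state changes
thread.join(2)
print(log[0], thread.is_alive())
```

Running the loop in its own thread while the main thread carries on is also, in spirit, what distinguishes BackgroundScheduler from BlockingScheduler.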

The key part of _main_loop is _process_jobs. This method iterates over all job stores, submits every job that is due to its executor, calculates the next run time of each of those jobs, and returns the number of seconds until the earliest next run time across the job stores. The scheduler then blocks for that long.

The source code for the _process_jobs method is as follows.

# apscheduler/schedulers/base.py/BaseScheduler

    def _process_jobs(self):
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone) # current time
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            # Iterate over the job stores in _jobstores
            for jobstore_alias, jobstore in self._jobstores.items():
                try:
                    # Check whether the execution time is up based on the current time
                    due_jobs = jobstore.get_due_jobs(now)
                except Exception as e:
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    # Wake up time
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time

                    continue

                for job in due_jobs:
                    # Look up the executor of the current job
                    try:
                        executor = self._lookup_executor(job.executor)
                    except BaseException:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
                            'job store', job.executor, job)
                        self.remove_job(job.id, jobstore_alias)
                        continue
                    # Get the run times
                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            # Submit the job to the executor
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                        except BaseException:
                            self._logger.exception('Error submitting job "%s" to executor "%s"',
                                                   job, job.executor)
                        else:
                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                       run_times)
                            events.append(event)

                        # Calculate the next run time of the job
                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                        if job_next_run:
                            # Update the job's next run time
                            job._modify(next_run_time=job_next_run)
                            # Update the modified job to jobStore
                            jobstore.update_job(job)
                        else:
                            # Remove the job from the job store if there is no next run time
                            self.remove_job(job.id, jobstore_alias)

                # Set a new wakeup time
                jobstore_next_run_time = jobstore.get_next_run_time()
                if jobstore_next_run_time and (next_wakeup_time is None or
                                               jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

        # Dispatch collected events
        for event in events:
            self._dispatch_event(event)

        # Determine the delay until this method should be called again
        if self.state == STATE_PAUSED:
            wait_seconds = None
            self._logger.debug('Scheduler is paused; waiting until resume() is called')
        elif next_wakeup_time is None:
            wait_seconds = None
            self._logger.debug('No jobs; waiting until a job is added')
        else:
            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)

        return wait_seconds

The _process_jobs method seems long, but the logic is straightforward.

First, the get_due_jobs method of each job store is called to get the jobs that are due to run. For MemoryJobStore, it is implemented as follows.

# apscheduler/jobstores/memory.py/MemoryJobStore

    def __init__(self):
        super().__init__()
        # list of (job, timestamp), sorted by next_run_time and job id (ascending)
        self._jobs = []
        self._jobs_index = {}  # id -> (job, timestamp) lookup table

    def get_due_jobs(self, now):
        now_timestamp = datetime_to_utc_timestamp(now)
        pending = []
        for job, timestamp in self._jobs:
            if timestamp is None or timestamp > now_timestamp:
                break
            pending.append(job)

        return pending

MemoryJobStore.get_due_jobs walks the job list, which is sorted by next run time. Each job whose next run time is less than or equal to the current time is added to the pending list, and the loop stops at the first job that is not yet due (or has no next run time).
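Keeping the list sorted is what allows the scan to stop early. A minimal sketch of the idea follows. It stores plain (timestamp, job_id) tuples and uses bisect.insort to keep them ordered, which is a simplification chosen for this example and not how MemoryJobStore itself inserts jobs.

```python
from bisect import insort

# (timestamp, job_id) pairs kept sorted by timestamp; a simplification of
# the (job, timestamp) list described above.
jobs = []
for entry in [(30.0, 'c'), (10.0, 'a'), (20.0, 'b')]:
    insort(jobs, entry)


def get_due_jobs(now_timestamp):
    pending = []
    for timestamp, job_id in jobs:
        # The list is sorted, so the first job that is not due ends the scan.
        if timestamp > now_timestamp:
            break
        pending.append(job_id)
    return pending


print(get_due_jobs(20.0))  # prints: ['a', 'b']
```

Job 'c' is never inspected beyond the comparison that ends the loop, and any jobs after it would not be visited at all.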

Once you have the list of jobs that are due, walk through it and call the _lookup_executor method to find the executor for each job. If the lookup fails, the job is removed from its job store with the remove_job method.

The _get_run_times method is then called to get the run time, which is coded as follows.

# apscheduler/job.py/Job

    def _get_run_times(self, now):
        run_times = []
        next_run_time = self.next_run_time
        while next_run_time and next_run_time <= now:
            run_times.append(next_run_time)
            next_run_time = self.trigger.get_next_fire_time(next_run_time, now)

        return run_times

The _get_run_times method starts from self.next_run_time and loops for as long as that time is less than or equal to the current time. On each pass it appends the time to the run_times list and calls get_next_fire_time to calculate the following run time.
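A short standalone example shows what this loop produces when the scheduler wakes up late, and how the coalesce check in _process_jobs then trims the result. The trigger is reduced to a fixed 10-second interval and the dates are arbitrary; both are assumptions made to keep the sketch self-contained.

```python
from datetime import datetime, timedelta

interval = timedelta(seconds=10)
next_run_time = datetime(2019, 1, 1, 12, 0, 0)
now = datetime(2019, 1, 1, 12, 0, 25)  # the scheduler woke up 25 seconds late

run_times = []
while next_run_time <= now:
    run_times.append(next_run_time)
    next_run_time = next_run_time + interval  # simplified fixed-interval trigger

coalesce = True
# With coalescing, only the most recent missed run is kept.
selected = run_times[-1:] if run_times and coalesce else run_times

print(len(run_times), selected[0])  # prints: 3 2019-01-01 12:00:20
```

Three runs were missed (12:00:00, 12:00:10, 12:00:20), but with coalesce enabled only the last one is submitted to the executor.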

Back in the _process_jobs method, once run_times is available (reduced to its last entry if the job has coalesce enabled), the executor's submit_job method is called with the job object and the run times as parameters. The executor then runs the job's actual logic.

Then get_next_fire_time is called to calculate the job's next run time. If there is one, it is written to the job object with _modify and the job is updated in its job store with update_job. If there is none, the job is removed from the job store.

Finally, the job store's get_next_run_time method is called to get the earliest next run time in that store, which is used to set the scheduler's next wakeup time. For MemoryJobStore, it is implemented as follows.

# apscheduler/jobstores/memory.py/MemoryJobStore

    def get_next_run_time(self):
        return self._jobs[0][0].next_run_time if self._jobs else None
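The last step of _process_jobs turns the next wakeup time into a timeout for the event's wait method. The clamping it applies can be sketched as follows. The seconds_until function is a hypothetical helper written for this example, using naive datetimes and threading.TIMEOUT_MAX from the standard library.

```python
from datetime import datetime, timedelta
from threading import TIMEOUT_MAX


def seconds_until(next_wakeup_time, now):
    if next_wakeup_time is None:
        return None  # no jobs: wait until something calls wakeup()
    delta = (next_wakeup_time - now).total_seconds()
    # Never negative (overdue jobs run immediately) and never above the limit.
    return min(max(delta, 0), TIMEOUT_MAX)


now = datetime(2019, 1, 1, 12, 0, 0)
print(seconds_until(now + timedelta(seconds=7.5), now))  # prints: 7.5
print(seconds_until(now - timedelta(seconds=3), now))    # prints: 0
print(seconds_until(None, now))                          # prints: None
```

A return value of None makes the event's wait call block with no timeout, which matches the "waiting until a job is added" case in the log message.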

Conclusion

This article covered a lot of code detail, though plenty was left out. If you have read this far, the explanation has hopefully been interesting. Here is a brief summary of the complete logic.

The logical chain for adding a job to the corresponding job store is:

BaseScheduler.add_job –> BaseScheduler._real_add_job –> BaseScheduler._lookup_jobstore finds the job store –> store.add_job(job) adds the job to the job store –> BlockingScheduler.wakeup wakes up the scheduler thread.

The logical chain of the main loop is:

BlockingScheduler._main_loop runs the main loop –> _process_jobs, its most important method, submits due jobs to their executors and calculates each job's next run time.

We are not done reading the APScheduler source code yet; the next article will look at how the executor executes job objects.
