Preface

APScheduler is a well-known task scheduling framework for Python that can run program tasks once at a scheduled time or periodically. It is similar to crontab on Linux, but more powerful: APScheduler can not only add and remove scheduled tasks, but also persist tasks in various backends.

APScheduler is only weakly distributed: each task object is stored on the current node, so tasks can only be distributed across nodes manually, for example through a shared backend such as Redis.

When I first encountered APScheduler, I found it had so many concepts that I felt more comfortable just using crontab directly. However, many of the company's projects are built on APScheduler, so I decided to take a quick look at its source code.

Preliminary concepts

Here are the key concepts in APScheduler, in the simplest possible language.

  • Job: the task object, i.e. the task you want to execute
  • JobStores: stores jobs; in memory by default, with support for backends such as Redis and MongoDB
  • Executors: the component that actually runs the job
  • Triggers: a trigger fires the corresponding scheduling logic when its condition is met
  • Scheduler: the component that ties all of the above together

APScheduler provides multiple schedulers, each suitable for different scenarios. The one I use most often is BackgroundScheduler, which is suitable for programs that need scheduling to run in the background.

There are a variety of other schedulers:

BlockingScheduler: Suitable for running a single task in a process, usually when the scheduler is the only thing you have to run.

AsyncIOScheduler: Suitable for applications that use the asyncio framework

GeventScheduler: Suitable for applications that use the gevent framework

TornadoScheduler: Suitable for applications that use the Tornado framework

TwistedScheduler: Suitable for applications that use the Twisted framework

QtScheduler: Suitable for Qt applications

This article only analyzes the logic related to BackgroundScheduler. First we take a brief look at the official example, then analyze it layer by layer.

Analyze the BackgroundScheduler

The official example code is as follows.

from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)  # add a task that runs every 3 seconds
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # Simulate application activity (keep the main thread alive)
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Shut down the scheduler
        scheduler.shutdown()

The code above is very simple. It instantiates a scheduler through BackgroundScheduler, then calls the add_job method to add a task to the job stores. By default, tasks are stored in memory, or more specifically, in a dict. When the scheduler is finally started with the start method, APScheduler fires the interval trigger every three seconds, which causes the scheduler to have the default executor execute the logic in the tick method.

When the program is done with the scheduler, the shutdown method is called to shut it down.

BackgroundScheduler is actually based on threads, and threads have the concept of daemon threads. If daemon mode is enabled, the scheduler does not strictly have to be shut down, because the interpreter will not wait for a daemon thread on exit.

Take a look at the source of the BackgroundScheduler class.

# apscheduler/schedulers/background.py

class BackgroundScheduler(BlockingScheduler):

    _thread = None

    def _configure(self, config):
        self._daemon = asbool(config.pop('daemon', True))
        super()._configure(config)

    def start(self, *args, **kwargs):
        # Create an event notification.
        # Multiple threads can wait on an event; when the event occurs,
        # all waiting threads are woken up.
        self._event = Event()
        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        # daemon determines the thread's relationship to Python's main thread:
        # if it is a non-daemon thread, the main thread waits for it to finish
        # before the interpreter terminates
        self._thread.daemon = self._daemon  # whether this is a daemon thread
        self._thread.start()  # start the thread

    def shutdown(self, *args, **kwargs):
        super().shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread

The code above is commented in detail; here is a brief explanation.

The _configure method is mainly used for parameter setting, where the self._daemon parameter is defined and the _configure method of the parent class is called via the super method.

Thread events are a thread synchronization mechanism. If you look at the source code, you will find that they are implemented on top of a condition lock. An Event provides three main methods: set(), wait(), and clear().

  • The set() method sets the event flag to true.
  • The clear() method sets the event flag to false.
  • The wait() method blocks the calling thread until the event flag is true.
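The three methods above can be demonstrated with a small stand-alone sketch: several threads block on wait() until one set() call wakes them all at once.

```python
import threading
import time

flag = threading.Event()
results = []

def waiter(name):
    # wait() blocks until some thread calls flag.set()
    flag.wait()
    results.append(name)

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

time.sleep(0.1)        # all three waiters are still blocked at this point
assert results == []

flag.set()             # flag becomes true: every waiting thread wakes up
for t in threads:
    t.join()
```

This is exactly the property BackgroundScheduler relies on: one event object can coordinate any number of waiting threads.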

After the Thread event is created, the start() method of the parent class is called, which contains the real start logic. After that, a Thread is created whose target is self._main_loop, the scheduler's main loop. It keeps executing the logic in the main loop to implement APScheduler's various features; it is a very important method, and I will come back to it later. Once created, the thread only needs to be started.

After the thread is created, its daemon attribute is set. If daemon is True, the thread is a daemon thread; otherwise it is a normal (non-daemon) thread.

Just to be brief, if a thread is a daemon thread, Python’s main thread will exit immediately after executing its logic, ignoring the daemon thread. If it is a non-daemon thread, Python’s main thread will exit only after all other non-daemon threads have finished executing.
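This exit behavior can be observed directly. The sketch below launches a child interpreter whose only extra thread is a daemon sleeping for 60 seconds; because the thread is a daemon, the child exits as soon as its main thread finishes rather than waiting out the sleep.

```python
import subprocess
import sys
import time

# Child interpreter: one daemon thread that would sleep for 60 seconds.
child = """
import threading, time
t = threading.Thread(target=lambda: time.sleep(60))
t.daemon = True
t.start()
print('main done')
"""

start = time.monotonic()
proc = subprocess.run([sys.executable, '-c', child],
                      capture_output=True, text=True, timeout=30)
# The interpreter exited well before the daemon thread's 60s sleep finished.
elapsed = time.monotonic() - start
```

Flipping `t.daemon = True` to `False` would make the child hang for the full 60 seconds, since the main thread must then wait for it.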

The shutdown method first calls the shutdown method of the parent class, then calls join on the thread, and finally removes the thread object with del.

After instantiating the scheduler with BackgroundScheduler, we call add_job with three arguments: the task function, the trigger name 'interval', and the trigger parameter seconds=3.

Can the trigger name be any arbitrary string? No. APScheduler actually uses Python's entry point mechanism here. If you have ever gone through the process of making a Python package and uploading it to PyPI, you will remember entry points. Entry points are not only used for packaging; they can also power modular plugin architectures, which I'll talk about later.

In short, the trigger name passed to the add_job() method, 'interval', corresponds to the apscheduler.triggers.interval.IntervalTrigger class, and seconds is a parameter of that class.

Parse the add_job method

The source code for the add_job method is as follows.

# apscheduler/schedulers/base.py/BaseScheduler

    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time
        }
        # filter
        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
                          value is not undefined)
        # Instantiate the concrete job object
        job = Job(self, **job_kwargs)

        # Don't really add jobs to job stores before the scheduler is up and running
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                self._pending_jobs.append((job, jobstore, replace_existing))
                self._logger.info('Adding job tentatively -- it will be properly scheduled when '
                                  'the scheduler starts')
            else:
                self._real_add_job(job, jobstore, replace_existing)

        return job

The add_job method doesn't contain much code. First, it builds the job_kwargs dictionary, which contains the trigger, the executor, and so on.

  • trigger: the trigger, created by the self._create_trigger() method. This method takes two arguments; here trigger is the string 'interval' and trigger_args holds the corresponding parameters.
  • executor: the executor, currently 'default'; I'll talk about it later.
  • func: the callback, the logic we actually want executed. The trigger fires the scheduler, and the scheduler has the executor run this specific logic.
  • misfire_grace_time: suppose a task was due to run at 12:00:00 but was not scheduled then for some reason, and it is now 12:00:30. The scheduler compares the current time with the planned run time: if misfire_grace_time is 20 (seconds), the missed task is not executed; if it is 60, the task is still scheduled.
  • coalesce: if a task has not run for some reason and runs have piled up, say 10 identical pending runs, then with coalesce=True only the last one is executed, while with coalesce=False all 10 are executed one after another.
  • max_instances: the maximum number of instances of a task that may run at the same time.
  • next_run_time: the next time the task should run.
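The misfire_grace_time decision described above can be sketched as a tiny function; this is a toy re-implementation for illustration, not APScheduler's actual code.

```python
from datetime import datetime

def should_run(planned, now, misfire_grace_time):
    """Return True if a missed run is still within its grace period (seconds).

    Toy version of the misfire check: the real logic lives inside
    APScheduler's scheduler internals.
    """
    delay = (now - planned).total_seconds()
    return 0 <= delay <= misfire_grace_time

planned = datetime(2023, 1, 1, 12, 0, 0)
now = datetime(2023, 1, 1, 12, 0, 30)   # scheduled 30 seconds late
```

With these times, a grace period of 60 seconds lets the task run, while a grace period of 20 seconds causes it to be skipped.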

We then do a filter and pass the parameters to the Job class to complete the instantiation of the task object.

self._jobstores_lock is a reentrant lock. In Python, a reentrant lock is implemented on top of a normal mutex plus a counter: each acquire increments the counter and each release decrements it, and the underlying mutex is only actually released when the counter reaches zero.
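The counting idea can be made concrete with a toy re-entrant lock; threading.RLock does the same thing (more carefully) in the standard library.

```python
import threading

class CountingLock:
    """Toy re-entrant lock: a mutex plus an owner id and a counter.

    The same thread may acquire repeatedly; the underlying mutex is
    only released when the counter drops back to zero.
    """
    def __init__(self):
        self._mutex = threading.Lock()
        self._owner = None
        self._count = 0

    def acquire(self):
        me = threading.get_ident()
        if self._owner == me:      # re-entry by the owning thread
            self._count += 1
            return
        self._mutex.acquire()      # first acquisition takes the real mutex
        self._owner = me
        self._count = 1

    def release(self):
        assert self._owner == threading.get_ident()
        self._count -= 1
        if self._count == 0:       # last release frees the mutex
            self._owner = None
            self._mutex.release()
```

Acquiring twice and releasing twice from the same thread therefore never deadlocks, which is exactly why add_job can safely take self._jobstores_lock even if a caller already holds it.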

Once the lock is acquired, the scheduler's current state is checked. If the state is STATE_STOPPED, the task is appended to the _pending_jobs backlog; otherwise the _real_add_job method is called. Finally, the job object is returned.

The _real_add_job method is the real method to add the job object to the specified storage backend.

When a task object has been added to the specified storage backend (by default it is simply kept in memory), the scheduler can retrieve it for execution.
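Conceptually, the default in-memory store is little more than a dict of jobs that can be queried for due entries; the sketch below uses hypothetical names and is not APScheduler's MemoryJobStore API.

```python
class ToyJobStore:
    """Toy in-memory job store (hypothetical names, for illustration only).

    Jobs are keyed by id; get_due_jobs hands back the jobs whose
    next_run_time has arrived, soonest first.
    """
    def __init__(self):
        self._jobs = {}

    def add_job(self, job_id, next_run_time, func):
        self._jobs[job_id] = (next_run_time, func)

    def get_due_jobs(self, now):
        # jobs whose next_run_time has arrived, sorted by time
        due = [(t, f) for t, f in self._jobs.values() if t <= now]
        return sorted(due, key=lambda item: item[0])

store = ToyJobStore()
store.add_job('a', 10, lambda: 'a')
store.add_job('b', 5, lambda: 'b')
store.add_job('c', 20, lambda: 'c')
```

At "time" 12, only jobs 'b' (due at 5) and 'a' (due at 10) are returned, in that order; the scheduler's main loop performs an analogous query on each wakeup.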

Back in the example code, we execute the scheduler’s add_job method, followed by the scheduler’s start method.

At the end

Given the length, this article will stop here; I will continue analyzing APScheduler in a later post.

If the article is helpful to you, click on “Watching” to support it. See you in the next article.