1. APScheduler overview

APScheduler stands for Advanced Python Scheduler.

It executes a specified job according to a specified time rule.

  • The time rule can be an interval, a fixed date and time, or a crontab-style expression like the one used on Linux.

  • The specified task is a Python function.

2. APScheduler components

This article is based on APScheduler version 3.6.3.

2.1. Several important concepts in APScheduler

2.1.1. Job

Role

A Job is the smallest unit of execution in APScheduler. Creating a Job specifies the function to execute, the arguments passed to that function, and the settings that control how the Job runs.

Construction parameters

id: the unique ID of the Job.
name: the name of the Job.
trigger: a trigger defined by APScheduler that determines when the Job runs.
executor: the alias of the executor that runs the Job. The alias is set when the Job is created; when the Job runs, the executor object is looked up in the scheduler by this alias.
max_instances: the maximum number of instances of the Job that may run concurrently. When the executor runs a Job, it counts the running instances by Job ID and decides from this limit whether the Job may run.
next_run_time: the next time the Job will run. A datetime can be specified when creating the Job; if it is not specified, the time is computed from the trigger.
misfire_grace_time: the tolerance, in seconds, for running a Job late. For example, a Job scheduled for 21:00:00 only gets its chance to run at 21:00:31 because of a service restart or some other reason. If misfire_grace_time is 40, the Job is still executed; if the delay exceeds the grace time, the run is discarded.
coalesce: a bool that controls whether missed runs are merged. For example, the scheduler is stopped for 20 seconds and then restarted, and the Job's trigger fires every 5 seconds, so the Job has missed four runs. If coalesce is True, the missed runs are merged into a single execution.
kwargs: the keyword arguments passed to the Job's function.
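The interplay of misfire_grace_time and coalesce can be sketched in plain Python. This is a simplified model, not APScheduler's own code: the helper name runs_to_execute and its arguments are hypothetical, and the sketch assumes that coalescing keeps only the most recent due run before the grace-time check is applied.

```python
from datetime import datetime, timedelta


def runs_to_execute(due_run_times, now, misfire_grace_time, coalesce):
    """Return the due run times that would actually be executed.

    Hypothetical helper for illustration; due_run_times must be sorted ascending.
    """
    # With coalesce enabled, all due runs collapse into the most recent one.
    candidates = due_run_times[-1:] if coalesce else list(due_run_times)
    if misfire_grace_time is None:
        return candidates  # no limit on lateness
    grace = timedelta(seconds=misfire_grace_time)
    # A run is discarded when it is later than the grace time allows.
    return [t for t in candidates if now - t <= grace]


scheduled = datetime(2020, 1, 1, 21, 0, 0)
actual = datetime(2020, 1, 1, 21, 0, 31)  # 31 seconds late
print(runs_to_execute([scheduled], actual, 40, coalesce=False))  # still runs
print(runs_to_execute([scheduled], actual, 30, coalesce=False))  # discarded

# Four runs missed while the scheduler was stopped (trigger fires every 5 s).
missed = [datetime(2020, 1, 1, 21, 0, s) for s in (5, 10, 15, 20)]
now = datetime(2020, 1, 1, 21, 0, 21)
print(len(runs_to_execute(missed, now, 40, coalesce=True)))   # 1
print(len(runs_to_execute(missed, now, 40, coalesce=False)))  # 4
```

With a grace time of 40 the 31-second delay is tolerated, and with coalesce enabled the four missed runs produce a single execution.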

2.1.2. Trigger

A Trigger is bound to a Job. When the scheduler selects jobs to run, it computes each Job's fire time from the rules of its Trigger and compares that time with the current time to decide whether the Job should run. In short, the Trigger's rules determine the next execution time. There are several types of Trigger: DateTrigger fires once at a specified time, IntervalTrigger fires at a fixed interval, and CronTrigger follows rules similar to the Linux crontab.

APScheduler currently supports the following triggers:

DateTrigger
IntervalTrigger
CronTrigger
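To make "computing the next execution time from the trigger's rules" concrete, here is a minimal sketch in plain Python. The two helpers are hypothetical stand-ins for an interval rule and for a cron rule restricted to the seconds field; they illustrate the idea and are not the library's implementation.

```python
import math
from datetime import datetime, timedelta


def next_interval_fire_time(start, interval, previous, now):
    """Next fire time for a fixed-interval rule (hypothetical helper)."""
    if previous is not None:
        return previous + interval
    if start > now:
        return start
    # Round up to the next whole interval counted from the start time.
    steps = math.ceil((now - start) / interval)
    return start + interval * steps


def next_fire_at_seconds(allowed_seconds, now):
    """Earliest time >= now whose seconds field is in allowed_seconds."""
    allowed = set(allowed_seconds)
    if not allowed or not allowed <= set(range(60)):
        raise ValueError("allowed_seconds must be a non-empty subset of 0..59")
    candidate = now.replace(microsecond=0)
    if candidate < now:
        candidate += timedelta(seconds=1)  # round up to a whole second
    while candidate.second not in allowed:
        candidate += timedelta(seconds=1)
    return candidate


start = datetime(2020, 1, 1, 0, 0, 0)
now = datetime(2020, 1, 1, 0, 0, 12)
first = next_interval_fire_time(start, timedelta(seconds=5), None, now)
print(first)  # 2020-01-01 00:00:15
print(next_fire_at_seconds({20, 40}, datetime(2020, 1, 1, 0, 0, 41)))  # 2020-01-01 00:01:20
```

A date rule is the degenerate case: it returns its single run time once and nothing afterwards.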

2.1.3. Executor

Executors are initialized in the scheduler and can also be added dynamically through the scheduler's add_executor method. Each executor is bound to an alias, and that alias is attached to a Job as the executor's unique identifier. When the Job runs, the actual executor object is looked up by the alias bound to the Job. Which executor type to choose depends on the scheduler in use: with AsyncIO as the scheduling library, choose AsyncIOExecutor; with Tornado, choose TornadoExecutor; if the scheduler is started as a plain process, choose either ThreadPoolExecutor or ProcessPoolExecutor.

APScheduler currently supports the following executors:

AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor
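The alias mechanism can be sketched as a small registry. The class ExecutorRegistry and its run_job method are hypothetical names used only for illustration; the sketch uses the standard library's ThreadPoolExecutor rather than APScheduler's executor classes.

```python
from concurrent.futures import ThreadPoolExecutor


class ExecutorRegistry:
    """Toy registry that maps executor aliases to executor objects."""

    def __init__(self):
        self._executors = {}

    def add_executor(self, executor, alias="default"):
        if alias in self._executors:
            raise ValueError("alias already in use: {}".format(alias))
        self._executors[alias] = executor

    def run_job(self, func, args=(), executor="default"):
        # The job only carries the alias; the object is looked up at run time.
        try:
            pool = self._executors[executor]
        except KeyError:
            raise LookupError("no executor with alias {!r}".format(executor))
        return pool.submit(func, *args)


registry = ExecutorRegistry()
registry.add_executor(ThreadPoolExecutor(max_workers=2), "first")
future = registry.run_job(pow, args=(2, 10), executor="first")
print(future.result())  # 1024
```

Because a Job stores only the alias, the executor behind that alias can be configured separately from the jobs that use it.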

2.1.4. Jobstore

Jobstores are initialized in the scheduler and can also be added dynamically through the scheduler's add_jobstore method. Each jobstore is bound to an alias. When the scheduler adds a Job, it looks up the jobstore by the specified alias and stores the Job there. When a Job is persisted, it is serialized with pickle's dumps and restored with loads, using the Job's __getstate__ and __setstate__ methods. The backing storage can be Redis, a relational database (SQLAlchemy supports many databases), MongoDB, and so on.

APScheduler currently supports the following jobstores:

MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore
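The role of __getstate__ and __setstate__ in persistence can be illustrated with a toy class. MiniJob and the reference "tasks:send_report" are hypothetical; the sketch assumes that only a plain state dictionary is pickled and that the object is rebuilt from it afterwards.

```python
import pickle
import threading


class MiniJob:
    """Toy job whose persisted form excludes runtime-only attributes."""

    def __init__(self, job_id, func_ref, args):
        self.id = job_id
        self.func_ref = func_ref       # textual reference such as "module:function"
        self.args = args
        self._lock = threading.Lock()  # runtime-only; locks cannot be pickled

    def __getstate__(self):
        # Return only the fields worth storing.
        return {"version": 1, "id": self.id, "func": self.func_ref, "args": self.args}

    def __setstate__(self, state):
        # Rebuild the object from the stored fields.
        if state.get("version", 1) > 1:
            raise ValueError("unsupported job state version")
        self.id = state["id"]
        self.func_ref = state["func"]
        self.args = state["args"]
        self._lock = threading.Lock()


job = MiniJob("report", "tasks:send_report", ("daily",))
blob = pickle.dumps(job.__getstate__())    # bytes a jobstore could write
restored = MiniJob.__new__(MiniJob)        # create an empty object
restored.__setstate__(pickle.loads(blob))  # fill it from the stored state
print(restored.id, restored.func_ref, restored.args)
```

Storing a textual function reference instead of the function object keeps the stored bytes small and lets the function be re-imported when the job is loaded.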

2.1.5. Event

An Event is emitted by APScheduler when it performs certain operations. Users can register their own functions to listen for these events and take action when they fire. Common examples are EVENT_JOB_ERROR, emitted when a Job raises an exception during execution, and EVENT_JOB_MISSED, emitted when a Job misses its scheduled run time.

Events currently defined by APScheduler:

EVENT_SCHEDULER_STARTED
EVENT_SCHEDULER_START
EVENT_SCHEDULER_SHUTDOWN
EVENT_SCHEDULER_PAUSED
EVENT_SCHEDULER_RESUMED
EVENT_EXECUTOR_ADDED
EVENT_EXECUTOR_REMOVED
EVENT_JOBSTORE_ADDED
EVENT_JOBSTORE_REMOVED
EVENT_ALL_JOBS_REMOVED
EVENT_JOB_ADDED
EVENT_JOB_REMOVED
EVENT_JOB_MODIFIED
EVENT_JOB_EXECUTED
EVENT_JOB_ERROR
EVENT_JOB_MISSED
EVENT_JOB_SUBMITTED
EVENT_JOB_MAX_INSTANCES

2.1.6. Listener

A Listener is a user-defined callback registered for specific events. For example, when a Job triggers EVENT_JOB_MISSED, the listener can perform whatever follow-up handling is required.
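How a listener is matched to events can be sketched with bit masks. The value 4096 for EVENT_JOB_EXECUTED is the code printed in this article's output; the other two constants and the MiniDispatcher class are assumptions made for illustration.

```python
EVENT_JOB_EXECUTED = 2 ** 12  # 4096, the code printed in this article's output
EVENT_JOB_ERROR = 2 ** 13     # assumed value for illustration
EVENT_JOB_MISSED = 2 ** 14    # assumed value for illustration


class MiniDispatcher:
    """Toy event dispatcher: each listener is registered with a bit mask."""

    def __init__(self):
        self._listeners = []

    def add_listener(self, callback, mask):
        self._listeners.append((callback, mask))

    def dispatch(self, code, job_id):
        for callback, mask in self._listeners:
            if code & mask:  # the listener subscribed to this event code
                callback(code, job_id)


problems = []
dispatcher = MiniDispatcher()
dispatcher.add_listener(lambda code, job_id: problems.append((code, job_id)),
                        EVENT_JOB_ERROR | EVENT_JOB_MISSED)
dispatcher.dispatch(EVENT_JOB_EXECUTED, "job-a")  # ignored by this listener
dispatcher.dispatch(EVENT_JOB_MISSED, "job-b")    # recorded
print(problems)  # [(16384, 'job-b')]
```

Because each event code occupies its own bit, several events can be combined with | into a single mask for one listener.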

2.1.7. Scheduler

The Scheduler is the core of APScheduler; all other components are defined through it. Once started, the scheduler begins scheduling according to the configured jobs. It wakes up when a Job is due, based on the triggers of all defined jobs, and scheduling is also triggered whenever Job information changes. Choose the scheduler type to match your environment: AsyncIOScheduler when using AsyncIO, TornadoScheduler when using Tornado.

Schedulers currently supported by APScheduler:

AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler

2.2. Scheduler Workflow Flowchart

Two important processes are selected here, each drawn as a rough flowchart, to show how the scheduler works: adding a Job, and the scheduling pass the scheduler performs each time it wakes up.

2.2.1. Process of adding a Job
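As a rough sketch of this process, the following toy model assumes three steps: a Job added before the scheduler starts is held as pending, a Job added afterwards is stored in the jobstore found by alias, and every stored Job requests a new scheduling pass. MiniStore, MiniScheduler, and the wakeups counter are hypothetical names, not APScheduler's implementation.

```python
class MiniStore:
    """Toy in-memory jobstore keyed by job id."""

    def __init__(self):
        self.jobs = {}

    def add_job(self, job_id, job, replace_existing=False):
        if job_id in self.jobs and not replace_existing:
            raise KeyError("job id already exists: {}".format(job_id))
        self.jobs[job_id] = job


class MiniScheduler:
    def __init__(self, jobstores):
        self._jobstores = jobstores
        self._pending = []   # jobs added before start()
        self._running = False
        self.wakeups = 0     # how many times a new scheduling pass was requested

    def add_job(self, job_id, func, jobstore="default", replace_existing=False):
        if not self._running:
            # Not started yet: remember the job and store it on start().
            self._pending.append((job_id, func, jobstore, replace_existing))
        else:
            self._real_add_job(job_id, func, jobstore, replace_existing)

    def _real_add_job(self, job_id, func, jobstore, replace_existing):
        store = self._jobstores[jobstore]  # look up the store by alias
        store.add_job(job_id, func, replace_existing)
        self.wakeups += 1                  # job data changed: schedule again

    def start(self):
        self._running = True
        for pending in self._pending:
            self._real_add_job(*pending)
        self._pending.clear()


default_store, second_store = MiniStore(), MiniStore()
flow = MiniScheduler({"default": default_store, "second": second_store})
flow.add_job("a", print)                     # held as pending, not started yet
flow.start()                                 # "a" is stored now
flow.add_job("b", print, jobstore="second")  # stored immediately
print(sorted(default_store.jobs), sorted(second_store.jobs))  # ['a'] ['b']
```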

2.2.2. Scheduling process
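As a rough sketch of one scheduling pass, the following toy model assumes that the scheduler collects the jobs that are due, advances each one to its next run time, and then sleeps until the earliest next run time. The function process_jobs and the dictionary layout are hypothetical, and only fixed-interval jobs are modelled.

```python
from datetime import datetime, timedelta


def process_jobs(jobs, now):
    """One scheduling pass over a toy job table.

    jobs maps job id -> {"next_run_time": datetime, "interval": timedelta},
    where every interval must be positive.
    Returns (ids of jobs that were due, time of the next wake-up or None).
    """
    due = []
    for job_id, job in jobs.items():
        if job["next_run_time"] <= now:
            due.append(job_id)  # a real scheduler would hand this to an executor
            # Advance past "now" so the job is not picked up again immediately.
            while job["next_run_time"] <= now:
                job["next_run_time"] += job["interval"]
    next_wakeup = min((job["next_run_time"] for job in jobs.values()), default=None)
    return due, next_wakeup


now = datetime(2020, 1, 1, 0, 0, 10)
jobs = {
    "every_5s": {"next_run_time": datetime(2020, 1, 1, 0, 0, 10),
                 "interval": timedelta(seconds=5)},
    "every_30s": {"next_run_time": datetime(2020, 1, 1, 0, 0, 30),
                  "interval": timedelta(seconds=30)},
}
due, next_wakeup = process_jobs(jobs, now)
print(due, next_wakeup)  # ['every_5s'] 2020-01-01 00:00:15
```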

3. An example of using APScheduler

Example of AsyncIO scheduling

import asyncio
import datetime

from apscheduler.events import EVENT_JOB_EXECUTED
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.redis import RedisJobStore  # requires the redis package to be installed
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

# The jobstore uses Redis to store job information
default_redis_jobstore = RedisJobStore(
    db=2,
    jobs_key="apschedulers.default_jobs",
    run_times_key="apschedulers.default_run_times",
    host="127.0.0.1",
    port=6379,
    password="test"
)

# Define the executor; jobs are run on the asyncio event loop
first_executor = AsyncIOExecutor()

# Jobstores and executors can be specified directly when the scheduler is initialized
init_scheduler_options = {
    "jobstores": {
        # "default" is the alias of the jobstore
        "default": default_redis_jobstore
    },
    "executors": {
        # "first" is the executor alias. Specify it when creating a Job to have this executor run the Job
        "first": first_executor
    },
    # Default parameters applied when creating a job
    "job_defaults": {
        'coalesce': False,  # whether to merge missed executions
        'max_instances': 1  # maximum number of concurrently running instances
    }
}
# Create the scheduler
scheduler = AsyncIOScheduler(**init_scheduler_options)

# start scheduling
scheduler.start()

second_redis_jobstore = RedisJobStore(
    db=2,
    jobs_key="apschedulers.second_jobs",
    run_times_key="apschedulers.second_run_times",
    host="127.0.0.1",
    port=6379,
    password="test"
)

scheduler.add_jobstore(second_redis_jobstore, 'second')
# Define a second executor that also runs jobs on the asyncio event loop
second_executor = AsyncIOExecutor()
scheduler.add_executor(second_executor, "second")


# *********** Event usage example ***********
# Define a function that listens for events
def job_execute(event):
    """
    Listener event handler.
    :param event: the job execution event
    :return: None
    """
    print(
        "Job executed:\ncode => {}\njob.id => {}\njobstore => {}".format(
            event.code,
            event.job_id,
            event.jobstore
        ))


# add a callback to EVENT_JOB_EXECUTED
scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED)


# *********** Job usage example ***********
# A job run by AsyncIOExecutor can be either a coroutine or a normal function; the executor dispatches it according to the function's type.
# A coroutine is scheduled directly on the event loop, while a normal function is run in a thread.
# Define two functions to compare the results

def interval_func(message):
    print("Present time: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    print("I'm a normal function.")
    print(message)


async def async_func(message):
    print("Present time: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    print("I'm a coroutine.")
    print(message)


# Create triggers to execute the two functions above in different ways
# *********** Trigger usage example ***********
# A trigger can be specified in two ways: by creating a trigger object or by using a string alias
# When the scheduler initializes, it loads the built-in triggers, so the string alias can be used directly


if scheduler.get_job("interval_func_test", "default"):  # remove the job if it already exists
    scheduler.remove_job("interval_func_test", "default")

# Start immediately and stop after 240 seconds; run every 10 seconds, stored in the "default" jobstore.
# The "default" executor is created by the scheduler on start when none is configured under that alias.
scheduler.add_job(interval_func, "interval",
                  args=["I'm doing it every 10 seconds, jobStore Default, Executor Default."],
                  seconds=10,
                  id="interval_func_test",
                  jobstore="default",
                  executor="default",
                  start_date=datetime.datetime.now(),
                  end_date=datetime.datetime.now() + datetime.timedelta(seconds=240))

# Create the trigger object first
trigger = IntervalTrigger(seconds=5)

if scheduler.get_job("interval_func_test_2", "second"):  # remove the job if it already exists
    scheduler.remove_job("interval_func_test_2", "second")
# Run every 5 seconds
scheduler.add_job(async_func, trigger, args=["I'm going to execute it every 5s and store it in JobStore Second, executor = second"],
                  id="interval_func_test_2",
                  jobstore="second",
                  executor="second")

# Run a coroutine function, using the "cron" string alias to configure the trigger

if scheduler.get_job("cron_func_test", "default"):  # remove the job if it already exists
    scheduler.remove_job("cron_func_test", "default")

# second='30' fires once per minute, when the seconds field equals 30
scheduler.add_job(async_func, "cron",
                  args=["I'm doing it every 30 seconds, in JobStore Default, executor Default."],
                  second='30',
                  id="cron_func_test",
                  jobstore="default",
                  executor="default")

# Create the trigger object first
trigger = CronTrigger(second='20,40')

if scheduler.get_job("cron_func_test_2", "second"):  # remove the job if it already exists
    scheduler.remove_job("cron_func_test_2", "second")
# Run at seconds 20 and 40 of every minute
scheduler.add_job(async_func, trigger, args=["I'm going to execute them every 20s and store them in JobStore Second, executor = second"],
                  id="cron_func_test_2",
                  jobstore="second",
                  executor="second")

# Keep the event loop running so that the scheduled jobs can execute
print("Start: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
asyncio.get_event_loop().run_forever()
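The comments in the example note that a coroutine is scheduled on the event loop while a normal function is run in a thread. A minimal sketch of that dispatch, using only the standard library, might look as follows; run_job and the two sample jobs are hypothetical helpers, not APScheduler's code.

```python
import asyncio
import inspect
import threading


async def run_job(func, *args):
    """Run func on the event loop if it is a coroutine function, else in a thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args)
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool.
    return await loop.run_in_executor(None, func, *args)


def normal_job():
    return threading.get_ident()  # id of the thread that ran the function


async def coroutine_job():
    return threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    normal_thread = await run_job(normal_job)
    coroutine_thread = await run_job(coroutine_job)
    # (ran outside the loop thread, ran inside the loop thread)
    return normal_thread != loop_thread, coroutine_thread == loop_thread


result = asyncio.run(main())
print(result)  # (True, True)
```

Running the normal function in a worker thread keeps a blocking call from stalling the event loop that drives the other jobs.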

Excerpt of the output

After startup, the job that runs every 5 seconds:

I'm a coroutine.
I'm going to execute it every 5s and store it in JobStore Second, executor = second
Job executed:
code => 4096
job.id => interval_func_test_2
jobstore => second

The job that runs at seconds 20 and 40:

I'm going to execute them every 20s and store them in JobStore Second, executor = second
Job executed:
code => 4096
job.id => cron_func_test_2
jobstore => second

The job that runs every 10 seconds, together with the 5-second job:

I'm doing it every 10 seconds, jobStore Default, Executor Default.
I'm going to execute it every 5s and store it in JobStore Second, executor = second
Job executed:
code => 4096
job.id => interval_func_test
jobstore => default
Job executed:
code => 4096
job.id => interval_func_test_2
jobstore => second

The job that runs every 5 seconds:

I'm going to execute it every 5s and store it in JobStore Second, executor = second
Job executed:
code => 4096
job.id => interval_func_test_2
jobstore => second

The job that runs at second 30 of every minute:

I'm doing it every 30 seconds, in JobStore Default, executor Default.
Job executed:
code => 4096
job.id => cron_func_test
jobstore => default

Conclusion

That is how APScheduler works and how to use it.

The strength of APScheduler is that it can run standalone or be integrated into frameworks such as Tornado, Django, and Flask. CronTrigger, for example, supports far more powerful expressions than the ones shown here; see the cron trigger documentation on the official website.

The examples above cover only general usage. More practical applications use APScheduler's ability to add, pause, remove, and resume jobs dynamically. You can define functions according to their purpose and then build a Web service that manages jobs dynamically; imagine, for example, a monitoring system in which tasks are added on demand.

I will share an example of this when I have time.