This is the 17th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

Hardware and software Environment

  • windows 10 64bits
  • Anaconda with python 3.7
  • Apscheduler 3.6.3

preface

When it comes to scheduled tasks, the first thing you might think of is scheduled tasks on Windows or crontab on Linux. Check out this video on how to use crontab on Ubuntu. Apscheduler is a scheduled task tool developed in Python language, which provides a very rich and easy to use scheduled task interface

The installation

The installation is very simple, using PIP

pip install apscheduler
Copy the code

The four components of apScheduler

  • Triggers can be triggered by date, time interval, orcontabExpressions fire in three ways
  • Job stores Specifies where jobs are stored, either in memory by default or in various databases
  • Executors submit specified operations to the thread pool or process pool for running
  • Schedulers are used to schedule jobsBackgroundScheduler(Background run) andBlockingScheduler(Blocking)

Code practice

Here are a few examples of how to use apScheduler

import time from apscheduler.schedulers.background import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger def my_job(): print('my_job, {}'.format(time.ctime())) if __name__ == "__main__": Scheduler = BlockingScheduler() IntervalTrigger = intervalTrigger (seconds=1) # Scheduler. start() print('=== end. ===')Copy the code

When you execute the code, the output looks like this

Because we used BlockingScheduler, which was blocking, we only saw the output from the my_job method, and the statement print(‘=== end. === =’) was not executed. BackgroundScheduler, which runs in the background without blocking the main thread, looks at the following code

import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BackgroundScheduler()
    intervalTrigger=IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()
    print('=== end. ===')
    while True:
        time.sleep(1)

Copy the code

The result of code execution looks like this

If triggers is set to DateTrigger, it becomes a job that executes at a certain point in time, as shown in the following example

import time
import datetime
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.date import DateTrigger


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":
    scheduler = BlockingScheduler()
    intervalTrigger=DateTrigger(run_date='2020-07-17 16:18:55')
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()

Copy the code

When the set time is up, the homework will be done by itself

If you want to execute on a specified period, you need to use CronTrigger, which works much like the Crontab timed task in UNIX and can specify very detailed and complex rules. Look at the sample code again

import time from apscheduler.schedulers.background import BlockingScheduler from apscheduler.triggers.cron import CronTrigger def my_job(): print('my_job, {}'.format(time.ctime())) if __name__ == "__main__": IntervalTrigger =CronTrigger(second=1) # Scheduler = BlockingScheduler( IntervalTrigger =CronTrigger(hour=19, minute=30, second=1) day=1, hour=19) scheduler.add_job(my_job, intervalTrigger, id='my_job_id') scheduler.start()Copy the code

The code execution looks like this

The following code does not use executors because there is only one operation. But by default, apScheduler uses ThreadPoolExecutor and the thread pool size is 10

Let’s take a look at the use of executors

import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor


def my_job():
    print('my_job, {}'.format(time.ctime()))


if __name__ == "__main__":

    executors = {
        'default': ThreadPoolExecutor(20)
    }

    scheduler = BlockingScheduler(executors=executors)

    intervalTrigger=IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()

Copy the code

/ / Set the thread pool size to 20 and pass executors in while initializing the Scheduler. / / Operate exactly as before

As a rule of choice between ThreadPoolExecutor and ProcessPoolExecutor, use ProcessPoolExecutor for CPU-intensive jobs, Others use ThreadPoolExecutor, although ThreadPoolExecutor and ProcessPoolExecutor can also be used interchangeably

Finally, let’s look at the job store, which we’ll change to store in SQLite

import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


def my_job():
    print('my_job, {}'.format(time.ctime()))

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}


if __name__ == "__main__":

    executors = {
        'default': ThreadPoolExecutor(20)
    }

    scheduler = BlockingScheduler(jobstores=jobstores, executors=executors)

    intervalTrigger=IntervalTrigger(seconds=1)
    scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
    scheduler.start()

Copy the code

Sqlite database file jobs.sqlite will be generated in the source directory after the code is executed. We open it with graphical tools to view it, and we can see the ID of the job stored in the database and the time of the next execution of the job

The resources

  • apscheduler.readthedocs.io/
  • SQLAlchemy and sqlite