This is the 17th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021
Hardware and software Environment
- windows 10 64bits
- Anaconda with python 3.7
- Apscheduler 3.6.3
preface
When it comes to scheduled tasks, the first thing you might think of is scheduled tasks on Windows or crontab on Linux. Check out this video on how to use crontab on Ubuntu. Apscheduler is a scheduled task tool developed in Python language, which provides a very rich and easy to use scheduled task interface
The installation
The installation is very simple, using PIP
pip install apscheduler
Copy the code
The four components of apScheduler
- Triggers can be triggered by date, time interval, or
contab
Expressions fire in three ways - Job stores Specifies where jobs are stored, either in memory by default or in various databases
- Executors submit specified operations to the thread pool or process pool for running
- Schedulers are used to schedule jobs
BackgroundScheduler
(Background run) andBlockingScheduler
(Blocking)
Code practice
Here are a few examples of how to use apScheduler
import time from apscheduler.schedulers.background import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger def my_job(): print('my_job, {}'.format(time.ctime())) if __name__ == "__main__": Scheduler = BlockingScheduler() IntervalTrigger = intervalTrigger (seconds=1) # Scheduler. start() print('=== end. ===')Copy the code
When you execute the code, the output looks like this
Because we used BlockingScheduler, which was blocking, we only saw the output from the my_job method, and the statement print(‘=== end. === =’) was not executed. BackgroundScheduler, which runs in the background without blocking the main thread, looks at the following code
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
def my_job():
print('my_job, {}'.format(time.ctime()))
if __name__ == "__main__":
scheduler = BackgroundScheduler()
intervalTrigger=IntervalTrigger(seconds=1)
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
print('=== end. ===')
while True:
time.sleep(1)
Copy the code
The result of code execution looks like this
If triggers is set to DateTrigger, it becomes a job that executes at a certain point in time, as shown in the following example
import time
import datetime
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.date import DateTrigger
def my_job():
print('my_job, {}'.format(time.ctime()))
if __name__ == "__main__":
scheduler = BlockingScheduler()
intervalTrigger=DateTrigger(run_date='2020-07-17 16:18:55')
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
Copy the code
When the set time is up, the homework will be done by itself
If you want to execute on a specified period, you need to use CronTrigger, which works much like the Crontab timed task in UNIX and can specify very detailed and complex rules. Look at the sample code again
import time from apscheduler.schedulers.background import BlockingScheduler from apscheduler.triggers.cron import CronTrigger def my_job(): print('my_job, {}'.format(time.ctime())) if __name__ == "__main__": IntervalTrigger =CronTrigger(second=1) # Scheduler = BlockingScheduler( IntervalTrigger =CronTrigger(hour=19, minute=30, second=1) day=1, hour=19) scheduler.add_job(my_job, intervalTrigger, id='my_job_id') scheduler.start()Copy the code
The code execution looks like this
The following code does not use executors because there is only one operation. But by default, apScheduler uses ThreadPoolExecutor and the thread pool size is 10
Let’s take a look at the use of executors
import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
def my_job():
print('my_job, {}'.format(time.ctime()))
if __name__ == "__main__":
executors = {
'default': ThreadPoolExecutor(20)
}
scheduler = BlockingScheduler(executors=executors)
intervalTrigger=IntervalTrigger(seconds=1)
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
Copy the code
/ / Set the thread pool size to 20 and pass executors in while initializing the Scheduler. / / Operate exactly as before
As a rule of choice between ThreadPoolExecutor and ProcessPoolExecutor, use ProcessPoolExecutor for CPU-intensive jobs, Others use ThreadPoolExecutor, although ThreadPoolExecutor and ProcessPoolExecutor can also be used interchangeably
Finally, let’s look at the job store, which we’ll change to store in SQLite
import time
from apscheduler.schedulers.background import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def my_job():
print('my_job, {}'.format(time.ctime()))
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
if __name__ == "__main__":
executors = {
'default': ThreadPoolExecutor(20)
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors)
intervalTrigger=IntervalTrigger(seconds=1)
scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
scheduler.start()
Copy the code
Sqlite database file jobs.sqlite will be generated in the source directory after the code is executed. We open it with graphical tools to view it, and we can see the ID of the job stored in the database and the time of the next execution of the job
The resources
- apscheduler.readthedocs.io/
- SQLAlchemy and sqlite