About the author: WeDO Experimental Jun, data analyst; loves life, loves writing.
Application scenarios of task scheduling
Task scheduling means arranging an execution plan for a task: when it runs, how it runs, and so on. Scheduled tasks come up often in real projects, especially data projects. For example, to report website visits every 5 minutes in near real time, you need to analyze the visit counts from log data every 5 minutes.
The following are the application scenarios of task scheduling:
- Offline job scheduling: execute a task at a given time granularity
- Shared cache updates: periodically refresh a cache (such as Redis) so that data can be shared between different processes
Task scheduling tools
- Crontab on Linux: supports running tasks by minute, hour, day, month, and week
- Quartz in Java
- Task Scheduler on Windows
This article introduces APScheduler (Advanced Python Scheduler), a task scheduling library for Python. If you know Quartz, you can think of APScheduler as a Python counterpart of Quartz. APScheduler provides interval-based, fixed-point-in-time, and crontab-style scheduling, and can be used as a cross-platform scheduling tool.
APScheduler
Components
APScheduler consists of five parts: triggers, schedulers, job stores, executors, and task events.
- Job: the task id and the function to execute
- Triggers: determine when a task starts executing
- Job stores: store the state of tasks
- Executors: determine how a task is executed
- Events: monitor abnormal task execution
- Schedulers: tie a task's whole life cycle together: a task is added to (or edited in) the job store; when its execution time arrives, the scheduler hands it to an executor, which runs it and returns the result; at the same time events are emitted so that task events can be monitored.
Installation
pip install apscheduler
A simple example
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime
# task execution function
def job_func(job_id):
    print('job %s is run at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

# event listener
def job_exception_listener(event):
    if event.exception:
        # todo: handle exceptions, alarms, etc.
        print('The job crashed :(')
    else:
        print('The job worked :)')

# logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

# Define a background (non-blocking) scheduler
scheduler = BackgroundScheduler()

# Add a job to the job store
# trigger: trigger='interval', seconds=10
# executor: executor='default' (thread pool)
# job store: jobstore='default' (in-memory store)
# max number of concurrently running instances: max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

# Add a task event listener
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# Start the scheduler
scheduler.start()
Output:
job 1 is run at 2020-03-21 20:00:38
The job worked :)
job 1 is run at 2020-03-21 20:00:48
The job worked :)
job 1 is run at 2020-03-21 20:00:58
The job worked :)
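One note on the example above: BackgroundScheduler.start() returns immediately, so a standalone script would exit right away. A minimal sketch, not part of the original example, of keeping the main thread alive and shutting down cleanly:

```python
import time

try:
    # keep the main thread alive so the background scheduler can keep firing jobs
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    # stop the scheduler and wait for running jobs to finish
    scheduler.shutdown()
```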
Triggers
Triggers determine when a task is executed. APScheduler supports three types of triggers:
- trigger='interval': runs the task at a fixed interval, specified in weeks, days, hours, minutes, or seconds; a start/end date range can also be given

sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
- trigger='date': runs the task once at a fixed point in time

sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
- trigger='cron': runs the task crontab-style

Parameters: second/minute/hour/day/month/week granularity; a start/end date range can also be given:

year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the month (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
Example:

from apscheduler.triggers.cron import CronTrigger

# Run job_function Monday to Friday at 5:30 until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

# Standard crontab format: minute hour day month day_of_week; * means all
# Run job_function at 00:00 on the 1st to the 15th of May through August
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
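The string aliases 'interval', 'date', and 'cron' used above can also be replaced by trigger objects. A small sketch under that assumption, reusing sched and job_function from the examples above:

```python
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

# equivalent to trigger='interval', hours=2
sched.add_job(job_function, IntervalTrigger(hours=2))

# equivalent to trigger='cron', day_of_week='mon-fri', hour=5, minute=30
sched.add_job(job_function, CronTrigger(day_of_week='mon-fri', hour=5, minute=30))
```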
Executors
Executors determine how tasks are run. APScheduler supports four kinds of executors, including thread/process pools and gevent (non-blocking I/O, suitable for high concurrency); the default is the thread pool. The executor is configured when the scheduler is created (see Schedulers below).
- apscheduler.executors.asyncio: runs jobs in the asyncio event loop
- apscheduler.executors.gevent: gevent-based, non-blocking I/O
- apscheduler.executors.pool: thread pool (ThreadPoolExecutor) and process pool (ProcessPoolExecutor)
- apscheduler.executors.twisted: based on the event-driven Twisted framework
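As an illustration of choosing an executor per job (the function name cpu_heavy_job is made up for this sketch), a CPU-bound task can be routed to a process pool registered under its own alias while other jobs stay on the default thread pool:

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

executors = {
    'default': ThreadPoolExecutor(10),      # I/O-bound jobs
    'processpool': ProcessPoolExecutor(4),  # CPU-bound jobs
}
scheduler = BackgroundScheduler(executors=executors)

def cpu_heavy_job():
    # placeholder CPU-bound work
    sum(i * i for i in range(10_000_000))

# route this job to the process pool registered above
scheduler.add_job(cpu_heavy_job, 'interval', minutes=5, executor='processpool')
scheduler.start()
```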
Job stores
The job store determines how tasks are saved. By default tasks are kept in memory (MemoryJobStore) and are lost after a restart. APScheduler supports the following job stores:
- apscheduler.jobstores.memory: in memory
- apscheduler.jobstores.mongodb: stored in MongoDB
- apscheduler.jobstores.redis: stored in Redis
- apscheduler.jobstores.rethinkdb: stored in RethinkDB
- apscheduler.jobstores.sqlalchemy: any database supported by SQLAlchemy, such as MySQL or SQLite
- apscheduler.jobstores.zookeeper: stored in ZooKeeper
Different job stores can be configured when the scheduler is created (see Schedulers below).
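For example, to keep jobs across restarts, the default in-memory store can be replaced with a SQLAlchemy-backed store; a minimal sketch, assuming SQLAlchemy is installed and a local SQLite file is acceptable:

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Register a persistent job store under the 'default' alias,
# replacing the built-in MemoryJobStore
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = BackgroundScheduler(jobstores=jobstores)
```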
Schedulers
APScheduler provides the following schedulers; BlockingScheduler and BackgroundScheduler are the most commonly used:
- BlockingScheduler: use when the scheduler is the only thing running in the process; start() blocks the current thread and does not return immediately.
- BackgroundScheduler: use when the scheduler runs in the background of an application; the main thread is not blocked after calling start().
- AsyncIOScheduler: for applications that use the asyncio module.
- GeventScheduler: for applications that use the gevent module.
- TwistedScheduler: for applications built on Twisted.
- QtScheduler: for Qt applications.
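For a script whose only job is to run scheduled tasks, BlockingScheduler is usually enough; a minimal sketch (the job itself is illustrative):

```python
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

# register a job via the decorator form of add_job
@sched.scheduled_job('interval', minutes=5)
def timed_job():
    print('This job runs every 5 minutes.')

# start() blocks until the process is interrupted (e.g. Ctrl+C)
sched.start()
```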
As the earlier add_job example shows, the scheduler is used to operate on tasks (and to specify a task's trigger, job store, and executor) and to monitor tasks:
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
Let’s take a look at each part in detail
- Scheduler configuration: in add_job we saw that jobstore and executor were both 'default'. When defining the scheduler, APScheduler lets you register different job stores and executors as well as default job parameters:

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import datetime

# Register job stores, executors, and default job parameters via dicts
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

# Define the scheduler
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

def job_func(job_id):
    print('job %s is run at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

# Add a job
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)

# Start the scheduler
scheduler.start()
- Operating on tasks: the scheduler can add, remove, pause, resume, and modify tasks. Note that these operations only affect jobs that have not yet run; jobs that have already finished or are currently running are not affected.
  - add_job: adds a task

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
  - remove_job: removes a task by its unique id; the job's record is also deleted from the job store

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
  - Pausing and resuming jobs: pause and resume a task

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
  - Modifying jobs: change a task's configuration

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
# Modify the job's attributes
job.modify(max_instances=6, name='Alternate name')
# Change the job's trigger
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
- Monitoring task events: common event types are as follows:
  - EVENT_JOB_ERROR: an exception was raised during task execution
  - EVENT_JOB_EXECUTED: a task was executed successfully
  - EVENT_JOB_MAX_INSTANCES: a job run was skipped because the number of concurrently running instances exceeded the configured maximum

scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
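For completeness, a sketch of a listener that also reacts to EVENT_JOB_MAX_INSTANCES; the log messages are illustrative, and scheduler refers to the scheduler from the earlier examples:

```python
import logging
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MAX_INSTANCES

def job_listener(event):
    if event.code == EVENT_JOB_MAX_INSTANCES:
        # the run was skipped because too many instances were already running
        logging.warning('job %s skipped: max_instances exceeded', event.job_id)
    elif event.exception:
        logging.error('job %s raised %r', event.job_id, event.exception)
    else:
        logging.info('job %s executed successfully', event.job_id)

scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES)
```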
Reference: apscheduler.readthedocs.io/en/stable/u…
The author is the director of the weDO Maker Lab official account.
Recommended reading:
Forecasting COVID-19 with Python
Writing music in Python
Use Pyecharts to draw the epidemic map of China (attached source code)