APScheduler (Advanced Python Scheduler) is a task-scheduling library written in Python.
Documentation: apscheduler.readthedocs.io/en/latest/u…
Features:
- It schedules tasks on its own, independently of the Linux crontab system.
- You can add scheduled tasks dynamically. For example, if an order must be paid within 30 minutes of being placed and is cancelled otherwise, you can add a scheduled task for that order each time an order is placed.
- You can persist the scheduled tasks you have added.
1 Installation
pip install apscheduler
2 Components
APScheduler consists of the following four parts:
- Triggers: specify when a scheduled task is executed.
- Job stores: hold scheduled tasks and can persist them.
- Executors: run scheduled tasks in threads or processes.
- Schedulers: manage scheduled tasks. Common schedulers include BackgroundScheduler and BlockingScheduler.
3 Usage
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
# Create an executor
executors = {
    'default': ThreadPoolExecutor(20),
}
# Create a scheduler object and hand it the executors
scheduler = BlockingScheduler(executors=executors)
# Define a scheduled task
def my_job(param1, param2):  # arguments are passed through args in add_job()
    print(param1)  # 100
    print(param2)  # python
# Add a scheduled task to the scheduler
scheduler.add_job(my_job, 'date', args=[100, 'python'])
# Start the scheduler
scheduler.start()
4 Schedulers
Schedulers manage scheduled tasks.
- BlockingScheduler: used when the scheduler runs as a standalone process.
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start()  # the program blocks here
- BackgroundScheduler: used inside framework programs such as Django and Flask.
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start()  # the program does not block here
- AsyncIOScheduler: used when your program uses asyncio.
- GeventScheduler: used when your application uses gevent.
- TornadoScheduler: used when your application is based on Tornado.
- TwistedScheduler: used when your program uses Twisted.
- QtScheduler: used when your application is a Qt application.
4 Executors
Executors run scheduled tasks in either threads or processes.
- ThreadPoolExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers)
Usage:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
    'default': ThreadPoolExecutor(20)  # at most 20 threads run at the same time
}
scheduler = BackgroundScheduler(executors=executors)
- ProcessPoolExecutor
from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
Usage:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
    'default': ProcessPoolExecutor(5)  # at most 5 processes run at the same time
}
scheduler = BackgroundScheduler(executors=executors)
5 Triggers
Triggers specify when a scheduled task is executed.
1) date: executes once at a specific date and time
from datetime import date, datetime
# run at 00:00:00 on November 6, 2019
sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))
# run at 16:30:05 on November 6, 2019
sched.add_job(my_job, 'date', run_date=datetime(2019, 11, 6, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2019-11-06 16:30:05')
# execute immediately
sched.add_job(my_job, 'date')
sched.start()
2) interval: executes at a fixed interval
- weeks (int) – number of weeks to wait
- days (int) – number of days to wait
- hours (int) – number of hours to wait
- minutes (int) – number of minutes to wait
- seconds (int) – number of seconds to wait
- start_date (datetime | str) – starting point for the interval calculation
- end_date (datetime | str) – latest possible date/time to trigger on
- timezone (datetime.tzinfo | str) – time zone to use for the date/time calculations
from datetime import datetime
# execute every 2 hours
sched.add_job(job_function, 'interval', hours=2)
# execute every 2 hours from 09:30:00 on October 10, 2012 to 11:00:00 on June 15, 2014
sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')
3) cron: executes on a cron-style schedule
- year (int | str) – 4-digit year
- month (int | str) – month (1-12)
- day (int | str) – day of the month (1-31)
- week (int | str) – ISO week (1-53)
- day_of_week (int | str) – number or name of weekday (0-6 or mon, tue, wed, thu, fri, sat, sun)
- hour (int | str) – hour (0-23)
- minute (int | str) – minute (0-59)
- second (int | str) – second (0-59)
- start_date (datetime | str) – earliest possible date/time to trigger on (inclusive)
- end_date (datetime | str) – latest possible date/time to trigger on (inclusive)
- timezone (datetime.tzinfo | str) – time zone to use for the date/time calculations (defaults to the scheduler timezone)
# execute at 00:00, 01:00, 02:00 and 03:00 on the third Friday of June, July, August, November and December
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# execute Monday through Friday at 05:30 until May 30, 2014
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
6 Job stores
- MemoryJobStore: the default; keeps jobs in memory
- MongoDBJobStore: saves jobs to MongoDB
from apscheduler.jobstores.mongodb import MongoDBJobStore
MongoDBJobStore()
- RedisJobStore: saves jobs to Redis
from apscheduler.jobstores.redis import RedisJobStore
RedisJobStore()
7 Configuration Methods
Method 1
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
    'default': ThreadPoolExecutor(20),
}
conf = {  # Redis configuration
    "host": "127.0.0.1",
    "port": 6379,
    "db": 15,  # connect to database 15
    "max_connections": 10  # maximum number of connections
}
scheduler = BackgroundScheduler(executors=executors)
# Add persistent task storage. If Redis is not installed, skip this step
scheduler.add_jobstore(jobstore='redis', **conf)
Method 2
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
scheduler = BackgroundScheduler()
# ... you can write additional code here
# Configure the scheduler with configure()
scheduler.configure(executors=executors)
8 Starting the scheduler
scheduler.start()
- With BlockingScheduler, the program blocks here and does not exit, so the scheduler runs as a standalone process (for example, to regenerate static pages on a schedule).
- With BackgroundScheduler, the scheduler runs inside your application rather than as a separate process (for example, to cancel an order that has not been paid within 30 minutes).
9 Additional operations
Task management
Method 1
job = scheduler.add_job(myfunc, 'interval', minutes=2)  # add a task
job.remove()  # delete the task
job.pause()  # pause the task
job.resume()  # resume the task
Method 2
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  # add a task
scheduler.remove_job('my_job_id')  # delete the task
scheduler.pause_job('my_job_id')  # pause the task
scheduler.resume_job('my_job_id')  # resume the task
Modify a task or change its schedule
job.modify(max_instances=6, name='Alternate name')
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
Stop APScheduler
scheduler.shutdown()
10 Putting it together
A Flask or Django application can implement the 30-minute order cancellation by using BackgroundScheduler to add a scheduled task dynamically. The example here uses Django. Let's first define the task that performs the order cancellation.
from datetime import datetime, timedelta
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from goods.models import SKU
from orders.models import OrderGoods
def cancel_order_job(order_id, sku_id, stock, sales):
    # Filter out the order items and order information
    order_goods = OrderGoods.objects.filter(order_id=order_id, sku_id=sku_id)
    order_goods.delete()  # delete the order
    try:
        sku = SKU.objects.get(id=sku_id)
        sku.stock += stock  # restore the stock once the order is deleted
        sku.sales -= sales  # restore the sales count
        sku.save()
    except Exception as e:
        print(e)
Which tables to operate on depends on your own schema, but the general idea is as above. The cancellation task is then scheduled in the view that creates the order, passing in the parameters that cancel_order_job() needs. Note that the order is still in the unpaid state at this point.
from datetime import datetime, timedelta
class OrderCommitView(View):
    def post(self, request):
        # ... order-creation logic omitted
        if status == OrderInfo.status.UNPADED:  # unpaid state
            executors = {'default': ThreadPoolExecutor(10)}
            now = datetime.now()
            delay = now + timedelta(minutes=30)  # 30 minutes after the order is placed
            scheduler = BackgroundScheduler(executors=executors)
            # Add the scheduled task
            scheduler.add_job(cancel_order_job, 'date', run_date=delay, args=[order_id, sku.id, sku.stock, sku.sales])
            scheduler.start()
        # ... other business logic and the return are omitted
Note: if a scheduled task that runs periodically uses Django model classes or Flask configuration, you need to load the framework's configuration first. In Flask, you also need to import the application context.