APScheduler (Advanced Python Scheduler) is a task scheduling library written in Python.

Documentation: apscheduler.readthedocs.io/en/latest/u…

Features:

  • Scheduling is handled by APScheduler itself, independently of the operating system's crontab (no dependency on Linux)

  • Scheduled tasks can be added dynamically at runtime. For example, if an order must be paid within 30 minutes or be cancelled, you can add a scheduled cancellation task each time an order is placed

  • Added scheduled tasks can be persisted

1 Installation

pip install apscheduler

2 Components

APScheduler consists of the following four parts:

  • Triggers: determine when a scheduled task runs

  • Job stores: persist scheduled tasks (in memory by default, or in Redis, MongoDB, a SQL database, etc.)

  • Executors: run scheduled tasks in a thread pool or process pool

  • Schedulers: tie the other three parts together; the most commonly used are BlockingScheduler and BackgroundScheduler (a wiring sketch follows this list)
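
To make the mapping concrete, here is a minimal sketch (not part of the original text) that wires all four parts into one scheduler; the job store and executor shown are just the defaults:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

def tick():
    print('tick')

# Scheduler: ties the job store, executor and triggers together
scheduler = BackgroundScheduler(
    jobstores={'default': MemoryJobStore()},        # job store: where jobs are kept
    executors={'default': ThreadPoolExecutor(10)},  # executor: how jobs are run
)

# Trigger: the 'interval' trigger decides when tick() runs
scheduler.add_job(tick, 'interval', seconds=10)
scheduler.start()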

3 Basic Usage

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

# Create an executor configuration
executors = {
    'default': ThreadPoolExecutor(20),
}

# Create the scheduler object for the scheduled tasks
scheduler = BlockingScheduler(executors=executors)

# Define a scheduled task; the arguments are passed via add_job()'s args parameter
def my_job(param1, param2):
    print(param1)  # 100
    print(param2)  # python

# Add the scheduled task to the scheduler
scheduler.add_job(my_job, 'date', args=[100, 'python'])

# Start the scheduler
scheduler.start()

4 Schedulers

Manage scheduled tasks

  • BlockingScheduler: Used as a standalone process

    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    scheduler.start()  # the program blocks here
  • BackgroundScheduler: Used inside framework applications such as Django and Flask.

    from apscheduler.schedulers.background import BackgroundScheduler
    
    scheduler = BackgroundScheduler()
    scheduler.start()  # the program does not block
  • AsyncIOScheduler: Used when your program uses asyncio (see the sketch after this list).

  • GeventScheduler: Used when your program uses gevent.

  • TornadoScheduler: Used when your application is based on Tornado.

  • TwistedScheduler: Used when your program uses Twisted.

  • QtScheduler: Used when your application is a Qt application.
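
For the asyncio case, a minimal sketch (following the same pattern as the examples above; not from the original text):

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

def tick():
    print('tick')

scheduler = AsyncIOScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

# Keep the asyncio event loop running so the scheduler can fire jobs
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass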

5 Executors

Executors run scheduled tasks in a thread pool or a process pool.

  • ThreadPoolExecutor

    from apscheduler.executors.pool import ThreadPoolExecutor
    ThreadPoolExecutor(max_workers)

    Usage:

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.executors.pool import ThreadPoolExecutor
    
    executors = {
        'default': ThreadPoolExecutor(20)  # at most 20 threads run concurrently
    }
    scheduler = BackgroundScheduler(executors=executors)
  • ProcessPoolExecutor  

    from apscheduler.executors.pool import ProcessPoolExecutor
    ProcessPoolExecutor(max_workers)

    Usage:

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.executors.pool import ProcessPoolExecutor
    
    executors = {
        'default': ProcessPoolExecutor(5)  # at most 5 processes run concurrently
    }
    
    scheduler = BackgroundScheduler(executors=executors)

6 Triggers

Specify the execution time of a scheduled task.

1) date: run once at a specific date and time

from datetime import date, datetime
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

# Run on November 6, 2019 at 00:00:00
sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))

# Run on November 6, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05')

# Run immediately
sched.add_job(my_job, 'date')
sched.start()

2) interval: run repeatedly at a fixed interval

  • weeks (int) – number of weeks to wait

  • days (int) – number of days to wait

  • hours (int) – number of hours to wait

  • minutes (int) – number of minutes to wait

  • seconds (int) – number of seconds to wait

  • start_date (datetime|str) – starting point for the interval calculation

  • end_date (datetime|str) – latest possible date/time to trigger on

  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations

from datetime import datetime

# execute every 2 hours
sched.add_job(job_function, 'interval', hours=2)

# Run every 2 hours from 2012-10-10 09:30:00 until 2014-06-15 11:00:00
sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')

3) cron: run periodically on a crontab-like schedule

  • year (int|str) – 4-digit year

  • month (int|str) – month (1-12)

  • day (int|str) – day of the month (1-31)

  • week (int|str) – ISO week (1-53)

  • day_of_week (int|str) – number or name of weekday (0-6 or mon, tue, wed, thu, fri, sat, sun)

  • hour (int|str) – hour (0-23)

  • minute (int|str) – minute (0-59)

  • second (int|str) – second (0-59)

  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)

  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive)

  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to the scheduler timezone)

# Run at 00:00, 01:00, 02:00 and 03:00 on the third Friday of June, July, August, November and December
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# Run Monday through Friday at 5:30 until 2014-05-30
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
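
If you prefer standard crontab syntax, the cron trigger can also be built from a crontab expression via CronTrigger.from_crontab (an extra example, not from the original text):

from apscheduler.triggers.cron import CronTrigger

# Run job_function at 00:00, Monday through Friday
sched.add_job(job_function, CronTrigger.from_crontab('0 0 * * mon-fri'))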

7 Task Storage (Job Stores)

  • MemoryJobStore: the default; keeps jobs in memory

  • MongoDBJobStore: saves jobs to MongoDB

    from apscheduler.jobstores.mongodb import MongoDBJobStore
    
    MongoDBJobStore()
  • RedisJobStore: saves jobs to Redis

    from apscheduler.jobstores.redis import RedisJobStore
    RedisJobStore()
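
Jobs can also be persisted to a relational database via SQLAlchemyJobStore (imported again in section 8 below); a minimal sketch, assuming a local SQLite file named jobs.sqlite:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

scheduler = BackgroundScheduler()
# Persist jobs in a local SQLite database (any SQLAlchemy URL works here)
scheduler.add_jobstore(SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'), 'default')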

8 Configuration Methods

Method 1

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
    'default': ThreadPoolExecutor(20),
}
conf = { # redis configuration
    "host":127.0. 01.."port":6379."db":15.Connect to database 15
    "max_connections":10 # Redis supports up to 300 connections
}
scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis', **conf) Add task persistent storage. If redis is not installed, skip this stepCopy the code

Method 2

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}

scheduler = BackgroundScheduler()

# ... additional code can go here

scheduler.configure(jobstores=jobstores, executors=executors, timezone=utc)

9 Starting the Scheduler

scheduler.start()
  • For BlockingScheduler, start() blocks here so the process does not exit; use it when the scheduler runs as a standalone process (for example, one that periodically regenerates static pages).

  • For BackgroundScheduler, start() does not block, so the scheduler can run inside an application rather than as a separate process (for example, the 30-minute order-cancellation case; see the sketch below).
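
When a BackgroundScheduler runs inside a long-lived application, a common pattern (not from the original text) is to register a clean shutdown for when the process exits:

import atexit
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.start()

# Shut the scheduler down cleanly when the application process exits
atexit.register(lambda: scheduler.shutdown(wait=False))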

10 Extensions

Task management

Method 1

job = scheduler.add_job(myfunc, 'interval', minutes=2)  # add a task
job.remove()   # remove the task
job.pause()    # pause the task
job.resume()   # resume the task

Method 2

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  # add a task
scheduler.remove_job('my_job_id')  # remove the task
scheduler.pause_job('my_job_id')   # pause the task
scheduler.resume_job('my_job_id')  # resume the task

Modify a task or adjust its schedule

job.modify(max_instances=6, name='Alternate name')

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

Stop the APScheduler

scheduler.shutdown()

11 Comprehensive Example

The 30-minute order cancellation mentioned earlier can be implemented in a Flask or Django application by dynamically adding a scheduled task with BackgroundScheduler. First, define the task that performs the cancellation.

from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler

from goods.models import SKU
from orders.models import OrderGoods


def cancel_order_job(order_id, sku_id, stock, sales):
    # Look up the order goods records for this order
    order_goods = OrderGoods.objects.filter(order_id=order_id, sku_id=sku_id)
    order_goods.delete()  # delete the order goods
    try:
        sku = SKU.objects.get(id=sku_id)
        sku.stock += stock  # restore the stock once the order is deleted
        sku.sales -= sales  # roll back the sales count
        sku.save()
    except Exception as e:
        print(e)

Which tables you operate on depends on your own model design; the above is just the general idea. Next, the cancellation task is added in the view that creates the order, passing cancel_order_job() the parameters it needs. Note that at this point the order is still in the unpaid state.

from datetime import datetime, timedelta

class OrderCommitView(View):
    def post(self, request):
        # ... order-creation logic omitted
        if order.status == OrderInfo.Status.UNPAID:  # the order is still unpaid
            executors = {'default': ThreadPoolExecutor(10)}
            scheduler = BackgroundScheduler(executors=executors)
            now = datetime.now()
            delay = now + timedelta(minutes=30)  # run 30 minutes from now
            scheduler.add_job(cancel_order_job, 'date', run_date=delay,
                              args=[order_id, sku.id, sku.stock, sku.sales])
            scheduler.start()
        # ... other business logic and the response are omitted

Note: if a scheduled task uses Django model classes or Flask configuration, the framework's configuration must be loaded before the task runs; in Flask you also need to push the application context.
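
As an illustration only (create_app and the job body are hypothetical, not from the original text), a periodic Flask task could look like this; the key point is the app_context() block around any code that touches models or app.config:

from apscheduler.schedulers.background import BackgroundScheduler

from myproject import create_app  # hypothetical Flask application factory

app = create_app()

def my_periodic_job():
    # Push the Flask application context so models and app.config are usable
    with app.app_context():
        ...  # query models or read app.config here

scheduler = BackgroundScheduler()
scheduler.add_job(my_periodic_job, 'interval', minutes=5)
scheduler.start()

For a standalone Django script, the equivalent step is to set DJANGO_SETTINGS_MODULE and call django.setup() before importing any model classes.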