Introduction:
In one work scenario, a cache interface had to be invoked periodically to synchronize device configuration. The first tool that comes to mind is crontab on Linux, which runs tasks at fixed times or at intervals. But if you want to integrate scheduled tasks into a Python project as a module, or persist the tasks, crontab is not a good fit. Python's APScheduler module handles both cases well. This article starts with a simple example that covers the most basic usage of APScheduler, then addresses task persistence, and finally combines APScheduler with a web framework to build a more customized scheduled-task module.
A simple introduction
The APScheduler module consists of four kinds of components:
- Trigger: decides when a job runs
- Job store: where the scheduled jobs are kept
- Executor: runs the job
- Scheduler: ties the other components together
With those concepts in mind, let's look at a simple example:
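# -*- coding: utf-8 -*-
import time

from apscheduler.schedulers.blocking import BlockingScheduler


def hello():
    # Print the current local time each time the job fires.
    print(time.strftime("%c"))


if __name__ == "__main__":
    scheduler = BlockingScheduler()
    # Run hello() every 5 seconds using the interval trigger.
    scheduler.add_job(hello, 'interval', seconds=5)
    # start() blocks the calling thread until the scheduler is shut down.
    scheduler.start()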
Output from the example:
Thu Dec 3 16:01:20 2020
Thu Dec 3 16:01:25 2020
Thu Dec 3 16:01:30 2020
Thu Dec 3 16:01:35 2020
Thu Dec 3 16:01:40 2020
...
For this simple example, let’s examine the run logic with the components mentioned above:
- The first is the scheduler. This example uses BlockingScheduler. According to the official documentation, BlockingScheduler is meant for cases where the scheduler is the only thing running in the process. As its name says, it blocks: once the application calls start(), the calling thread is blocked and cannot do anything else.
- The second is the job and its trigger. We discuss them together because every job needs a trigger when it is defined. Here the trigger is interval, which runs the job at a fixed interval. In other words, the example adds the hello job to the scheduler and executes it every five seconds.
- Finally, there is the executor. The default is ThreadPoolExecutor, which submits the job's callable to a thread pool and notifies the scheduler when the call completes.
There are three built-in trigger types:
- date: runs a job only once, at a specified time
- interval: runs a job at a fixed interval
- cron: runs a job periodically at specific times of day
Common schedulers:
- BlockingScheduler: use when the scheduler is the only thing running in the process
- BackgroundScheduler: use when the scheduler should run in the background inside your application
- AsyncIOScheduler: use when the application uses the asyncio module
- GeventScheduler: use when the application uses gevent
- TornadoScheduler: use when building Tornado applications
- TwistedScheduler: use when building Twisted applications
- QtScheduler: use when building Qt applications
Common job stores:
- MemoryJobStore
- MongoDBJobStore
- SQLAlchemyJobStore
- RedisJobStore
Advanced usage
The following example uses APScheduler inside the Flask web framework to run a task on a schedule and to control that task over HTTP.
# -*- coding: utf-8 -*-
import time

from flask import Flask
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore

app = Flask(__name__)

# Run jobs in a pool of at most 5 worker threads.
executors = {"default": ThreadPoolExecutor(5)}
# Persist jobs in Redis so they survive a restart of the scheduler.
default_redis_jobstore = RedisJobStore(
    db=2,
    jobs_key="apschedulers.default_jobs",
    run_times_key="apschedulers.default_run_times",
    host="127.0.0.1",
    port=6379,
)

scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(default_redis_jobstore)
scheduler.start()


def say_hello():
    print(time.strftime("%c"))


@app.route("/get_job", methods=["GET"])
def get_job():
    # Report whether the job is currently scheduled.
    if scheduler.get_job("say_hello_test"):
        return "YES"
    else:
        return "NO"


@app.route("/start_job", methods=["GET"])
def start_job():
    # Add the job only if it is not scheduled yet.
    if not scheduler.get_job("say_hello_test"):
        scheduler.add_job(say_hello, "interval", seconds=5, id="say_hello_test")
        return "Start Successfully!"
    else:
        return "Start Failed"


@app.route("/remove_job", methods=["GET"])
def remove_job():
    # Remove the job only if it exists.
    if scheduler.get_job("say_hello_test"):
        scheduler.remove_job("say_hello_test")
        return "Delete Successfully!"
    else:
        return "Delete Failed"


if __name__ == "__main__":
    # Note: with debug=True the Flask reloader runs this module in two
    # processes, so the scheduler may be started twice during development.
    app.run(host="127.0.0.1", port=8787, debug=True)
Code details:
- Job store: RedisJobStore is used here, so jobs are serialized and stored in a Redis database. A persistent job store is needed so that jobs are retained if the scheduler process crashes. Which job store to choose depends on the working scenario; APScheduler supports Redis and MongoDB directly, and relational databases such as MySQL and SQLite through SQLAlchemyJobStore.
- Scheduler: BackgroundScheduler is used because the scheduler must not block normal Flask request handling. Once started it runs in a background thread, so the main thread stays free.
- Routes: get_job checks whether the job exists. start_job adds the job if it is not already scheduled. remove_job removes the job if it exists. The job itself runs say_hello every five seconds with an interval trigger.
Conclusion
To summarize the use of the scheduler: first, set up a job store, so that jobs can be loaded again and continue running when the scheduler restarts after a crash. Next, set up an executor that suits the type of job; for CPU-intensive tasks, for example, you can use a process pool executor, while the default is a thread pool executor. Finally, create and configure the scheduler and start it. Jobs can be added before the scheduler starts, and they can also be added, removed, and queried after it starts. (It is important to understand that the application does not manipulate the job store, jobs, or executors directly; the scheduler provides the interfaces for working with them.)
APScheduler is a good scheduled-task library: jobs can be added and removed dynamically, and several trigger types are supported. For purely static tasks, on the other hand, a tool like crontab on Linux is enough. I will keep updating these notes. If you have any questions, feel free to discuss them.