说到定时任务,大家肯定会想起使用linux的crontab,或者windows自带的任务计划。虽然他们都可以实现定时任务,但是如果想要实现任务的精细化灵活控制,以及任务程序跨平台运行,此时就需要使用定时任务框架去实现。Python的apscheduler就提供了非常丰富而且方便易用的定时任务接口,本文介绍如何使用 apscheduler 实现你的定时任务。
简介
定时任务库对比
库名称 | 简介 | 优点 | 缺点 |
Apscheduler | 基于Quartz的一个Python定时任务框架,提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化作业 | 支持定时、定期、一次性任务,支持任务持久化及动态添加 | 配置可选项较多,配置起来较为复杂,有一定的学习成本。 |
Celery | 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度 | 支持配置定期任务、支持 crontab 模式配置 | 不支持一次性定时任务,单独为定时任务功能而搭建celery显得过于重量级。 |
schedule | 轻量级,无需配置的作业调度库 | 轻量级、无需配置、语法简单 | 阻塞式调用、无法动态添加或删除任务,无任务状态存储 |
python-crontab | 针对系统 Cron 操作 crontab 文件的作业调度库 | 支持定时、定期任务,能够动态添加任务 | 不能实现一次性任务需求,没有状态存储,无法跨平台执行 |
APScheduler概念与组件
触发器(trigger)
触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
任务存储器(job stores)
任务存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当任务被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
执行器(executors)
执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
调度器(schedulers)
任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
工作流程
APScheduler快速上手
安装
安装非常简单,通过pip install apscheduler即可。
快速体验
# main.py
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
# 定时任务执行函数
def my_task():
logger.info(开始执行任务)
if __name__ == __main__:
# 实例化调度器对象
scheduler = BlockingScheduler()
# 添加定时任务,指定任务函数和触发器
scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3))
# 开始执行定时任务
scheduler.start()
- 执行结果
/Users/cuiliang/PycharmProjects/test/venv/bin/python /Users/cuiliang/PycharmProjects/test/crontab.py
:: | INFO | __main__:my_task:7 - 开始执行任务
:: | INFO | __main__:my_task:7 - 开始执行任务
:: | INFO | __main__:my_task:7 - 开始执行任务
APScheduler配置详解
触发器(triggers)
APScheduler支持的触发器主要有:
- DateTrigger:日期触发器。日期触发器主要是在某一日期时间点上运行任务时调用,是 APScheduler 里面最简单的一种触发器。所以通常也适用于一次性的任务或作业调度。
# 指定任务在年8月日执行
scheduler.add_job(my_task, trigger=DateTrigger(run_date=date, 8, ), timezone=Asia/Shanghai))
# 指定任务在年8月日8时5分秒执行
scheduler.add_job(my_task, trigger=DateTrigger(run_date=datetime, 8, , 8, 8, ), timezone=Asia/Shanghai))
# 指定任务在年8月日8时5分秒执行
scheduler.add_job(my_task, trigger=DateTrigger(run_date= ::, timezone=Asia/Shanghai))
- IntervalTrigger:间隔触发器。间隔触发器是在日期触发器基础上扩展了对时间部分,比如时、分、秒、天、周这几个部分的设定。是我们用以对重复性任务进行设定或调度的一个常用调度器。设定了时间部分之后,从起始日期开始(默认是当前)会按照设定的时间去执行任务。
interval触发器支持设置如下参数:
参数 | 含义 | 类型 |
weeks | 周 | 整形 |
days | 一个月中的第几天 | 整形 |
hours | 小时 | 整形 |
minutes | 分钟 | 整形 |
seconds | 秒 | 整形 |
start_date | 间隔触发的起始时间 | 时间格式字符串 |
end_date | 间隔触发的结束时间 | 时间格式字符串 |
jitter | 触发的时间误差 | 整形 |
# 指定任务每分钟执行一次
scheduler.add_job(my_task, trigger=IntervalTrigger(minutes=, timezone=Asia/Shanghai))
# 指定任务在年8月日9时到时区间内,每分钟执行一次
scheduler.add_job(my_task, trigger=IntervalTrigger(minutes=, start_date= ::,
end_date= ::, timezone=Asia/Shanghai))
- CronTrigger:cron 表达式触发器。cron 表达式触发器就等价于我们 Linux 上的 crontab,它主要用于更复杂的日期时间进行设定。
cron触发器支持设置如下参数:
参数 | 含义 |
year | 4位数字的年份 |
month | 月份 |
day | 日 |
week | 周 |
day_of_week | 一个礼拜中的第几天( 或者 mon、 tue、 wed、 thu、 fri、 sat、 sun) |
hour | 小时 |
minute | 分钟 |
second | 秒 |
start_date | datetime类型或者字符串类型,起始时间 |
end_date | datetime类型或者字符串类型,结束时间 |
timezone | 时区 |
jitter | 任务触发的误差时间 |
也可以用表达式类型,可以用以下方式:
表达式 | 字段 | 描述 |
* | 任何 | 在每个值都触发 |
*/a | 任何 | 每隔a触发一次 |
a-b | 任何 | 在a-b区间内任何一个时间触发 |
a-b/c | 任何 | 在a-b区间内每隔c触发一次 |
xth y | day | 在x个星期y触发 |
last x | day | 在最后一个星期x触发 |
last | day | 在一个月中的最后一天触发 |
x,y,z | 任何 | 将上面的表达式进行组合 |
# 指定任务在月和月,每个月第三个星期5那天的点每2个小时执行一次
scheduler.add_job(my_task,
trigger=CronTrigger(month=,, day=3th 5, hour=/2, timezone=Asia/Shanghai))
# 使用crontab表达式,指定任务在每天日每天0点0分执行一次
scheduler.add_job(my_task, trigger=CronTrigger.from_crontab( * *, timezone=Asia/Shanghai))
调度器(schedulers)
APScheduler 提供了以下几种调度器:
- BlockingScheduler:阻塞调度器,当程序中没有任何存在主进程之中运行东西时,就则使用该调度器。
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print(Tick! The time is: %s % datetime.now())
if __name__ == __main__:
scheduler = BlockingScheduler()
scheduler.add_job(tick, interval, seconds=3)
print(Press Ctrl+{0} to exit.format(Break if os.name == nt else C))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
- BackgroundScheduler:后台调度器,使用单独的线程执行,在不使用后面任何的调度器且希望在应用程序内部运行时的后台启动时才进行使用,如当前你已经开启了一个 Django 或 Flask 服务。
from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler
def tick():
print(Tick! The time is: %s % datetime.now())
if __name__ == __main__:
scheduler = BackgroundScheduler()
scheduler.add_job(tick, interval, seconds=3)
scheduler.start()
print(Press Ctrl+{0} to exit.format(Break if os.name == nt else C))
try:
# 模拟主线程持续运行。
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
- AsyncIOScheduler:AsyncIO 调度器,如果代码是通过 asyncio 模块进行异步操作,使用该调度器。
- GeventScheduler:Gevent 调度器,如果代码是通过 gevent 模块进行协程操作,使用该调度器
- TornadoScheduler:Tornado 调度器,在 Tornado 框架中使用
- TwistedScheduler:Twisted 调度器,在基于 Twisted 的框架或应用程序中使用
- QtScheduler:Qt 调度器,在构建 Qt 应用中进行使用。
通常情况下如果不是和 Web 项目或应用集成共存,那么往往都首选 BlockingScheduler 调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler 调度器,因为它不会干扰当前应用的线程或进程状况。
执行器(executors)
APScheduler 提供了以下几种执行器:
- ThreadPoolExecutor:默认的线程池执行器。大部分情况下是可以满足我们需求
- ProcessPoolExecutor:进程池执行器。涉及到一些 CPU密集计算的操作,使用此执行器
- AsyncIOExecutor:asyncio程序执行器,如果代码是通过 asyncio 模块进行异步操作,使用该执行器。
- TornadoExecutor:Tornado程序执行器,在 Tornado 框架中使用
- TwistedExecutor:Twisted程序执行器,在基于 Twisted 的框架或应用程序中使用
- GeventExecutor:Gevent程序执行器,在Gevent框架中使用
任务存储器(job stores)
APScheduler支持的数据库主要有:
- sqlalchemy :关系型的数据库。这里就主要是指各种传统的关系型数据库,如 MySQL、PostgreSQL、SQLite 等。
# main.py
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入任务存储器,此处使用SQLAlchemyJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# 导入日志记录器
from loguru import logger
# 定时任务执行函数
def my_task():
logger.info(开始执行任务)
if __name__ == __main__:
# 实例化调度器对象
scheduler = BlockingScheduler()
# 指定使用MySQL存储任务
url = mysql://root:.com@:/job?charset=utf8
scheduler.add_jobstore(jobstore=SQLAlchemyJobStore(url=url))
# 指定任务每分钟执行一次
scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=, timezone=Asia/Shanghai))
# 开始执行定时任务
scheduler.start()
查看数据库表内容
- mongodb :非结构化Mongodb数据库。该类型数据库经常用于对非结构化或版结构化数据的存储或操作,如 JSON。
# main.py
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入任务存储器,此处使用MongoDBJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
# 导入日志记录器
from loguru import logger
# 导入MongoDB客户端
from pymongo import MongoClient
# 定时任务执行函数
def my_task():
logger.info(开始执行任务)
if __name__ == __main__:
# 实例化调度器对象
scheduler = BlockingScheduler()
# 指定使用MongoDB存储任务
url = mongodb://:/
scheduler.add_jobstore(jobstore=MongoDBJobStore(client=MongoClient(host=url)))
# 指定任务每分钟执行一次
scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=, timezone=Asia/Shanghai))
# 开始执行定时任务
scheduler.start()
查看数据库表内容
- redis :内存数据库。通常用作数据缓存来使用,当然通过一些主从复制等方式也能实现当中数据的持久化或保存。
# main.py
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入任务存储器,此处使用RedisJobStore
from apscheduler.jobstores.redis import RedisJobStore
# 导入日志记录器
from loguru import logger
# 定时任务执行函数
def my_task():
logger.info(开始执行任务)
if __name__ == __main__:
# 实例化调度器对象
scheduler = BlockingScheduler()
# 指定使用redis存储任务
REDIS = {
host: ,
port: ,
db: 0,
password: .com
}
scheduler.add_jobstore(jobstore=RedisJobStore(**REDIS))
# 指定任务每分钟执行一次
scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=, timezone=Asia/Shanghai))
# 开始执行定时任务
scheduler.start()
查询redis数据库内容
常见job属性
- coalesce=True
比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行
- max_instances=3
比如一个分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~分钟上,新的运行实例不会被执行,因为已经有3个实例在运行。
- name=my_job
job的名称
- misfire_grace_time=3
这个参数的前提是使用可持续化的jobstore,如果使用默认内存的jobstore,这个参数是没有意义的。一般需要使用misfire_grace_time的场景,就是但是那个持久化jobstore的服务挂掉了,任务需要被调度的时候没有被调度成功,后期持久化的jobstore启动了,这个任务重新被调度了(从jobstore中获取job),misfire_grace_time决定这个任务在错过执行时间之后还需不需要执行
定时任务完整流程演示
选择合适的scheduler
在使用之前我们需要先实例化一个 scheduler 对象,所有的 scheduler 对象都被放在了 apscheduler.schedulers 模块下,根据需求选择BlockingScheduler或者BackgroundScheduler调度器引入即可,此处以最基础的阻塞调度器BlockingScheduler为例:
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
配置schedulers
对于 scheduler 属性的配置,支持以下的方式灵活配置:
- 在创建实例化对象时配置参数
- 在创建实例化之后再配置参数
对于scheduler参数的配置,支持以下的方式灵活配置:
- 使用配置字典参数配置
- 将关键字参数传递配置
假设现在有这样一个需求:
组件 | 模块 | 需求 |
调度器(schedulers) | 阻塞调度器(BlockingScheduler) | 为新任务关闭合并模式 |
触发器(triggers) | cron表达式触发器(CronTrigger) | 使用cron表达式,每分钟执行一次 |
执行器(executors) | 线程池执行器(ThreadPoolExecutor) | 最大个线程 |
任务存储器(job stores) | 关系型的数据库(sqlalchemy ) | 将结果保存到MySQL数据库 |
接下来分别使用属性+参数的四种组合演示如何配置schedulers
- 在创建实例化对象时配置参数+将关键字参数传递配置
# main.py
# 导入线程池执行器
from apscheduler.executors.pool import ThreadPoolExecutor
# 导入sqlalchemy,使用MySQL数据库存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# 导入阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入cron表达式触发器
from apscheduler.triggers.cron import CronTrigger
# 导入日志模块
from loguru import logger
# 导入生成随机数模块
import random
# 创建定时任务执行函数
def my_task(number):
logger.info(开始执行任务,传入的随机数为%s % number)
# 作业存储器配置 使用MySQL数据库存储
job_stores = {
default: {
type: sqlalchemy,
url: mysql://root:.com@:/job?charset=utf8
}
}
# 执行器配置 使用线程池执行器,最大个线程
executors = {
default: ThreadPoolExecutor(),
}
# Job相关配置,更多选项参见官方文档
job_defaults = {
coalesce: False,
# 设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),
# 如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行
max_instances: 3
# 同一个任务同一时间最多只能有3个实例在运行。
# 比如一个分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~分钟上,新的运行实例不会被执行,因为已经有3个实例在运行。
}
# 实例化调度器
scheduler = BlockingScheduler(
jobstores=job_stores,
executors=executors,
job_defaults=job_defaults,
timezone=Asia/Shanghai # 指定时区
)
if __name__ == __main__:
number = random.randint(0, 9)
scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(* * * * *), args=[number])
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 在创建实例化对象时配置参数+使用配置字典参数配置
# main.py
# 导入线程池执行器
from apscheduler.executors.pool import ThreadPoolExecutor
# 导入sqlalchemy,使用MySQL数据库存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# 导入阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入cron表达式触发器
from apscheduler.triggers.cron import CronTrigger
# 导入日志模块
from loguru import logger
# 导入生成随机数模块
import random
# 创建定时任务执行函数
def my_task(number):
logger.info(开始执行任务,传入的随机数为%s % number)
config = {
# 作业存储器配置 使用MySQL数据库存储
apscheduler.jobstores.default: {
type: sqlalchemy,
url: mysql://root:.com@:/job?charset=utf8
},
# 执行器配置 使用线程池执行器,最大个线程
apscheduler.executors.default: {
class: apscheduler.executors.pool:ThreadPoolExecutor,
max_workers:
},
# Job配置,为新任务关闭合并模式
apscheduler.job_defaults.coalesce: false,
# Job配置,同一个任务同一时间最多只能有3个实例在运行
apscheduler.job_defaults.max_instances: 3,
# Job配置,指定时区
apscheduler.timezone: Asia/Shanghai,
}
# 实例化调度器
scheduler = BlockingScheduler(config)
if __name__ == __main__:
# 生成随机数传入定时任务函数
number = random.randint(0, 9)
# 注册定时任务job,执行频率为每分钟执行一次
scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(* * * * *), args=[number])
try:
# 开始执行定时任务调度器
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 在创建实例化之后再配置参数+将关键字参数传递配置
# main.py
# 导入线程池执行器
from apscheduler.executors.pool import ThreadPoolExecutor
# 导入sqlalchemy,使用MySQL数据库存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# 导入阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入cron表达式触发器
from apscheduler.triggers.cron import CronTrigger
# 导入日志模块
from loguru import logger
# 导入生成随机数模块
import random
# 创建定时任务执行函数
def my_task(number):
logger.info(开始执行任务,传入的随机数为%s % number)
# 实例化调度器
scheduler = BlockingScheduler()
# 作业存储器配置 使用MySQL数据库存储
url = mysql://root:.com@:/job?charset=utf8
#
executors = {
default: ThreadPoolExecutor(),
}
# Job相关配置,更多选项参见官方文档
job_defaults = {
coalesce: False,
# 设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),
# 如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行
max_instances: 3
# 同一个任务同一时间最多只能有3个实例在运行。
# 比如一个分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~分钟上,新的运行实例不会被执行,因为已经有3个实例在运行。
}
# 调度器对象配置参数
scheduler.configure(
job_defaults=job_defaults,
timezone=Asia/Shanghai)
# 添加任务存储器参数
scheduler.add_jobstore(jobstore=SQLAlchemyJobStore(url=url))
# 添加执行器参数,使用线程池执行器,最大个线程
scheduler.add_executor(executor=ThreadPoolExecutor(max_workers=))
if __name__ == __main__:
# 生成随机数传入定时任务函数
number = random.randint(0, 9)
# 注册定时任务job,执行频率为每分钟执行一次
scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(* * * * *), args=[number])
try:
# 开始执行定时任务调度器
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 在创建实例化之后再配置参数+使用配置字典参数配置
# main.py
# 导入线程池执行器
from apscheduler.executors.pool import ThreadPoolExecutor
# 导入sqlalchemy,使用MySQL数据库存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# 导入阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入cron表达式触发器
from apscheduler.triggers.cron import CronTrigger
# 导入日志模块
from loguru import logger
# 导入生成随机数模块
import random
# 创建定时任务执行函数
def my_task(number):
logger.info(开始执行任务,传入的随机数为%s % number)
# 实例化调度器
scheduler = BlockingScheduler()
config = {
# 作业存储器配置 使用MySQL数据库存储
apscheduler.jobstores.default: {
type: sqlalchemy,
url: mysql://root:.com@:/job?charset=utf8
},
# 执行器配置 使用线程池执行器,最大个线程
apscheduler.executors.default: {
class: apscheduler.executors.pool:ThreadPoolExecutor,
max_workers:
},
# Job配置,为新任务关闭合并模式
apscheduler.job_defaults.coalesce: false,
# Job配置,同一个任务同一时间最多只能有3个实例在运行
apscheduler.job_defaults.max_instances: 3,
# Job配置,指定时区
apscheduler.timezone: Asia/Shanghai,
}
# 调度器对象配置参数
scheduler.configure(config)
if __name__ == __main__:
# 生成随机数传入定时任务函数
number = random.randint(0, 9)
# 注册定时任务job,执行频率为每分钟执行一次
scheduler.add_job(my_task, trigger=CronTrigger.from_crontab(* * * * *), args=[number])
try:
# 开始执行定时任务调度器
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
添加调度任务
创建 scheduler 对象之后,我们需要调用add_job()方法或是装饰器scheduled_job方法来将我们需要执行的函数进行注册。
以上面的demo代码为例,使用add_job()方法,指定执行任务函数,触发器既可添加调度任务。
运行调度任务
调用 start() 方法之后调度器就会开始执行,并在控制台上看到对应的结果
:: | INFO | __main__:my_task: - 开始执行任务,生成随机数为7
- 查看数据库存储记录
任务常用操作
添加任务
add_job()是以传参的形式指定对应的函数名这种方法是最常用的,推荐使用此方法。此方法会返回一个apscheduler.job.Job实例,这样就可以在运行时,修改或删除任务。
scheduled_job() 是以装饰器的形式直接对我们要执行的函数进行修饰,这种方法最方便,但缺点就是运行时,不能修改任务。
- 通过add_job()添加任务在上面的配置schedulers时已经演示,此处不再赘述。
- 通过装饰器scheduled_job添加任务
# main.py
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.blocking import BlockingScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
# 实例化调度器对象
scheduler = BlockingScheduler()
# 定时任务执行函数
@scheduler.scheduled_job(trigger=interval, args=(1,), seconds=3)
def my_task(number):
logger.info(开始执行任务,传入的参数是%s % number)
if __name__ == __main__:
try:
# 开始执行定时任务调度器
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
重要提醒:
- 如果添加 job 时,scheduler 尚未运行,job 会被临时地进行排列,直到 scheduler 启动之后,它的首次运行时间才会被确切地计算出来。
- 如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使replace_existing=True,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。
- 内置任务储存器中,只有MemoryJobStore不会序列化任务;内置执行器中,只有ProcessPoolExecutor会序列化任务。
- 如果想要立刻运行任务,可以在添加任务时省略trigger参数
获取任务列表
可以使用get_jobs方法来获得机器上可处理的作业调度列表。方法会返回一个Job实例的列表,如果你仅仅对特定的 job store 中的 job 感兴趣,可以将 job store 的别名作为第二个参数。
也可以使用print_jobs()来格式化输出作业列表以及它们的触发器和下一次的运行时间。
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(执行任务)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1))
# 开始执行定时任务调度器
scheduler.start()
scheduler.print_jobs()
print(scheduler.get_jobs())
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
# 运行结果
Jobstore default:
my_task (trigger: interval[0::], next run at: :: CST)
[<Job (id=2e1b64f0422e4f2a9163ad2b7e634bd5 name=my_task)>]
删除任务
当从 scheduler 中移除一个 job 时,它会从关联的 job store 中被移除,不再被执行。如果想从调度器移除一个任务,那么你就要从相应的任务储存器中移除它,这样才算移除了。有两种方式:
- 调用remove_job(),参数为:任务ID,任务储存器名称。通过修饰器添加的任务,使用此方法删除任务。
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 定时任务执行函数
@scheduler.scheduled_job(trigger=interval, seconds=1, id=my_task)
def my_task():
logger.info(开始执行任务)
if __name__ == __main__:
try:
# 开始执行定时任务调度器
logger.error(开始定时任务)
scheduler.start()
time.sleep(3)
logger.error(删除定时任务)
scheduler.remove_job(job_id=my_task)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 在通过add_job()创建的任务实例上调用remove()方法
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(开始执行任务)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1))
# 开始执行定时任务调度器
logger.error(开始定时任务)
scheduler.start()
time.sleep(3)
logger.error(删除定时任务)
my_job.remove()
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行结果(任务开始执行后,过了3秒钟删除定时任务,期间定时任务每秒执行一次,共执行3次)
:: | ERROR | __main__:<module>: - 开始定时任务
:: | INFO | __main__:my_task: - 开始执行任务
:: | INFO | __main__:my_task: - 开始执行任务
:: | INFO | __main__:my_task: - 开始执行任务
:: | ERROR | __main__:<module>: - 删除定时任务
注意点:如果使用BlockingScheduler调度器的话,在其start之后的任何操作都不会去执行。因此想要修改删除任务,必须使用BackgroundScheduler。
暂停/恢复任务
通过Job实例或者 scheduler 本身你可以轻易地暂停和恢复 job 。当一个 job 被暂停,它的下一次运行时间将会被清空,同时不再计算之后的运行时间,直到这个 job 被恢复。
- 对于使用add_job添加的任务,可以使用pause()方法暂停任务,使用resume()方法恢复任务
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(执行任务)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1))
# 开始执行定时任务调度器
logger.error(开始定时任务)
scheduler.start()
time.sleep(3)
logger.error(暂停定时任务)
my_job.pause()
time.sleep(3)
logger.error(恢复定时任务)
my_job.resume()
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 通过修饰器添加的任务,可以使用pause_job()方法暂停任务,使用resume_job()方法恢复任务
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 定时任务执行函数
@scheduler.scheduled_job(trigger=interval, seconds=1, id=my_task)
def my_task():
logger.info(执行任务)
if __name__ == __main__:
try:
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(3)
logger.error(暂停定时任务)
scheduler.pause_job(job_id=my_task)
time.sleep(3)
logger.error(恢复定时任务)
scheduler.resume_job(job_id=my_task)
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行结果(开始执行定时任务后,每秒执行一次,执行了3次,然后暂停定时任务,过了3秒后恢复定时任务,执行3秒后进程结束)
:: | ERROR | __main__:<module>: - 开始定时任务
:: | INFO | __main__:my_task: - 执行任务
:: | INFO | __main__:my_task: - 执行任务
:: | INFO | __main__:my_task: - 执行任务
:: | ERROR | __main__:<module>: - 暂停定时任务
:: | ERROR | __main__:<module>: - 恢复定时任务
:: | INFO | __main__:my_task: - 执行任务
:: | INFO | __main__:my_task: - 执行任务
:: | INFO | __main__:my_task: - 执行任务
修改任务属性
apscheduler支持修改job 的属性,例如max_instances,coalesce等属性信息。
- 对于使用add_job添加的任务,可以使用modify()方法修改任务属性
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(执行task任务)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1))
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(3)
logger.error(修改定时任务属性)
my_job.modify(max_instances=3, name=new task)
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 通过修饰器添加的任务,可以使用modify_job()方法修改任务属性
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 定时任务执行函数
@scheduler.scheduled_job(trigger=interval, seconds=1, id=my_task)
def my_task():
logger.info(执行任务)
if __name__ == __main__:
try:
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(3)
logger.error(修改定时任务属性)
scheduler.modify_job(job_id=my_task)
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行结果
:: | ERROR | __main__:<module>: - 开始定时任务
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_task: - 执行task任务
:: | ERROR | __main__:<module>: - 修改定时任务属性
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_task: - 执行task任务
修改任务触发器
如果你想重新调度一个 job (这意味着要修改其 trigger),你可以使用apscheduler.job.Job.reschedule()或reschedule_job()方法。这些方法都会为 job 构建新的 trigger ,然后根据新的 trigger 重新计算其下一次的运行时间:
- 对于使用add_job添加的任务,可以使用reschedule()方法修改任务触发器
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(执行task任务)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1))
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(3)
logger.error(修改定时任务触发器)
my_job.reschedule(trigger=IntervalTrigger(seconds=2))
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 通过修饰器添加的任务,可以使用reschedule_job()方法修改任务触发器
# 导入调度器,此处使用BlockingScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 定时任务执行函数
@scheduler.scheduled_job(trigger=interval, seconds=1, id=my_task)
def my_task():
logger.info(执行任务)
if __name__ == __main__:
try:
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(3)
logger.error(修改定时任务属性)
scheduler.reschedule_job(job_id=my_task)
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行结果(开始定时任务后,3秒钟时间,每1秒执行一次任务,共执行3次。然后修改触发器为每2秒执行一次,在3秒时间内执行了1次任务)
:: | ERROR | __main__:<module>: - 开始定时任务
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_task: - 执行task任务
:: | ERROR | __main__:<module>: - 修改定时任务触发器
:: | INFO | __main__:my_task: - 执行task任务
调度器常用操作
终止调度器
默认情况,会终止任务存储器以及执行器,然后等待所有目前执行的job完成后(自动终止)
如果使用wait=False,不会等待任何运行中的任务完成,直接终止
- 使用默认情况,等待任务完成后终止
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(开始执行task任务)
time.sleep(2)
logger.info(task任务执行完成)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3))
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(6)
logger.error(终止调度器)
scheduler.shutdown()
logger.error(scheduler.get_jobs())
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行结果
:: | ERROR | __main__:<module>: - 开始定时任务
:: | INFO | __main__:my_task: - 开始执行task任务
:: | INFO | __main__:my_task: - task任务执行完成
:: | INFO | __main__:my_task: - 开始执行task任务
:: | ERROR | __main__:<module>: - 终止调度器
:: | INFO | __main__:my_task: - task任务执行完成
:: | ERROR | __main__:<module>: - []
- 使用wait=False参数直接终止
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(开始执行task任务)
time.sleep(2)
logger.info(task任务执行完成)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3))
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(6)
logger.error(终止调度器)
scheduler.shutdown(wait=False)
logger.error(scheduler.get_jobs())
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行结果
:: | ERROR | __main__:<module>: - 开始定时任务
:: | INFO | __main__:my_task: - 开始执行task任务
:: | INFO | __main__:my_task: - task任务执行完成
:: | INFO | __main__:my_task: - 开始执行task任务
:: | ERROR | __main__:<module>: - 终止调度器
:: | ERROR | __main__:<module>: - []
:: | INFO | __main__:my_task: - task任务执行完成
调度器事件监听
可以为 scheduler 绑定事件监听器(event listen)。Scheduler 事件在某些情况下会被触发,而且它可能携带有关特定事件的细节信息。为add_listener()函数提供适当的掩码参数(mask argument)或者是将不同的常数组合到一起,可以监听特定类型的事件。可调用的listener可以通过event object作为参数而被调用。
事件 | 对应枚举值 | 描述 | 归属类 |
EVENT_SCHEDULER_STARTED | 1 | 调度程序启动 | SchedulerEvent |
EVENT_SCHEDULER_SHUTDOWN | 2 | 调度程序关闭 | SchedulerEvent |
EVENT_SCHEDULER_PAUSED | 4 | 调度程序中任务处理暂停 | SchedulerEvent |
EVENT_SCHEDULER_RESUMED | 8 | 调度程序中任务处理恢复 | SchedulerEvent |
EVENT_EXECUTOR_ADDED | 将执行器添加到调度程序中 | SchedulerEvent | |
EVENT_EXECUTOR_REMOVED | 执行器从调度程序中删除 | SchedulerEvent | |
EVENT_JOBSTORE_ADDED | 将任务存储添加到调度程序中 | SchedulerEvent | |
EVENT_JOBSTORE_REMOVED | 任务存储从调度程序中删除 | SchedulerEvent | |
EVENT_ALL_JOBS_REMOVED | 所有任务从所有任务存储中删除或从一个特定的任务存储中删除 | SchedulerEvent | |
EVENT_JOB_ADDED | 任务添加到任务存储中 | JobEvent | |
EVENT_JOB_REMOVED | 从任务存储中删除了任务 | JobEvent | |
EVENT_JOB_MODIFIED | 从调度程序外部修改了任务 | JobEvent | |
EVENT_JOB_EXECUTED | 任务被成功执行 | JobExecutionEvent | |
EVENT_JOB_ERROR | 任务在执行期间引发异常 | JobExecutionEvent | |
EVENT_JOB_MISSED | 错过了任务执行 | JobExecutionEvent | |
EVENT_JOB_SUBMITTED | 任务已经提交到执行器中执行 | JobSubmissionEvent | |
EVENT_JOB_MAX_INSTANCES | 任务因为达到最大并发执行时,触发的事件 | JobSubmissionEvent | |
EVENT_ALL | 包含以上的所有事件 |
- 示例代码
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入事件类
from apscheduler.events import EVENT_ALL
# 导入日志记录器
from loguru import logger
import time
# 定时任务执行函数
def my_task():
logger.info(执行task任务)
# 事件监听函数
def my_listener(event):
match event.code:
case :
logger.info(任务被成功执行)
case :
logger.info(任务已经提交到执行器中执行)
case _:
logger.info(event.code)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2))
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
scheduler.add_listener(my_listener, mask=EVENT_ALL)
time.sleep(4)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行执行结果
:: | ERROR | __main__:<module>: - 开始定时任务
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_listener: - 任务被成功执行
:: | INFO | __main__:my_listener: - 任务已经提交到执行器中执行
:: | INFO | __main__:my_listener: - 任务已经提交到执行器中执行
:: | INFO | __main__:my_task: - 执行task任务
:: | INFO | __main__:my_listener: - 任务被成功执行
故障排查
如果 scheduler 没有如预期般正常运行,可以尝试将apscheduler的 logger 的日志级别提升到DEBUG等级。
- 示例代码
# main.py
# 导入调度器,此处使用BackgroundScheduler阻塞调度器
from apscheduler.schedulers.background import BackgroundScheduler
# 导入触发器,此处使用IntervalTrigger特定时间间隔触发
from apscheduler.triggers.interval import IntervalTrigger
# 导入事件类
from apscheduler.events import EVENT_ALL
# 导入日志记录器
from loguru import logger
import time
import logging
logging.basicConfig()
logging.getLogger(apscheduler).setLevel(logging.DEBUG)
# 定时任务执行函数
def my_task():
logger.info(执行task任务)
if __name__ == __main__:
try:
# 实例化调度器对象
scheduler = BackgroundScheduler()
# 添加定时任务,指定任务函数和触发器
my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2))
logger.error(开始定时任务)
# 开始执行定时任务调度器
scheduler.start()
time.sleep(3)
except (KeyboardInterrupt, SystemExit):
logger.error(进程已结束运行)
- 执行结果
INFO:apscheduler.scheduler:Adding job tentatively -- it will be properly scheduled when the scheduler starts
:: | ERROR | __main__:<module>: - 开始定时任务
INFO:apscheduler.scheduler:Added job my_task to job store default
INFO:apscheduler.scheduler:Scheduler started
DEBUG:apscheduler.scheduler:Looking for jobs to run
DEBUG:apscheduler.scheduler:Next wakeup is due at ::+: (in seconds)
DEBUG:apscheduler.scheduler:Looking for jobs to run
INFO:apscheduler.executors.default:Running job my_task (trigger: interval[0::], next run at: :: CST) (scheduled at ::+:-:: | INFO | __main__:my_task: - 执行task任务
INFO:apscheduler.executors.default:Job my_task (trigger: interval[0::], next run at: :: CST) executed successfully
DEBUG:apscheduler.scheduler:Next wakeup is due at ::+: (in seconds)
APScheduler方法总结
控制调度程序
方法 | 描述 | 异常 |
configure | 对给定的调度程序重新设置配置 | SchedulerAlreadyRunningError –如果调度程序已经在运行 |
start | 启动已配置的执行程序和任务存储,并开始处理计划的任务。 | SchedulerAlreadyRunningError –如果调度程序已经在运行 RuntimeError –如果在禁用线程的uWSGI下运行 |
shutdown | 关闭调度程序及其执行程序和任务存储。 | SchedulerNotRunningError –如果尚未启动调度程序 |
pause | 暂停调度程序中的任务处理。 | |
resume | 在调度程序中恢复任务处理。 | |
wakeup | 通知调度程序可能有任务需要执行。 |
控制执行器
方法 | 描述 | 异常 |
add_executor | 将执行程序添加到此调度程序。 | ValueError –如果已有给定别名的执行程序 |
remove_executor | 从此调度程序中删除具有给定别名的执行程序。 |
控制任务存储
方法 | 描述 | 异常 |
add_jobstore | 将任务存储添加到此调度程序。 | ValueError –如果已经存在给定别名的任务存储 |
remove_jobstore | 从此调度程序中通过给定别名删除任务存储。 |
控制事件侦听器
方法 | 描述 |
add_listener | 添加调度程序事件的侦听器。 |
remove_listener | 删除以前添加的事件侦听器。 |
控制任务
Job 的配置参数和前面 Job 里介绍的差不多, BaseScheduler 只是将 Job 的接口重新封装了
但是也实现了很多任务是如何添加到任务存储,任务是如何分配给执行器等等实现,所以在 Job 那一部分提到,官方是不希望由用户自己实例化 Job
方法 | 描述 | 异常 |
add_job | 将给定的任务添加到任务列表中,如果调度程序已经在运行,则将其唤醒。 | |
scheduled_job | 使用 scheduled_job 装饰器来动态装饰 Job 的实际函数 | |
modify_job | 修改单个任务的属性。 | |
reschedule_job | 为任务构造一个新触发器,并更新其下一个运行时间。 | |
pause_job | 使给定任务在明确恢复之前不执行。 | |
resume_job | 恢复给定任务的计划,如果计划已完成,则将其删除。 | |
get_jobs | 从特定的任务存储或所有任务中返回挂起的任务 | |
get_job | 返回与给定 job_id 匹配的 Job | |
remove_job | 删除任务,使其无法再运行。 | JobLookupError – 如果没有找到任务 |
remove_all_jobs | 从指定的任务存储中删除所有任务,或者如果没有给出任何任务,则删除所有任务存储。 | |
print_jobs | 打印出当前计划在所有任务存储库或仅特定任务存储库上的所有任务的文本列表。 |
django-apscheduler
如果你正在开发的Web项目需要实现定时任务的功能,得益于 APScheduler支持多样的调度器,我们可以很容易的将APScheduler和我们的DRF项目结合到一起。
功能简介
在这里强烈推荐使用django-apscheduler库,对比APScheduler,他添加了以下功能:
- 任务查看功能:将任务数据可以持久化保存的Django数据库,可以直接通过admin页面查看当前的任务列表和任务平均执行时间、下次任务执行时间等信息。
- 执行历史记录功能:查看当前所有定时任务的执行历史和状态码等信息。
- 任务管理功能:可以选择删除定时任务和通过管理页面手动触发任务
安装模块
pip install django-apscheduler
配置
- settings配置
# 注册app
INSTALLED_APPS = (
# ...
django_apscheduler,
)
# apscheduler全局配置
APSCHEDULER_DATETIME_FORMAT = N j, Y, f:s a # Django admin中显示带秒的时间
APSCHEDULER_RUN_NOW_TIMEOUT = # admin手动触发的作业最大运行时间
- 迁移数据库
python manage.py migrate
- 查看数据库表结构
django_apscheduler_djangojob:用于存放任务列表
django_apscheduler_djangojobexecution:用于存放任务执行历史
使用
由于在生产环境通常会使用uwsgi启动多个进程运行服务,会导致每个工作进程都有自己独立的定时任务,最终会使得定时任务多次重复执行,因此,在Django中使用apscheduler时,推荐使用自定义命令,在一个单独的专用进程中执行单个定时任务。
添加自定义命令到项目中
创建一个名为public的APP并注册,然后在public目录里面创建commands的python文件夹,最后在commands文件夹下创建crontab.py文件,文件目录结构如下所示:
我们使用BlockingScheduler后台调度器,并使用Django ORM作为任务存储器,并添加了一个my_job的定时任务和一个清理过期记录的定时任务,自定义命令crontab内容如下:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django.core.management.base import BaseCommand
from django_apscheduler import util
from apscheduler.triggers.cron import CronTrigger
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django.conf import settings
from loguru import logger
def my_job():
logger.info(定时任务开始执行)
logger.info(定时任务执行完毕)
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):
DjangoJobExecution.objects.delete_old_job_executions(max_age)
class Command(BaseCommand):
help = Runs APScheduler.
def handle(self, *args, **options):
scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), default)
scheduler.add_job(
my_job,
trigger=IntervalTrigger(seconds=1, timezone=settings.TIME_ZONE), # Every seconds
id=my_job, # The `id` assigned to each job MUST be unique
max_instances=5,
replace_existing=True,
misfire_grace_time=
)
logger.info(添加my_job任务成功)
scheduler.add_job(
delete_old_job_executions,
trigger=CronTrigger(
day_of_week=mon, hour=, minute=
), # Midnight on Monday, before start of the next work week.
id=delete_old_job_executions,
max_instances=1,
replace_existing=True
)
logger.info(添加delete_old_job_executions任务成功)
try:
logger.info(scheduler开始执行...)
scheduler.start()
except KeyboardInterrupt:
logger.info(scheduler停止执行...)
scheduler.shutdown()
logger.info(Scheduler成功停止!)
运行自定义命令并查看结果
(venv) drf_apscheduler git:(master) python manage.py crontab
:: | INFO | public.management.commands.crontab:handle: - 添加my_job任务成功
:: | INFO | public.management.commands.crontab:handle: - 添加delete_old_job_executions任务成功
:: | INFO | public.management.commands.crontab:handle: - scheduler开始执行...
:: | INFO | public.management.commands.crontab:my_job: - 定时任务开始执行
:: | INFO | public.management.commands.crontab:my_job: - 定时任务执行完毕
查看admin管理页任务详情
django-apscheduler功能扩展
虽然使用django-apscheduler的admin实现了任务历史记录查看、手动执行、删除等操作,但是在实际前后端分离开发过程中,需要提供相关的API接口供前端调用,因此还需要在django-apscheduler的基础上做二次开发,扩展相关功能。
源码分析
模型分析(venv/lib/python3./site-packages/django_apscheduler/models.py):
可以看到一共有两个模型,一个是DjangoJob,用于存放定时任务id和下次运行时间。另一个是DjangoJobExecutionManager,用户存放定时任务执行历史记录。
admin分析(venv/lib/python3./site-packages/django_apscheduler/admin.py):
主要关注run_selected_jobsh函数,他实现了手动执行选定的任务的功能。
关键函数分析(venv/lib/python3./site-packages/django_apscheduler/jobstores.py):
主要关注DjangoJobStore这个类。DjangoJobStore实现了任务存储器使用Django数据库的功能,同时还封装了一些任务的修改、删除、查询等方法
- 列出所有作业get_all_jobs()
- 查找作业lookup_job(job_id)
- 删除作业remove_job(job_id)
- 删除所有作业remove_all_jobs()
- 更新作业update_job(job)
- 新增作业add_job(**job_state)
项目开发
对django-apscheduler源码分析后可发现,对定时任务系统的需求都可以使用django-apscheduler来实现,梳理一下各个功能模块的开发思路:
- 获取定时任务执行历史记录:定义一个模型序列化器和只读模型视图集,关联DjangoJobExecutionManager模型即可。
- 获取定时任务列表:定义一个模型序列化器和模型视图集,关联DjangoJob模型即可。
- 删除任务:重写模型视图集的perform_destroy方法,使用DjangoJobStore类的remove_job方法即可。
- 暂停任务:先使用DjangoJobStore类的lookup_job方法找到对应的任务实例,然后将next_run_time属性设置为None,最后使用update_job方法更新实例即可。
- 恢复任务:使用DjangoJobStore类的lookup_job方法找到对应的任务实例,并使用__getstate__将任务属性转为字典,然后使用job_add方法,重新将任务信息重新添加到Django数据库,添加时将replace_existing设置为True。
- 修改任务触发器:使用DjangoJobStore类的lookup_job方法找到对应的任务实例,然后将trigger属性设置为新的触发器,最后使用update_job方法更新实例即可。
- 手动执行任务:与恢复任务类似,区别在于任务存储器使用内存存储即可。
路由配置(public/urls.py)
from rest_framework import routers
from public import views
from django.urls import path
app_name = public
urlpatterns = [
# 定时作业暂停/恢复
path(job_pause/<str:job_id>/, views.JobPauseAPIView.as_view(), name=job_pause),
# 更改定时作业触发器
path(job_triggers/<str:job_id>/, views.JobTriggersAPIView.as_view(), name=job_triggers),
# 立即执行一次定时作业
path(job_run/<str:job_id>/, views.JobRunAPIView.as_view(), name=job_triggers)
]
router.register(user, views.UserDemoModelViewSet, user)
# 获取定时任务执行历史记录
router.register(job_history, views.JobHistoryReadOnlyModelViewSet, job_history)
# 获取定时任务列表
router.register(job, views.JobModelViewSet, job)
urlpatterns += router.urls
视图配置(public/views.py)
from django.shortcuts import render
from django_apscheduler.models import DjangoJobExecution, DjangoJob
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.views import APIView
from public.serializers import DjangoJobExecutionSerializer, \
DjangoJobSerializer
from public.utils import MyPageNumber
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.job import Job as AppSchedulerJob
from django_apscheduler.jobstores import DjangoJobStore, DjangoMemoryJobStore
django_job_store = DjangoJobStore()
class JobHistoryReadOnlyModelViewSet(viewsets.ReadOnlyModelViewSet):
定时作业执行历史
queryset = DjangoJobExecution.objects.all()
serializer_class = DjangoJobExecutionSerializer
pagination_class = MyPageNumber
class JobModelViewSet(viewsets.ModelViewSet):
定时作业列表
queryset = DjangoJob.objects.all()
serializer_class = DjangoJobSerializer
# 重写删除方法
def perform_destroy(self, instance):
django_job_store.remove_job(instance.id)
class JobPauseAPIView(APIView):
定时作业暂停/恢复
@staticmethod
def post(request, job_id):
action = request.data.get(action)
job: AppSchedulerJob = django_job_store.lookup_job(job_id)
if action == pause:
job.next_run_time = None
django_job_store.update_job(job)
result = {id: job_id, message: 任务暂停成功!}
else:
job_state = job.__getstate__()
del job_state[next_run_time]
scheduler = BackgroundScheduler()
scheduler.add_jobstore(django_job_store)
scheduler.add_job(replace_existing=True, **job_state)
scheduler.start()
result = {id: job_id, message: 任务恢复成功!}
return Response(result, status=status.HTTP_200_OK)
class JobTriggersAPIView(APIView):
更改定时作业触发器
@staticmethod
def post(request, job_id):
crontab_exp = request.data.get(crontab_exp)
job: AppSchedulerJob = django_job_store.lookup_job(job_id)
job.trigger = CronTrigger.from_crontab(crontab_exp)
django_job_store.update_job(job)
result = {id: job_id, message: 修改定时任务触发器成功!}
return Response(result, status=status.HTTP_200_OK)
class JobRunAPIView(APIView):
立即手动执行一次定时作业
@staticmethod
def post(request, job_id):
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoMemoryJobStore())
scheduler.start()
job: AppSchedulerJob = django_job_store.lookup_job(job_id)
job_state = job.__getstate__()
del job_state[next_run_time]
del job_state[version]
del job_state[executor]
job_state[trigger] = None
scheduler.add_job(replace_existing=True, **job_state)
result = {id: job_id, message: 定时任务手动执行成功!}
return Response(result, status=status.HTTP_200_OK)
序列化器配置(public/serializers.py)
import time
from datetime import datetime
from django_apscheduler.models import DjangoJobExecution, DjangoJob
from rest_framework import serializers
class DjangoJobExecutionSerializer(serializers.ModelSerializer):
定时作业执行历史列化器
class Meta:
model = DjangoJobExecution
fields = __all__
class DjangoJobSerializer(serializers.ModelSerializer):
定时作业列表列化器
class Meta:
model = DjangoJob
exclude = [job_state]
项目地址
github:https://github.com/cuiliang0302/drf_template
gitee:https://gitee.com/cuiliang0302/drf_template
API接口文档:https://www.apifox.cn/apidoc/shared-34bb4a27-bf7b-432d-9d51-0a767a259e6e 访问密码 : 4UoQc75S
从0开始写API接口开发思路
如果想从头开始写API接口实现apscheduler定时任务的CRUD,也是很容易的,在此提供一个开发思路给大家。
和Django-apscheduler一样,我们首先创建一个模型,用于存放定时任务id,状态,执行时间,关联的函数等信息。
然后定义一个scheduler类,在初始化时选择添加默认的调度器、任务存储器、执行器等参数。
最后通过引入我们创建好的 scheduler 对象之后就可以直接用来做增删改查的操作:
- 增:使用 add_job() 方法,其主要的参数是要运行的函数(或方法)、触发器以及触发器参数等
- 删:使用 delete_job() 方法,我们需要传入一个对应任务的 id 参数,用以能够查找到对应的任务
- 改:使用 reschedule_job() 方法,这里也需要一个对应任务的 id 参数,以及需要重新修改的触发器及其参数
- 查:使用 get_jobs() 和 get_job() 两个方法,前者是直接获取到当前调度的所有任务,返回的是一个包含了 APScheduler.job.Job 对象的列表,而后者是通过 id 参数来查找对应的任务对象;这里我通过底层源码使用 getstate() 来获取到任务的相关信息,这些信息我们通过事先设定好的 Job 对象来对其进行序列化,最后将信息从接口中返回。
常见问题
为什么scheduler不执行job ?
导致这种情况的原因很多,最常见的两种情况是:
- scheduler 在 uWSGI 的工作进程中运行,但是(uWSGI)并没有启用多线程
- 运行了BackgroundScheduler但是已经执行到了脚本的末尾进程已经退出运行。
示例代码:
from apscheduler.schedulers.background import BackgroundScheduler
def myjob():
print(hello)
scheduler = BackgroundScheduler()
scheduler.start()
scheduler.add_job(myjob, cron, hour=0)
可见,以上脚本在运行完add_job()之后就直接退出了,因此 scheduler 根本没有机会去运行其调度好的 job 。
如何在 uWSGI 中使用 APScheduler
方案一:uWSGI 使用了一些技巧来禁用掉 GIL 锁,但多线程的使用对于 APScheduler 的操作来说至关重要。为了修复这个问题,你需要使用--enalbe-threads选项来重新启用 GIL 。
方案二:在一个单独的专用进程中执行单个定时任务。
如何在一个或多个工作进程中共享独立的 job store
目前版本是不支持的,但是未来apscheduler4计划会支持这个功能,详情参考文档:https://github.com/agronholm/apscheduler/issues/
在两个或更多的进程中共享一个持久化的 job store 会导致 scheduler 的行为不正常:如重复执行或作业丢失,等等。这是因为 APScheduler 目前没有任何进程间同步和信号量机制,因此当一个 job 被添加、修改或从 scheduler 中移除时 scheduler 无法得到通知。
变通方案:在专用的进程中来运行 scheduler,然后通过一些远程访问的途径 —— 如 RPyC、gRPC 或一个 HTTP 服务器 —— 来将其连接起来。在源码仓库中包含了一个使用 RPyC 的示例。
如何在 web 应用中使用 APScheduler
如果你想在 Django 中运行,可以考虑django_apscheduler,推荐使用自定义命令,在一个单独的专用进程中执行单个定时任务。
如果你想在 Flask 中使用 APScheduler ,这里也有一个非官方的插件Flask-APScheduler。
参考文档
date触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html#module-apscheduler.triggers.date
interval触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/interval.html#module-apscheduler.triggers.interval
crontab触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html#module-apscheduler.triggers.cron
job配置项:https://apscheduler.readthedocs.io/en/stable/modules/job.html#module-apscheduler.job
apscheduler方法示例:https://apscheduler.readthedocs.io/en/stable/py-modindex.html
django-apscheduler地址:https://github.com/jcass77/django-apscheduler
查看更多
崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问www.cuiliangblog.cn