Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说
python定时任务框架APScheduler详解,希望能够帮助你!!!。
说到定时任务,大家肯定会想起使用linux的crontab,或者windows自带的任务计划。虽然他们都可以实现定时任务,但是如果想要实现任务的精细化灵活控制,以及任务程序跨平台运行,此时就需要使用定时任务框架去实现。Python的apscheduler就提供了非常丰富而且方便易用的定时任务接口,本文介绍如何使用 apscheduler 实现你的定时任务。
库名称 |
简介 |
优点 |
缺点 |
Apscheduler |
基于Quartz的一个Python定时任务框架,提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化作业 |
支持定时、定期、一次性任务,支持任务持久化及动态添加 |
配置可选项较多,配置起来较为复杂,有一定的学习成本。 |
Celery |
是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度 |
支持配置定期任务、支持 crontab 模式配置 |
不支持一次性定时任务,单独为定时任务功能而搭建celery显得过于重量级。 |
schedule |
轻量级,无需配置的作业调度库 |
轻量级、无需配置、语法简单 |
阻塞式调用、无法动态添加或删除任务,无任务状态存储 |
python-crontab |
针对系统 Cron 操作 crontab 文件的作业调度库 |
支持定时、定期任务,能够动态添加任务 |
不能实现一次性任务需求,没有状态存储,无法跨平台执行 |
触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
任务存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当任务被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
安装非常简单,通过pip install apscheduler即可。
# main.py # 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger # 定时任务执行函数 def my_task(): logger.info("开始执行任务") if __name__ == '__main__': # 实例化调度器对象 scheduler = BlockingScheduler() # 添加定时任务,指定任务函数和触发器 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3)) # 开始执行定时任务 scheduler.start()
/Users/cuiliang/PycharmProjects/test/venv/bin/python /Users/cuiliang/PycharmProjects/test/crontab.py 2022-08-21 22:09:10.063 | INFO | __main__:my_task:7 - 开始执行任务 2022-08-21 22:09:13.068 | INFO | __main__:my_task:7 - 开始执行任务 2022-08-21 22:09:16.067 | INFO | __main__:my_task:7 - 开始执行任务
APScheduler支持的触发器主要有:
# 指定任务在2022年8月23日执行 scheduler.add_job(my_task, trigger=DateTrigger(run_date=date(2022, 8, 23), timezone="Asia/Shanghai")) # 指定任务在2022年8月23日8时5分30秒执行 scheduler.add_job(my_task, trigger=DateTrigger(run_date=datetime(2022, 8, 23, 8, 8, 30), timezone="Asia/Shanghai")) # 指定任务在2022年8月23日8时5分30秒执行 scheduler.add_job(my_task, trigger=DateTrigger(run_date="2022-08-23 08:08:30", timezone="Asia/Shanghai"))
interval触发器支持设置如下参数:
参数 |
含义 |
类型 |
weeks |
周 |
整形 |
days |
一个月中的第几天 |
整形 |
hours |
小时 |
整形 |
minutes |
分钟 |
整形 |
seconds |
秒 |
整形 |
start_date |
间隔触发的起始时间 |
时间格式字符串 |
end_date |
间隔触发的结束时间 |
时间格式字符串 |
jitter |
触发的时间误差 |
整形 |
# 指定任务每10分钟执行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(minutes=10, timezone="Asia/Shanghai")) # 指定任务在2022年8月22日9时到10时区间内,每10分钟执行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(minutes=10, start_date="2022-08-22 09:00:00", end_date="2022-08-22 10:00:00", timezone="Asia/Shanghai"))
cron触发器支持设置如下参数:
参数 |
含义 |
year |
4位数字的年份 |
month |
1-12月份 |
day |
1-31日 |
week |
1-53周 |
day_of_week |
一个礼拜中的第几天( 0-6或者 mon、 tue、 wed、 thu、 fri、 sat、 sun) |
hour |
0-23小时 |
minute |
0-59分钟 |
second |
0-59秒 |
start_date |
datetime类型或者字符串类型,起始时间 |
end_date |
datetime类型或者字符串类型,结束时间 |
timezone |
时区 |
jitter |
任务触发的误差时间 |
也可以用表达式类型,可以用以下方式:
表达式 |
字段 |
描述 |
* |
任何 |
在每个值都触发 |
*/a |
任何 |
每隔a触发一次 |
a-b |
任何 |
在a-b区间内任何一个时间触发 |
a-b/c |
任何 |
在a-b区间内每隔c触发一次 |
xth y |
day |
在x个星期y触发 |
last x |
day |
在最后一个星期x触发 |
last |
day |
在一个月中的最后一天触发 |
x,y,z |
任何 |
将上面的表达式进行组合 |
# 指定任务在1-3月和6-9月,每个月第三个星期5那天的0-4点每2个小时执行一次 scheduler.add_job(my_task, trigger=CronTrigger(month="1-3,6-9", day="3th 5", hour="0-4/2", timezone="Asia/Shanghai")) # 使用crontab表达式,指定任务在每天1-15日每天0点0分执行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab("0 0 1-15 * *", timezone="Asia/Shanghai"))
APScheduler 提供了以下几种调度器:
from datetime import datetime import os from apscheduler.schedulers.blocking import BlockingScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(tick, 'interval', seconds=3) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
from datetime import datetime import time import os from apscheduler.schedulers.background import BackgroundScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(tick, 'interval', seconds=3) scheduler.start() print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: # 模拟主线程持续运行。 while True: time.sleep(2) except (KeyboardInterrupt, SystemExit): scheduler.shutdown()
通常情况下如果不是和 Web 项目或应用集成共存,那么往往都首选 BlockingScheduler 调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler 调度器,因为它不会干扰当前应用的线程或进程状况。
APScheduler 提供了以下几种执行器:
APScheduler支持的数据库主要有:
# main.py # 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入任务存储器,此处使用SQLAlchemyJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # 导入日志记录器 from loguru import logger # 定时任务执行函数 def my_task(): logger.info("开始执行任务") if __name__ == '__main__': # 实例化调度器对象 scheduler = BlockingScheduler() # 指定使用MySQL存储任务 url = 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' scheduler.add_jobstore(jobstore=SQLAlchemyJobStore(url=url)) # 指定任务每10分钟执行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=10, timezone="Asia/Shanghai")) # 开始执行定时任务 scheduler.start()
查看数据库表内容
# main.py # 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入任务存储器,此处使用MongoDBJobStore from apscheduler.jobstores.mongodb import MongoDBJobStore # 导入日志记录器 from loguru import logger # 导入MongoDB客户端 from pymongo import MongoClient # 定时任务执行函数 def my_task(): logger.info("开始执行任务") if __name__ == '__main__': # 实例化调度器对象 scheduler = BlockingScheduler() # 指定使用MongoDB存储任务 url = 'mongodb://127.0.0.1:27017/' scheduler.add_jobstore(jobstore=MongoDBJobStore(client=MongoClient(host=url))) # 指定任务每10分钟执行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=10, timezone="Asia/Shanghai")) # 开始执行定时任务 scheduler.start()
查看数据库表内容
# main.py # 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入任务存储器,此处使用RedisJobStore from apscheduler.jobstores.redis import RedisJobStore # 导入日志记录器 from loguru import logger # 定时任务执行函数 def my_task(): logger.info("开始执行任务") if __name__ == '__main__': # 实例化调度器对象 scheduler = BlockingScheduler() # 指定使用redis存储任务 REDIS = { 'host': '127.0.0.1', 'port': '6379', 'db': 0, 'password': '123.com' } scheduler.add_jobstore(jobstore=RedisJobStore(**REDIS)) # 指定任务每10分钟执行一次 scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=10, timezone="Asia/Shanghai")) # 开始执行定时任务 scheduler.start()
查询redis数据库内容
比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行
比如一个10分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~10分钟上,新的运行实例不会被执行,因为已经有3个实例在运行。
job的名称
这个参数的前提是使用可持续化的jobstore,如果使用默认内存的jobstore,这个参数是没有意义的。一般需要使用misfire_grace_time的场景,就是但是那个持久化jobstore的服务挂掉了,任务需要被调度的时候没有被调度成功,后期持久化的jobstore启动了,这个任务重新被调度了(从jobstore中获取job),misfire_grace_time决定这个任务在错过执行时间之后还需不需要执行
在使用之前我们需要先实例化一个 scheduler 对象,所有的 scheduler 对象都被放在了 apscheduler.schedulers 模块下,根据需求选择BlockingScheduler或者BackgroundScheduler调度器引入即可,此处以最基础的阻塞调度器BlockingScheduler为例:
from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()
对于 scheduler 属性的配置,支持以下的方式灵活配置:
对于scheduler参数的配置,支持以下的方式灵活配置:
假设现在有这样一个需求:
组件 |
模块 |
需求 |
调度器(schedulers) |
阻塞调度器(BlockingScheduler) |
为新任务关闭合并模式 |
触发器(triggers) |
cron表达式触发器(CronTrigger) |
使用cron表达式,每分钟执行一次 |
执行器(executors) |
线程池执行器(ThreadPoolExecutor) |
最大10个线程 |
任务存储器(job stores) |
关系型的数据库(sqlalchemy ) |
将结果保存到MySQL数据库 |
接下来分别使用属性+参数的四种组合演示如何配置schedulers
# main.py # 导入线程池执行器 from apscheduler.executors.pool import ThreadPoolExecutor # 导入sqlalchemy,使用MySQL数据库存储 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # 导入阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入cron表达式触发器 from apscheduler.triggers.cron import CronTrigger # 导入日志模块 from loguru import logger # 导入生成随机数模块 import random # 创建定时任务执行函数 def my_task(number): logger.info("开始执行任务,传入的随机数为%s" % number) # 作业存储器配置 使用MySQL数据库存储 job_stores = { 'default': { 'type': 'sqlalchemy', 'url': 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' } } # 执行器配置 使用线程池执行器,最大10个线程 executors = { 'default': ThreadPoolExecutor(10), } # Job相关配置,更多选项参见官方文档 job_defaults = { 'coalesce': False, # 设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟), # 如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行 'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。 # 比如一个10分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~10分钟上,新的运行实例不会被执行,因为已经有3个实例在运行。 } # 实例化调度器 scheduler = BlockingScheduler( jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai' # 指定时区 ) if __name__ == '__main__': number = random.randint(0, 9) scheduler.add_job(my_task, trigger=CronTrigger.from_crontab("* * * * *"), args=[number]) try: scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
# main.py # 导入线程池执行器 from apscheduler.executors.pool import ThreadPoolExecutor # 导入sqlalchemy,使用MySQL数据库存储 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # 导入阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入cron表达式触发器 from apscheduler.triggers.cron import CronTrigger # 导入日志模块 from loguru import logger # 导入生成随机数模块 import random # 创建定时任务执行函数 def my_task(number): logger.info("开始执行任务,传入的随机数为%s" % number) config = { # 作业存储器配置 使用MySQL数据库存储 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' }, # 执行器配置 使用线程池执行器,最大10个线程 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '10' }, # Job配置,为新任务关闭合并模式 'apscheduler.job_defaults.coalesce': 'false', # Job配置,同一个任务同一时间最多只能有3个实例在运行 'apscheduler.job_defaults.max_instances': '3', # Job配置,指定时区 'apscheduler.timezone': 'Asia/Shanghai', } # 实例化调度器 scheduler = BlockingScheduler(config) if __name__ == '__main__': # 生成随机数传入定时任务函数 number = random.randint(0, 9) # 注册定时任务job,执行频率为每分钟执行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab("* * * * *"), args=[number]) try: # 开始执行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
# main.py # 导入线程池执行器 from apscheduler.executors.pool import ThreadPoolExecutor # 导入sqlalchemy,使用MySQL数据库存储 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # 导入阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入cron表达式触发器 from apscheduler.triggers.cron import CronTrigger # 导入日志模块 from loguru import logger # 导入生成随机数模块 import random # 创建定时任务执行函数 def my_task(number): logger.info("开始执行任务,传入的随机数为%s" % number) # 实例化调度器 scheduler = BlockingScheduler() # 作业存储器配置 使用MySQL数据库存储 url = 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' # executors = { 'default': ThreadPoolExecutor(10), } # Job相关配置,更多选项参见官方文档 job_defaults = { 'coalesce': False, # 设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟), # 如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行 'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。 # 比如一个10分钟的job,指定每分钟运行1次,如果max_instance=3,那么在第3~10分钟上,新的运行实例不会被执行,因为已经有3个实例在运行。 } # 调度器对象配置参数 scheduler.configure( job_defaults=job_defaults, timezone='Asia/Shanghai') # 添加任务存储器参数 scheduler.add_jobstore(jobstore=SQLAlchemyJobStore(url=url)) # 添加执行器参数,使用线程池执行器,最大10个线程 scheduler.add_executor(executor=ThreadPoolExecutor(max_workers=10)) if __name__ == '__main__': # 生成随机数传入定时任务函数 number = random.randint(0, 9) # 注册定时任务job,执行频率为每分钟执行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab("* * * * *"), args=[number]) try: # 开始执行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
# main.py # 导入线程池执行器 from apscheduler.executors.pool import ThreadPoolExecutor # 导入sqlalchemy,使用MySQL数据库存储 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # 导入阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入cron表达式触发器 from apscheduler.triggers.cron import CronTrigger # 导入日志模块 from loguru import logger # 导入生成随机数模块 import random # 创建定时任务执行函数 def my_task(number): logger.info("开始执行任务,传入的随机数为%s" % number) # 实例化调度器 scheduler = BlockingScheduler() config = { # 作业存储器配置 使用MySQL数据库存储 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'mysql://root:123.com@127.0.0.1:3306/job?charset=utf8' }, # 执行器配置 使用线程池执行器,最大10个线程 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '10' }, # Job配置,为新任务关闭合并模式 'apscheduler.job_defaults.coalesce': 'false', # Job配置,同一个任务同一时间最多只能有3个实例在运行 'apscheduler.job_defaults.max_instances': '3', # Job配置,指定时区 'apscheduler.timezone': 'Asia/Shanghai', } # 调度器对象配置参数 scheduler.configure(config) if __name__ == '__main__': # 生成随机数传入定时任务函数 number = random.randint(0, 9) # 注册定时任务job,执行频率为每分钟执行一次 scheduler.add_job(my_task, trigger=CronTrigger.from_crontab("* * * * *"), args=[number]) try: # 开始执行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
创建 scheduler 对象之后,我们需要调用add_job()方法或是装饰器scheduled_job方法来将我们需要执行的函数进行注册。
以上面的demo代码为例,使用add_job()方法,指定执行任务函数,触发器既可添加调度任务。
调用 start() 方法之后调度器就会开始执行,并在控制台上看到对应的结果
2022-08-22 07:53:00.019 | INFO | __main__:my_task:11 - 开始执行任务,生成随机数为7
add_job()是以传参的形式指定对应的函数名这种方法是最常用的,推荐使用此方法。此方法会返回一个apscheduler.job.Job实例,这样就可以在运行时,修改或删除任务。
scheduled_job() 是以装饰器的形式直接对我们要执行的函数进行修饰,这种方法最方便,但缺点就是运行时,不能修改任务。
# main.py # 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.blocking import BlockingScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger # 实例化调度器对象 scheduler = BlockingScheduler() # 定时任务执行函数 @scheduler.scheduled_job(trigger="interval", args=(1,), seconds=3) def my_task(number): logger.info("开始执行任务,传入的参数是%s" % number) if __name__ == '__main__': try: # 开始执行定时任务调度器 scheduler.start() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
重要提醒:
可以使用get_jobs方法来获得机器上可处理的作业调度列表。方法会返回一个Job实例的列表,如果你仅仅对特定的 job store 中的 job 感兴趣,可以将 job store 的别名作为第二个参数。
也可以使用print_jobs()来格式化输出作业列表以及它们的触发器和下一次的运行时间。
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("执行任务") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) # 开始执行定时任务调度器 scheduler.start() scheduler.print_jobs() print(scheduler.get_jobs()) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行") # 运行结果 Jobstore default: my_task (trigger: interval[0:00:01], next run at: 2022-08-28 21:36:24 CST) [<Job (id=2e1b64f0422e4f2a9163ad2b7e634bd5 name=my_task)>]
当从 scheduler 中移除一个 job 时,它会从关联的 job store 中被移除,不再被执行。如果想从调度器移除一个任务,那么你就要从相应的任务储存器中移除它,这样才算移除了。有两种方式:
# 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 实例化调度器对象 scheduler = BackgroundScheduler() # 定时任务执行函数 @scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task") def my_task(): logger.info("开始执行任务") if __name__ == '__main__': try: # 开始执行定时任务调度器 logger.error("开始定时任务") scheduler.start() time.sleep(3) logger.error("删除定时任务") scheduler.remove_job(job_id="my_task") except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("开始执行任务") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) # 开始执行定时任务调度器 logger.error("开始定时任务") scheduler.start() time.sleep(3) logger.error("删除定时任务") my_job.remove() except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
2022-08-28 21:25:26.458 | ERROR | __main__:<module>:24 - 开始定时任务 2022-08-28 21:25:27.458 | INFO | __main__:my_task:17 - 开始执行任务 2022-08-28 21:25:28.461 | INFO | __main__:my_task:17 - 开始执行任务 2022-08-28 21:25:29.461 | INFO | __main__:my_task:17 - 开始执行任务 2022-08-28 21:25:29.463 | ERROR | __main__:<module>:27 - 删除定时任务
注意点:如果使用BlockingScheduler调度器的话,在其start之后的任何操作都不会去执行。因此想要修改删除任务,必须使用BackgroundScheduler。
通过Job实例或者 scheduler 本身你可以轻易地暂停和恢复 job 。当一个 job 被暂停,它的下一次运行时间将会被清空,同时不再计算之后的运行时间,直到这个 job 被恢复。
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("执行任务") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) # 开始执行定时任务调度器 logger.error("开始定时任务") scheduler.start() time.sleep(3) logger.error("暂停定时任务") my_job.pause() time.sleep(3) logger.error("恢复定时任务") my_job.resume() time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
# 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 实例化调度器对象 scheduler = BackgroundScheduler() # 定时任务执行函数 @scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task") def my_task(): logger.info("执行任务") if __name__ == '__main__': try: logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(3) logger.error("暂停定时任务") scheduler.pause_job(job_id="my_task") time.sleep(3) logger.error("恢复定时任务") scheduler.resume_job(job_id="my_task") time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
2022-08-28 21:27:59.955 | ERROR | __main__:<module>:24 - 开始定时任务 2022-08-28 21:28:00.959 | INFO | __main__:my_task:17 - 执行任务 2022-08-28 21:28:01.959 | INFO | __main__:my_task:17 - 执行任务 2022-08-28 21:28:02.958 | INFO | __main__:my_task:17 - 执行任务 2022-08-28 21:28:02.960 | ERROR | __main__:<module>:27 - 暂停定时任务 2022-08-28 21:28:05.965 | ERROR | __main__:<module>:30 - 恢复定时任务 2022-08-28 21:28:06.959 | INFO | __main__:my_task:17 - 执行任务 2022-08-28 21:28:07.957 | INFO | __main__:my_task:17 - 执行任务 2022-08-28 21:28:08.959 | INFO | __main__:my_task:17 - 执行任务
apscheduler支持修改job 的属性,例如max_instances,coalesce等属性信息。
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("执行task任务") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修改定时任务属性") my_job.modify(max_instances=3, name='new task') time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
# 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 实例化调度器对象 scheduler = BackgroundScheduler() # 定时任务执行函数 @scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task") def my_task(): logger.info("执行任务") if __name__ == '__main__': try: logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修改定时任务属性") scheduler.modify_job(job_id="my_task") time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
2022-08-28 21:48:07.075 | ERROR | __main__:<module>:22 - 开始定时任务 2022-08-28 21:48:08.078 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 21:48:09.078 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 21:48:10.076 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 21:48:10.080 | ERROR | __main__:<module>:26 - 修改定时任务属性 2022-08-28 21:48:11.078 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 21:48:12.078 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 21:48:13.077 | INFO | __main__:my_task:13 - 执行task任务
如果你想重新调度一个 job (这意味着要修改其 trigger),你可以使用apscheduler.job.Job.reschedule()或reschedule_job()方法。这些方法都会为 job 构建新的 trigger ,然后根据新的 trigger 重新计算其下一次的运行时间:
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("执行task任务") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=1)) logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修改定时任务触发器") my_job.reschedule(trigger=IntervalTrigger(seconds=2)) time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
# 导入调度器,此处使用BlockingScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 实例化调度器对象 scheduler = BackgroundScheduler() # 定时任务执行函数 @scheduler.scheduled_job(trigger="interval", seconds=1, id="my_task") def my_task(): logger.info("执行任务") if __name__ == '__main__': try: logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(3) logger.error("修改定时任务属性") scheduler.reschedule_job(job_id="my_task") time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
2022-08-28 22:14:03.958 | ERROR | __main__:<module>:22 - 开始定时任务 2022-08-28 22:14:04.960 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 22:14:05.960 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 22:14:06.959 | INFO | __main__:my_task:13 - 执行task任务 2022-08-28 22:14:06.959 | ERROR | __main__:<module>:26 - 修改定时任务触发器 2022-08-28 22:14:08.963 | INFO | __main__:my_task:13 - 执行task任务
默认情况,会终止任务存储器以及执行器,然后等待所有目前执行的job完成后(自动终止)
如果使用wait=False,不会等待任何运行中的任务完成,直接终止
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("开始执行task任务") time.sleep(2) logger.info("task任务执行完成") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3)) logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(6) logger.error("终止调度器") scheduler.shutdown() logger.error(scheduler.get_jobs()) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
2022-08-28 22:41:46.859 | ERROR | __main__:<module>:24 - 开始定时任务 2022-08-28 22:41:49.864 | INFO | __main__:my_task:13 - 开始执行task任务 2022-08-28 22:41:51.865 | INFO | __main__:my_task:15 - task任务执行完成 2022-08-28 22:41:52.860 | INFO | __main__:my_task:13 - 开始执行task任务 2022-08-28 22:41:52.861 | ERROR | __main__:<module>:28 - 终止调度器 2022-08-28 22:41:54.863 | INFO | __main__:my_task:15 - task任务执行完成 2022-08-28 22:41:54.864 | ERROR | __main__:<module>:30 - []
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("开始执行task任务") time.sleep(2) logger.info("task任务执行完成") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=3)) logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(6) logger.error("终止调度器") scheduler.shutdown(wait=False) logger.error(scheduler.get_jobs()) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
2022-08-28 22:43:40.258 | ERROR | __main__:<module>:24 - 开始定时任务 2022-08-28 22:43:43.259 | INFO | __main__:my_task:13 - 开始执行task任务 2022-08-28 22:43:45.261 | INFO | __main__:my_task:15 - task任务执行完成 2022-08-28 22:43:46.260 | INFO | __main__:my_task:13 - 开始执行task任务 2022-08-28 22:43:46.260 | ERROR | __main__:<module>:28 - 终止调度器 2022-08-28 22:43:46.260 | ERROR | __main__:<module>:30 - [] 2022-08-28 22:43:48.263 | INFO | __main__:my_task:15 - task任务执行完成
可以为 scheduler 绑定事件监听器(event listen)。Scheduler 事件在某些情况下会被触发,而且它可能携带有关特定事件的细节信息。为add_listener()函数提供适当的掩码参数(mask argument)或者是将不同的常数组合到一起,可以监听特定类型的事件。可调用的listener可以通过event object作为参数而被调用。
事件 |
对应枚举值 |
描述 |
归属类 |
EVENT_SCHEDULER_STARTED |
1 |
调度程序启动 |
SchedulerEvent |
EVENT_SCHEDULER_SHUTDOWN |
2 |
调度程序关闭 |
SchedulerEvent |
EVENT_SCHEDULER_PAUSED |
4 |
调度程序中任务处理暂停 |
SchedulerEvent |
EVENT_SCHEDULER_RESUMED |
8 |
调度程序中任务处理恢复 |
SchedulerEvent |
EVENT_EXECUTOR_ADDED |
16 |
将执行器添加到调度程序中 |
SchedulerEvent |
EVENT_EXECUTOR_REMOVED |
32 |
执行器从调度程序中删除 |
SchedulerEvent |
EVENT_JOBSTORE_ADDED |
64 |
将任务存储添加到调度程序中 |
SchedulerEvent |
EVENT_JOBSTORE_REMOVED |
128 |
任务存储从调度程序中删除 |
SchedulerEvent |
EVENT_ALL_JOBS_REMOVED |
256 |
所有任务从所有任务存储中删除或从一个特定的任务存储中删除 |
SchedulerEvent |
EVENT_JOB_ADDED |
512 |
任务添加到任务存储中 |
JobEvent |
EVENT_JOB_REMOVED |
1024 |
从任务存储中删除了任务 |
JobEvent |
EVENT_JOB_MODIFIED |
2048 |
从调度程序外部修改了任务 |
JobEvent |
EVENT_JOB_EXECUTED |
4096 |
任务被成功执行 |
JobExecutionEvent |
EVENT_JOB_ERROR |
8192 |
任务在执行期间引发异常 |
JobExecutionEvent |
EVENT_JOB_MISSED |
16384 |
错过了任务执行 |
JobExecutionEvent |
EVENT_JOB_SUBMITTED |
32768 |
任务已经提交到执行器中执行 |
JobSubmissionEvent |
EVENT_JOB_MAX_INSTANCES |
65536 |
任务因为达到最大并发执行时,触发的事件 |
JobSubmissionEvent |
EVENT_ALL |
包含以上的所有事件 |
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入事件类 from apscheduler.events import EVENT_ALL # 导入日志记录器 from loguru import logger import time # 定时任务执行函数 def my_task(): logger.info("执行task任务") # 事件监听函数 def my_listener(event): match event.code: case 4096: logger.info("任务被成功执行") case 32768: logger.info("任务已经提交到执行器中执行") case _: logger.info(event.code) if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2)) logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() scheduler.add_listener(my_listener, mask=EVENT_ALL) time.sleep(4) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
2022-08-28 22:59:58.734 | ERROR | __main__:<module>:34 - 开始定时任务 2022-08-28 23:00:00.739 | INFO | __main__:my_task:14 - 执行task任务 2022-08-28 23:00:00.739 | INFO | __main__:my_listener:21 - 任务被成功执行 2022-08-28 23:00:00.740 | INFO | __main__:my_listener:23 - 任务已经提交到执行器中执行 2022-08-28 23:00:02.739 | INFO | __main__:my_listener:23 - 任务已经提交到执行器中执行 2022-08-28 23:00:02.740 | INFO | __main__:my_task:14 - 执行task任务 2022-08-28 23:00:02.740 | INFO | __main__:my_listener:21 - 任务被成功执行
如果 scheduler 没有如预期般正常运行,可以尝试将apscheduler的 logger 的日志级别提升到DEBUG等级。
# main.py # 导入调度器,此处使用BackgroundScheduler阻塞调度器 from apscheduler.schedulers.background import BackgroundScheduler # 导入触发器,此处使用IntervalTrigger特定时间间隔触发 from apscheduler.triggers.interval import IntervalTrigger # 导入事件类 from apscheduler.events import EVENT_ALL # 导入日志记录器 from loguru import logger import time import logging logging.basicConfig() logging.getLogger('apscheduler').setLevel(logging.DEBUG) # 定时任务执行函数 def my_task(): logger.info("执行task任务") if __name__ == '__main__': try: # 实例化调度器对象 scheduler = BackgroundScheduler() # 添加定时任务,指定任务函数和触发器 my_job = scheduler.add_job(my_task, trigger=IntervalTrigger(seconds=2)) logger.error("开始定时任务") # 开始执行定时任务调度器 scheduler.start() time.sleep(3) except (KeyboardInterrupt, SystemExit): logger.error("进程已结束运行")
INFO:apscheduler.scheduler:Adding job tentatively -- it will be properly scheduled when the scheduler starts 2022-08-28 23:11:43.561 | ERROR | __main__:<module>:28 - 开始定时任务 INFO:apscheduler.scheduler:Added job "my_task" to job store "default" INFO:apscheduler.scheduler:Scheduler started DEBUG:apscheduler.scheduler:Looking for jobs to run DEBUG:apscheduler.scheduler:Next wakeup is due at 2022-08-28 23:11:45.+08:00 (in 1. seconds) DEBUG:apscheduler.scheduler:Looking for jobs to run INFO:apscheduler.executors.default:Running job "my_task (trigger: interval[0:00:02], next run at: 2022-08-28 23:11:45 CST)" (scheduled at 2022-08-28 23:11:45.+08:00) 2022-08-28 23:11:45.564 | INFO | __main__:my_task:19 - 执行task任务 INFO:apscheduler.executors.default:Job "my_task (trigger: interval[0:00:02], next run at: 2022-08-28 23:11:45 CST)" executed successfully DEBUG:apscheduler.scheduler:Next wakeup is due at 2022-08-28 23:11:47.+08:00 (in 1. seconds)
方法 |
描述 |
异常 |
configure |
对给定的调度程序重新设置配置 |
SchedulerAlreadyRunningError –如果调度程序已经在运行 |
start |
启动已配置的执行程序和任务存储,并开始处理计划的任务。 |
SchedulerAlreadyRunningError –如果调度程序已经在运行 RuntimeError –如果在禁用线程的uWSGI下运行 |
shutdown |
关闭调度程序及其执行程序和任务存储。 |
SchedulerNotRunningError –如果尚未启动调度程序 |
pause |
暂停调度程序中的任务处理。 |
|
resume |
在调度程序中恢复任务处理。 |
|
wakeup |
通知调度程序可能有任务需要执行。 |
方法 |
描述 |
异常 |
add_executor |
将执行程序添加到此调度程序。 |
ValueError –如果已有给定别名的执行程序 |
remove_executor |
从此调度程序中删除具有给定别名的执行程序。 |
方法 |
描述 |
异常 |
add_jobstore |
将任务存储添加到此调度程序。 |
ValueError –如果已经存在给定别名的任务存储 |
remove_jobstore |
从此调度程序中通过给定别名删除任务存储。 |
方法 |
描述 |
add_listener |
添加调度程序事件的侦听器。 |
remove_listener |
删除以前添加的事件侦听器。 |
Job 的配置参数和前面 Job 里介绍的差不多, BaseScheduler 只是将 Job 的接口重新封装了
但是也实现了很多任务是如何添加到任务存储,任务是如何分配给执行器等等实现,所以在 Job 那一部分提到,官方是不希望由用户自己实例化 Job
方法 |
描述 |
异常 |
add_job |
将给定的任务添加到任务列表中,如果调度程序已经在运行,则将其唤醒。 |
|
scheduled_job |
使用 scheduled_job 装饰器来动态装饰 Job 的实际函数 |
|
modify_job |
修改单个任务的属性。 |
|
reschedule_job |
为任务构造一个新触发器,并更新其下一个运行时间。 |
|
pause_job |
使给定任务在明确恢复之前不执行。 |
|
resume_job |
恢复给定任务的计划,如果计划已完成,则将其删除。 |
|
get_jobs |
从特定的任务存储或所有任务中返回挂起的任务 |
|
get_job |
返回与给定 job_id 匹配的 Job |
|
remove_job |
删除任务,使其无法再运行。 |
JobLookupError – 如果没有找到任务 |
remove_all_jobs |
从指定的任务存储中删除所有任务,或者如果没有给出任何任务,则删除所有任务存储。 |
|
print_jobs |
打印出当前计划在所有任务存储库或仅特定任务存储库上的所有任务的文本列表。 |
如果你正在开发的Web项目需要实现定时任务的功能,得益于 APScheduler支持多样的调度器,我们可以很容易的将APScheduler和我们的DRF项目结合到一起。
在这里强烈推荐使用django-apscheduler库,对比APScheduler,他添加了以下功能:
pip install django-apscheduler
# 注册app INSTALLED_APPS = ( # ... "django_apscheduler", ) # apscheduler全局配置 APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a" # Django admin中显示带秒的时间 APSCHEDULER_RUN_NOW_TIMEOUT = 25 # admin手动触发的作业最大运行时间
python manage.py migrate
django_apscheduler_djangojob:用于存放任务列表
django_apscheduler_djangojobexecution:用于存放任务执行历史
由于在生产环境通常会使用uwsgi启动多个进程运行服务,会导致每个工作进程都有自己独立的定时任务,最终会使得定时任务多次重复执行,因此,在Django中使用apscheduler时,推荐使用自定义命令,在一个单独的专用进程中执行单个定时任务。
添加自定义命令到项目中
创建一个名为public的APP并注册,然后在public目录里面创建commands的python文件夹,最后在commands文件夹下创建crontab.py文件,文件目录结构如下所示:
我们使用BlockingScheduler后台调度器,并使用Django ORM作为任务存储器,并添加了一个my_job的定时任务和一个清理过期记录的定时任务,自定义命令crontab内容如下:
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger from django.core.management.base import BaseCommand from django_apscheduler import util from apscheduler.triggers.cron import CronTrigger from django_apscheduler.jobstores import DjangoJobStore from django_apscheduler.models import DjangoJobExecution from django.conf import settings from loguru import logger def my_job(): logger.info('定时任务开始执行') logger.info('定时任务执行完毕') @util.close_old_connections def delete_old_job_executions(max_age=604_800): DjangoJobExecution.objects.delete_old_job_executions(max_age) class Command(BaseCommand): help = "Runs APScheduler." def handle(self, *args, **options): scheduler = BlockingScheduler(timezone=settings.TIME_ZONE) scheduler.add_jobstore(DjangoJobStore(), "default") scheduler.add_job( my_job, trigger=IntervalTrigger(seconds=1, timezone=settings.TIME_ZONE), # Every 10 seconds id="my_job", # The `id` assigned to each job MUST be unique max_instances=5, replace_existing=True, misfire_grace_time=60 ) logger.info("添加my_job任务成功") scheduler.add_job( delete_old_job_executions, trigger=CronTrigger( day_of_week="mon", hour="00", minute="00" ), # Midnight on Monday, before start of the next work week. id="delete_old_job_executions", max_instances=1, replace_existing=True ) logger.info("添加delete_old_job_executions任务成功") try: logger.info("scheduler开始执行...") scheduler.start() except KeyboardInterrupt: logger.info("scheduler停止执行...") scheduler.shutdown() logger.info("Scheduler成功停止!")
运行自定义命令并查看结果
(venv) ➜ drf_apscheduler git:(master) ✗ python manage.py crontab 2022-08-29 08:56:47.506 | INFO | public.management.commands.crontab:handle:37 - 添加my_job任务成功 2022-08-29 08:56:47.508 | INFO | public.management.commands.crontab:handle:48 - 添加delete_old_job_executions任务成功 2022-08-29 08:56:47.508 | INFO | public.management.commands.crontab:handle:51 - scheduler开始执行... 2022-08-29 08:56:48.512 | INFO | public.management.commands.crontab:my_job:13 - 定时任务开始执行 2022-08-29 08:56:48.513 | INFO | public.management.commands.crontab:my_job:14 - 定时任务执行完毕
查看admin管理页任务详情
虽然使用django-apscheduler的admin实现了任务历史记录查看、手动执行、删除等操作,但是在实际前后端分离开发过程中,需要提供相关的API接口供前端调用,因此还需要在django-apscheduler的基础上做二次开发,扩展相关功能。
模型分析(venv/lib/python3.10/site-packages/django_apscheduler/models.py):
可以看到一共有两个模型,一个是DjangoJob,用于存放定时任务id和下次运行时间。另一个是DjangoJobExecutionManager,用户存放定时任务执行历史记录。
admin分析(venv/lib/python3.10/site-packages/django_apscheduler/admin.py):
主要关注run_selected_jobsh函数,他实现了手动执行选定的任务的功能。
关键函数分析(venv/lib/python3.10/site-packages/django_apscheduler/jobstores.py):
主要关注DjangoJobStore这个类。DjangoJobStore实现了任务存储器使用Django数据库的功能,同时还封装了一些任务的修改、删除、查询等方法
对django-apscheduler源码分析后可发现,对定时任务系统的需求都可以使用django-apscheduler来实现,梳理一下各个功能模块的开发思路:
路由配置(public/urls.py)
from rest_framework import routers from public import views from django.urls import path app_name = "public" urlpatterns = [ # 定时作业暂停/恢复 path('job_pause/<str:job_id>/', views.JobPauseAPIView.as_view(), name='job_pause'), # 更改定时作业触发器 path('job_triggers/<str:job_id>/', views.JobTriggersAPIView.as_view(), name='job_triggers'), # 立即执行一次定时作业 path('job_run/<str:job_id>/', views.JobRunAPIView.as_view(), name='job_triggers') ] router.register('user', views.UserDemoModelViewSet, 'user') # 获取定时任务执行历史记录 router.register('job_history', views.JobHistoryReadOnlyModelViewSet, 'job_history') # 获取定时任务列表 router.register('job', views.JobModelViewSet, 'job') urlpatterns += router.urls
视图配置(public/views.py)
from django.shortcuts import render from django_apscheduler.models import DjangoJobExecution, DjangoJob from rest_framework import viewsets, status from rest_framework.response import Response from rest_framework.views import APIView from public.serializers import DjangoJobExecutionSerializer, \ DjangoJobSerializer from public.utils import MyPageNumber from apscheduler.triggers.cron import CronTrigger from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.job import Job as AppSchedulerJob from django_apscheduler.jobstores import DjangoJobStore, DjangoMemoryJobStore django_job_store = DjangoJobStore() class JobHistoryReadOnlyModelViewSet(viewsets.ReadOnlyModelViewSet): """ 定时作业执行历史 """ queryset = DjangoJobExecution.objects.all() serializer_class = DjangoJobExecutionSerializer pagination_class = MyPageNumber class JobModelViewSet(viewsets.ModelViewSet): """ 定时作业列表 """ queryset = DjangoJob.objects.all() serializer_class = DjangoJobSerializer # 重写删除方法 def perform_destroy(self, instance): django_job_store.remove_job(instance.id) class JobPauseAPIView(APIView): """ 定时作业暂停/恢复 """ @staticmethod def post(request, job_id): action = request.data.get('action') job: AppSchedulerJob = django_job_store.lookup_job(job_id) if action == 'pause': job.next_run_time = None django_job_store.update_job(job) result = {'id': job_id, 'message': '任务暂停成功!'} else: job_state = job.__getstate__() del job_state['next_run_time'] scheduler = BackgroundScheduler() scheduler.add_jobstore(django_job_store) scheduler.add_job(replace_existing=True, **job_state) scheduler.start() result = {'id': job_id, 'message': '任务恢复成功!'} return Response(result, status=status.HTTP_200_OK) class JobTriggersAPIView(APIView): """ 更改定时作业触发器 """ @staticmethod def post(request, job_id): crontab_exp = request.data.get('crontab_exp') job: AppSchedulerJob = django_job_store.lookup_job(job_id) job.trigger = CronTrigger.from_crontab(crontab_exp) django_job_store.update_job(job) result = {'id': job_id, 'message': '修改定时任务触发器成功!'} return Response(result, status=status.HTTP_200_OK) class JobRunAPIView(APIView): """ 立即手动执行一次定时作业 """ @staticmethod def post(request, job_id): scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoMemoryJobStore()) scheduler.start() job: AppSchedulerJob = django_job_store.lookup_job(job_id) job_state = job.__getstate__() del job_state['next_run_time'] del job_state['version'] del job_state['executor'] job_state['trigger'] = None scheduler.add_job(replace_existing=True, **job_state) result = {'id': job_id, 'message': '定时任务手动执行成功!'} return Response(result, status=status.HTTP_200_OK)
序列化器配置(public/serializers.py)
import time from datetime import datetime from django_apscheduler.models import DjangoJobExecution, DjangoJob from rest_framework import serializers class DjangoJobExecutionSerializer(serializers.ModelSerializer): """ 定时作业执行历史列化器 """ class Meta: model = DjangoJobExecution fields = "__all__" class DjangoJobSerializer(serializers.ModelSerializer): """ 定时作业列表列化器 """ class Meta: model = DjangoJob exclude = ['job_state']
github:https://github.com/cuiliang0302/drf_template
gitee:https://gitee.com/cuiliang0302/drf_template
API接口文档:https://www.apifox.cn/apidoc/shared-34bb4a27-bf7b-432d-9d51-0a767a259e6e 访问密码 : 4UoQc75S
如果想从头开始写API接口实现apscheduler定时任务的CRUD,也是很容易的,在此提供一个开发思路给大家。
和Django-apscheduler一样,我们首先创建一个模型,用于存放定时任务id,状态,执行时间,关联的函数等信息。
然后定义一个scheduler类,在初始化时选择添加默认的调度器、任务存储器、执行器等参数。
最后通过引入我们创建好的 scheduler 对象之后就可以直接用来做增删改查的操作:
导致这种情况的原因很多,最常见的两种情况是:
示例代码:
from apscheduler.schedulers.background import BackgroundScheduler def myjob(): print('hello') scheduler = BackgroundScheduler() scheduler.start() scheduler.add_job(myjob, 'cron', hour=0)
可见,以上脚本在运行完add_job()之后就直接退出了,因此 scheduler 根本没有机会去运行其调度好的 job 。
方案一:uWSGI 使用了一些技巧来禁用掉 GIL 锁,但多线程的使用对于 APScheduler 的操作来说至关重要。为了修复这个问题,你需要使用--enalbe-threads选项来重新启用 GIL 。
方案二:在一个单独的专用进程中执行单个定时任务。
目前版本是不支持的,但是未来apscheduler4计划会支持这个功能,详情参考文档:https://github.com/agronholm/apscheduler/issues/465
在两个或更多的进程中共享一个持久化的 job store 会导致 scheduler 的行为不正常:如重复执行或作业丢失,等等。这是因为 APScheduler 目前没有任何进程间同步和信号量机制,因此当一个 job 被添加、修改或从 scheduler 中移除时 scheduler 无法得到通知。
变通方案:在专用的进程中来运行 scheduler,然后通过一些远程访问的途径 —— 如 RPyC、gRPC 或一个 HTTP 服务器 —— 来将其连接起来。在源码仓库中包含了一个使用 RPyC 的示例。
如果你想在 Django 中运行,可以考虑django_apscheduler,推荐使用自定义命令,在一个单独的专用进程中执行单个定时任务。
如果你想在 Flask 中使用 APScheduler ,这里也有一个非官方的插件Flask-APScheduler。
date触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html#module-apscheduler.triggers.date
interval触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/interval.html#module-apscheduler.triggers.interval
crontab触发器:https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html#module-apscheduler.triggers.cron
job配置项:https://apscheduler.readthedocs.io/en/stable/modules/job.html#module-apscheduler.job
apscheduler方法示例:https://apscheduler.readthedocs.io/en/stable/py-modindex.html
django-apscheduler地址:https://github.com/jcass77/django-apscheduler
崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问www.cuiliangblog.cn
今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
下一篇
已是最新文章