前言
APScheduler(Advanced Python Scheduler) 是一个python 定时任务管理库,你可以用它进行job的增加、删除,你也可以将你的jobs进行序列化存储到相应数据库。
APScheudler是一个跨平台python库,你可以用它来代替windows平台上的计划任务,也可以用它来替代Linux平台的守时任务
APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。
安装
安装 APShedulter
pip install apshedulter
基本概念
APScheduler四大组件:
触发器
triggers
:用于设定触发任务的条件
作业储存器
job stores
:用于存放任务,把任务存放在内存或数据库中
执行器
executors
: 用于执行任务,可以设定执行模式为单线程或线程池
调度器
schedulers
: 把上方三个组件作为参数,通过创建调度器实例来运行
触发器 (triggers
) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
作业存储(job store
)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
执行器(executor
)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
调度器(scheduler
)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
简单应用
下面是一个每个5秒钟打印一次本地时间的例子
1 | import time |
Job操作
- 添加作业:
add_job()
或者@sched.scheduled_job
1
2
3
4
5
6
7
8
9
10import time
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
def my_job():
print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
sched.start()
APScheduler提供了两种方式添加Job,一种是通过调用add_job()
方法添加任务,一种是通过 @sched.scheduled_job()
装饰器添加任务,第一种方式是最经常用的也是推荐的方式,他会返回一个schedulers.job.Job
实例,你可以很方便的修改或者删除它,通过第二种方式添加的job你在程序运行中无法对它进行任何操作
在任何时候你都能配置任务。但是如果调度器还没有启动,此时添加任务,那么任务就处于一个暂存的状态。只有当调度器启动时,才会开始计算下次运行时间。
还有一点要注意,如果你的执行器或任务储存器是会序列化任务的,那么这些任务就必须符合:
- 回调函数必须全局可用
- 回调函数参数必须也是可以被序列化的
内置任务储存器中,只有MemoryJobStore不会序列化任务;内置执行器中,只有ProcessPoolExecutor会序列化任务。
重要提醒!
如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使用
replace_existing=True
,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。
- 移除作业:
remove_job()
或者remove()
1 | job = scheduler.add_job(myfunc, 'interval', minutes=2) |
- 停止和恢复
暂停作业:
1 | apsched.job.Job.pause() |
恢复作业:
1 | apsched.job.Job.resume() |
同时,也可以在调度器启动时,默认所有任务设为暂停状态。
1 | scheduler.start(paused=True) |
通过任务实例或调度器,就能暂停和恢复任务。如果一个任务被暂停了,那么该任务的下一次运行时间就会被移除。在恢复任务前,运行次数计数也不会被统计。
- 获取任务列表
通过get_jobs()
就可以获得一个可修改的任务列表。get_jobs()
第二个参数可以指定任务储存器名称,那么就会获得对应任务储存器的任务列表。
print_jobs()
可以快速打印格式化的任务列表,包含触发器,下次运行时间等信息.
- 修改任务
通过apscheduler.job.Job.modify()
或modify_job()
,你可以修改任务当中除了id的任何属性。
例如:
1 | job.modify(max_instances=6, name='Alternate name') |
如果想要重新调度任务(就是改变触发器),你能通过apscheduler.job.Job.reschedule()
或reschedule_job()
来实现。这些方法会重新创建触发器,并重新计算下次运行时间。
比如:
1 | scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') |
- 关闭调度器
关闭方法如下:
1 | scheduler.shutdown() |
默认情况下,调度器会先把正在执行的任务处理完,再关闭任务储存器和执行器。但是,如果你就直接关闭,你可以添加参数:
1 | scheduler.shutdown(wait=False) |
- 限制任务执行的实例并行数
默认情况下,在同一时间,一个任务只允许一个执行中的实例在运行。比如说,一个任务是每5秒执行一次,但是这个任务在第一次执行的时候花了6秒,也就是说前一次任务还没执行完,后一次任务又触发了,由于默认一次只允许一个实例执行,所以第二次就丢失了。为了杜绝这种情况,可以在添加任务时,设置max_instances
参数,为指定任务设置最大实例并行数。
- 丢失任务的执行与合并
有时,任务会由于一些问题没有被执行。最常见的情况就是,在数据库里的任务到了该执行的时间,但调度器被关闭了,那么这个任务就成了“哑弹任务”。错过执行时间后,调度器才打开了。这时,调度器会检查每个任务的misfire_grace_time
参数int值,即哑弹上限,来确定是否还执行哑弹任务(这个参数可以全局设定的或者是为每个任务单独设定)。此时,一个哑弹任务,就可能会被连续执行多次。
但这就可能导致一个问题,有些哑弹任务实际上并不需要被执行多次。coalescing
合并参数就能把一个多次的哑弹任务揉成一个一次的哑弹任务。也就是说,coalescing
为True能把多个排队执行的同一个哑弹任务,变成一个,而不会触发哑弹事件。
注!如果是由于线程池/进程池满了导致的任务延迟,执行器就会跳过执行。要避免这个问题,可以添加进程或线程数来实现或把 misfire_grace_time值调高。
作业控制器(trigger)
add_job()
的第二个参数是trigger
,它管理着作业的调度方式。它可以为date
, interval
或者cron
。对于不同的trigger
,对应的参数也相同。
- cron定时调度(某一定时时刻执行)
1
2
3
4
5
6
7
8
9
10
11
12
13
14# 注意参数顺序
class apscheduler.triggers.cron.CronTrigger(
year=None,
month=None,
day=None,
week=None,
day_of_week=None,
hour=None,
minute=None,
second=None,
start_date=None,
end_date=None,
timezone=None,
jitter=None)
当省略时间参数时,在显式指定参数之前的参数会被设定为,之后的参数会被设定为最小值,week 和day_of_week的最小值为。比如,设定day=1, minute=20等同于设定year=’‘, month=’‘, day=1, week=’‘, day_of_week=’‘, hour=’*’, minute=20, second=0,即每个月的第一天,且当分钟到达20时就触发。
表达式 | 参数类型 | 描述 | |
---|---|---|---|
* | 所有 | 通配符。例:minutes=*即每分钟触发 | |
*/a | 所有 | 可被a整除的通配符。 | |
a-b | 所有 | 范围a-b触发 | |
a-b/c | 所有 | 范围a-b,且可被c整除时触发 | |
xth y | 日 | 第几个星期几触发。x为第几个,y为星期几 | |
last x | 日 | 一个月中,最后个星期几触发 | |
last | 日 | 一个月最后一天触发 | |
x,y,z | 所有 | 组合表达式,可以组合确定值或上方的表达式 |
注!month和day_of_week参数分别接受的是英语缩写jan– dec 和 mon – sun
1
2
3
4
5
6
7
8
9
10
11 from apscheduler.schedulers.blocking import BlockingScheduler
def job_function():
print "Hello World"
sched = BlockingScheduler()
# 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
sched.start()
start_date 和 end_date 可以用来适用时间范围
1 | # 在2014-05-30 00:00:00前,每周一到每周五 5:30运行 |
通过@sheduled_job()
装饰器实现
1 |
|
使用标准版的corntab表达式
1 | sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *')) |
也可以添加jitter振动参数
1 | # 每小时上下浮动120秒触发 |
夏令时问题
有些timezone时区可能会有夏令时的问题。这个可能导致令时切换时,任务不执行或任务执行两次。避免这个问题,可以使用UTC时间,或提前预知并规划好执行的问题。
1 | # 在Europe/Helsinki时区, 在三月最后一个周一就不会触发;在十月最后一个周一会触发两次 |
- interval 周期触发任务
具体参数如下:
1 | weeks (int) – number of weeks to wait |
你可以指定间隔时间
1 | from datetime import datetime |
你可以框定周期开始时间start_date和结束时间end_date
1 | # 周期触发的时间范围在2010-10-10 9:30 至 2014-06-15 11:00 |
也可以通过 @scheduled_job()
装饰器实现
1 | from apscheduler.scheduler import BlockingScheduler |
jitter
振动参数,给每次触发添加一个随机浮动秒数,一般适用于多服务器,避免同时运行造成服务拥堵。
1 | # 每小时(上下浮动120秒区间内)运行`job_function` |
- date 定时调度(作业只会执行一次)
1
2
3
4# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
作业调度器配置
APScheduler 有多种不同的配置方法,你可以选择直接传字典或传参的方式创建调度器;也可以先实例一个调度器对象,再添加配置信息。灵活的配置方式可以满足各种应用场景的需要。
整套的配置选项可以参考API文档BaseScheduler
类。一些调度器子类可能有它们自己特有的配置选项,以及独立的任务储存器和执行器也可能有自己特有的配置选项,可以查阅API文档了解。
下面举一个例子,创建一个使用默认任务储存器和执行器的BackgroundScheduler
:
1 | from apscheduler.schedulers.background import BackgroundScheduler |
这样就可以创建了一个后台调度器。这个调度器有一个名称为default的MemoryJobStore
(内存任务储存器)和一个名称是default且最大线程是10的ThreadPoolExecutor
(线程池执行器)。
假如你现在有这样的需求,两个任务储存器分别搭配两个执行器;同时,还要修改任务的默认参数;最后还要改时区。可以参考下面例子,它们是完全等价的。
名称为
mongo
的MongoDBJobStore
名称为
default
的SQLAlchemyJobStore
名称为
ThreadPoolExecutor
的ThreadPoolExecutor
,最大线程20个- 名称
processpool
的ProcessPoolExecutor
,最大进程5个 - UTC时间作为调度器的时区
- 默认为新任务关闭合并模式()
- 设置新任务的默认最大实例数为3
方法一
1 |
|
方法二
1 | from apscheduler.schedulers.background import BackgroundScheduler |
方法三
1 | from pytz import utc |
启动调度器
启动调度器是只需调用start()
即可。除了BlockingScheduler
,非阻塞调度器都会立即返回,可以继续运行之后的代码,比如添加任务等。
对于BlockingScheduler
,程序则会阻塞在start()
位置,所以,要运行的代码必须写在start()
之前。
注!调度器启动后,就不能修改配置了。