APScheduler定时任务框架-程序员宅基地

技术标签: 散装精品  python  APScheduler  

一.APScheduler简介

Advanced Python Scheduler (APScheduler) 是一个 Python 库,可让您安排 Python 代码稍后执行,可以只执行一次,也可以定期执行。您可以随意添加新工作或删除旧工作。如果您将任务存储在数据库中,它们也将在调度器重新启动后幸存下来并保持其状态。当调度器重新启动时,它将运行它在离线时应该运行的所有任务。

除此之外,APScheduler 可以用作跨平台、特定于应用程序的平台特定调度器的替代品,例如 cron 守护程序或 Windows 任务调度器。但是请注意,APScheduler 本身不是守护程序或服务,也不附带任何命令行工具。它主要用于在现有应用程序中运行。也就是说,APScheduler 确实为您提供了一些构建块来构建调度器服务或运行专用调度器进程。

APScheduler 具有三个可以使用的内置调度系统:

  • Cron 式调度(具有可选的开始/结束时间)
  • 基于间隔的执行(以均匀间隔运行任务,具有可选的开始/结束时间)
  • 一次性延迟执行(在设定的日期/时间运行一次任务)

您可以混合搭配调度系统和以您喜欢的任何方式存储任务的后端。支持的用于存储任务的后端包括:
Memory,SQLAlchemy (任何由SQLAlchemy支持的RDBMS都可以工作),MongoDB,Redis,RethinkDB,ZooKeeper

APScheduler还集成了一些常见的Python框架,比如:
asyncio,gevent,Tornado,Twisted,Qt (使用PyQt, PySide2或PySide)

有第三方解决方案可以将 APScheduler 与其他框架集成:
Django,Flask

二.APScheduler基本使用

1.安装

pip安装:

pip install apscheduler

源码安装(https://pypi.python.org/pypi/APScheduler/):

python setup.py install

2.基本概念

APScheduler 有四种组件:
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个任务都有它自己的触发器,用于确定下一次运行任务的时间,除了初始配置之外,触发器是完全无状态的。

任务存储器(job stores):任务存储器包含预定的任务,指定了任务被存放的位置,默认情况下任务保存在内存,也可将任务保存在各种数据库中,当任务的数据在保存到持久任务存储时,它会被序列化,并在从它加载回来时会进行反序列化。任务存储(默认情况除外)不会将任务数据保存在内存中,而是充当在后端保存、加载、更新和搜索任务的中间人。任务存储绝不能在调度器之间共享。

执行器(executors):执行器负责处理任务的运行,通常将指定的任务(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器,然后调度器发出适当的事件。

调度器(schedulers):任务调度器,属于控制角色,通过它配置任务存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、任务存储器、执行器的运行,通常只有一个调度器运行在应用程序中,开发人员通常不需要直接处理任务存储器、执行器或触发器。相反,调度器提供了适当的接口来处理所有这些。任务存储和执行器的配置是通过调度器完成的。

3.选择正确的调度器、任务存储器、执行器和触发器

选择调度器:对调度器的选择主要取决于你的编程环境以及你将使用 APScheduler 的目的

BlockingScheduler:当调度器是进程中唯一运行的东西时使用。
BackgroundScheduler:当不使用以下任何框架时使用,并且希望调度器在应用程序的后台运行。
AsyncIOScheduler:当应用程序使用 asyncio 模块时使用
GeventScheduler: 当应用程序使用 gevent时使用
TornadoScheduler:当正在构建 Tornado 应用程序时使用
TwistedScheduler:当正在构建 Twisted 应用程序时使用
QtScheduler: 当正在构建 Qt 应用程序时使用

选择任务存储器:你就需要确定是否需要任务持久化。如果你总是在应用程序开始时重新创建任务,那么你可以使用默认值 ( MemoryJobStore)。但是,如果你需要在调度器重新启动或应用程序崩溃时保持你的任务,那么就要选择持久化的任务储存器。但是,如果你是自由选择,则推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库,因为它可以提供强大的数据完整性保护功能。

选择执行器:这个同样要看你的实际需求,默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果你的工作负载涉及 CPU 密集型操作,则应考虑ProcessPoolExecutor进程池执行器方案,以此改为使用多个 CPU 内核来充分利用多核算力。你甚至可以同时使用两者,将ProcessPoolExecutor进程池执行器添加为辅助执行器。

选择触发器:配置一个任务,就需要设置一个任务触发器。触发器可以设定运行任务时计算日期/时间的逻辑。APScheduler 带有三种内置触发器类型:

date: 当你想在某个时间点只运行一次任务时使用

interval: 当你想以固定的时间间隔运行任务时使用

cron:当你想在一天中的特定时间定期运行任务时使用

也可以将多个触发器组合成一个触发器,该触发器可以设定同时满足所有触发器条件而触发,或者满足一项即触发。有关更多信息,请参阅的文档:combining triggers

4.触发器详解

(1)date

作用:在给定的日期时间触发一次。如果run_date留空,则使用当前时间。

date类:

from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# 设定执行时间
sched.add_job(my_job, 'date', run_date=date(2021, 8, 30), args=['test'])

sched.start()

其中run_date参数可以是date类型、datetime类型或文本类型。
datetime类

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# 设定执行时间
sched.add_job(my_job, 'date', run_date=datetime(2021, 8, 30, 17, 10, 0), args=['test'])

sched.start()

运行结果:
在这里插入图片描述
文本类

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


def my_job(text):
    print(text)


# 设定执行时间
sched.add_job(my_job, 'date', run_date='2021-08-30 17:27:00', args=['test'])

sched.start()

运行结果:
在这里插入图片描述
未指定时间,则立即执行

sched.add_job(my_job, args=['test'])

(2)interval

作用:当你想以固定的时间间隔运行任务时使用

class apscheduler.triggers.interval.IntervalTrigger(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None, jitter=None)

在指定的时间间隔内触发,如果指定,则从start_date开始,否则datetime.now() + 间隔。

参数:
周( int ) – 等待的周数
days ( int ) – 等待的天数
小时( int ) – 等待的小时数
分钟( int ) – 等待的分钟数
seconds ( int ) – 等待的秒数
start_date ( datetime|str ) – 间隔计算的起点
end_date ( datetime|str ) – 要触发的最晚可能日期/时间
timezone ( datetime.tzinfo|str ) – 用于日期/时间计算的时区
jitter ( int|None ) –jitter最多延迟作业执行几秒钟

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


def job_func():
    print("Hello World")


sched = BlockingScheduler()

# 设定周期开始时间start_date和结束时间end_date,及每10s触发
sched.add_job(job_func, 'interval', start_date="2021-08-30 17:48:20", end_date="2021-08-30 17:49:00", seconds=10)

sched.start()

运行结果:
在这里插入图片描述
也能通过scheduled_job()装饰器实现:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


@sched.scheduled_job('interval', start_date="2021-08-30 17:48:20", end_date="2021-08-30 17:49:00", seconds=10)
def job_func():
    print("Hello World")

sched.start()

该jitter选项使你能够将随机组件添加到执行时间。如果你有多个服务器并且不希望它们在完全相同的时刻运行作业,或者如果你想防止具有类似选项的多个作业始终同时运行,这可能很有用:

# 在120s内随机选择一个额外延迟
sched.add_job(job_func, 'interval', hours=1, jitter=120)

(3)cron

作用:当你想在一天中的特定时间定期运行任务时使用

classapscheduler.triggers.cron.CronTrigger(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None)

当当前时间匹配所有指定的时间限制时触发,类似于 UNIX cron 调度器的工作方式。

参数:
year (int|str) – 4 位数字年份
month (int|str) – 月 (1-12)
day (int|str) – 月中的第几天 (1-31)
week (int|str) – ISO 周 (1-53)
day_of_week (int|str) – 工作日的编号或名称(0-6 或 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – 小时 (0-23)
minute (int|str) – 分钟 (0-59)
second (int|str) – 秒 (0-59)
start_date (datetime|str) – 最早可能触发的日期/时间(包括)
end_date (datetime|str) – 要触发的最晚可能日期/时间(包括)
timezone (datetime.tzinfo|str) – 用于日期/时间计算的时区(默认为调度器时区)
jitter (int|None) – 最多延迟作业执行几秒钟

介绍
这是 APScheduler 中最强大的内置触发器。你可以在每个字段上指定多种不同的表达式,在确定下一次执行时间时,它会在每个字段中找到满足条件的最早时间。这种行为类似于大多数类 UNIX 操作系统中的“Cron”实用程序。

你还可以分别通过start_date和 end_date参数指定 cron 样式计划的开始日期和结束日期。它们可以作为日期/日期时间对象或文本(采用 ISO 8601格式)给出。

与 crontab 表达式不同,你可以省略不需要的字段。当省略时间参数时,在显式指定参数之前的参数会被设定为*,之后的参数会被设定为最小值,week 和day_of_week的最小值为* 。例如,某任务在每年每个月的第一天每小时 20 分钟执行。下面的代码示例应该进一步说明这种行为。

day=1, minute=20
等同于
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

表达式类型
下表列出了从年份到第二个字段中使用的所有可用表达式。可以在单个字段中给出多个表达式,用逗号分隔。

表达式 参数类型 描述
* any 匹配字段所有取值
*/a any 匹配字段每递增 a 后的值, 从字段最小值开始,包括最小值,比如小时(hour)的 */5,则匹配0,5,10,15,20
a-b any 匹配字段 a 到 b 之间的取值,a 必须小于 b,包括 a 与 b,比如1-5,则匹配1,2,3,4,5
a-b/c any 匹配 a 到 b 之间每递增 c 后的值,包括 a,不一定包括 b,比如1-20/5,则匹配1,6,11,16
xth y day 匹配 y 在当月的第 x 次,比如 3rd fri 指当月的第三个周五
last x day 匹配 x 在当月的最后一次,比如 last fri 指当月的最后一个周五
last day 匹配当月的最后一天
x,y,z any 组合表达式,可以组合确定值或上方的表达式

在month和day_of_week领域接受英文简写月和星期名(jan-dec和mon-sun分别)。

from apscheduler.schedulers.blocking import BlockingScheduler

def job_func():
    print("Hello World")

sched = BlockingScheduler()

# 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
sched.add_job(job_func, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()

可以使用start_date和end_date来限制计划运行的总时间:

from apscheduler.schedulers.blocking import BlockingScheduler

def job_func():
    print("Hello World")

sched = BlockingScheduler()

# 在2021-09-02 00:00:00前,每周一到每周五 10:35运行
sched.add_job(job_func, 'cron', day_of_week='mon-fri', hour=10, minute=35, end_date='2021-09-02')
sched.start()

可通过 scheduled_job() 装饰器实现:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10, minute=35, end_date='2021-09-02')
def job_func():
    print("Hello World")

sched.start()

运行结果:
在这里插入图片描述
使用标准 crontab 表达式安排任务:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

sched = BlockingScheduler()

def job_func():
    print("Hello World")

# from_crontab(cls, expr, timezone=None)
# 其中expr: minute, hour, day of month, month, day of week
# 在9月到11月间前15天内的每个星期的10:48触发
sched.add_job(job_func, CronTrigger.from_crontab('48 10 1-15 sep-nov *'))
sched.start()

运行结果:
在这里插入图片描述
也可添加jitter参数:

sched.add_job(job_func, 'cron', hour='*', jitter=120)

夏令时问题
cron 触发器使用所谓的“挂钟”时间。因此,如果所选时区遵守 DST(夏令时),你应该注意它可能会在进入或离开 DST 时导致 cron 触发器出现意外行为。从标准时间切换到夏令时时,时钟会向前移动一小时或半小时,具体取决于时区。同样,当切换回标准时间时,时钟会向后移动一小时或半小时。这将导致一些时间段要么根本不存在,要么重复。如果你的日程安排要在这些时间段之一执行作业,则它的执行频率可能比预期的要高或低。这不是一个错误。如果你希望避免这种情况,请使用不遵守 DST 的时区,可以使用UTC时间,或提前预知并规划好执行的问题。

# 在Europe/Helsinki时区, 在三月最后一个周一就不会触发;在十月最后一个周一会触发两次
sched.add_job(job_function, 'cron', hour=3, minute=30)

5.配置调度器

APScheduler 提供了许多不同的方式来配置调度器。你可以选择直接传字典,也可以将选项作为关键字参数传入。你还可以先实例化调度器,然后添加任务并配置调度器。通过这种方式,你可以在任何环境中获得最大的灵活性。

可以在BaseScheduler该类的 API 参考中找到调度器级别配置选项的完整列表 。调度器子类可能还有其他选项,这些选项记录在它们各自的 API 参考中。单个任务存储和执行器的配置选项同样可以在它们的 API 参考页面上找到。

假设你想使用默认任务存储和默认执行器在你的应用程序中运行 BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler


scheduler = BackgroundScheduler()

#在此或在计划程序初始化之前初始化应用程序的其余部分,因为是非阻塞的后台调度器,所以程序会继续向下执行

这将为你提供一个 BackgroundScheduler,其有一个名称为default的MemoryJobStore(内存任务储存器)和一个名称是default且默认最大线程是10的ThreadPoolExecutor(线程池执行器)。

现在,如果你希望拥有两个使用两个执行器的任务存储器,并且你还希望调整任务的默认值并设置不同的时区。可参考下面的三个例子,它们是完全等价的:

一个名为“mongo”的 MongoDBJobStore

一个名为“default”的 SQLAlchemyJobStore(使用 SQLite)

一个名为“default”的 ThreadPoolExecutor,工作线程数为 20个

一个名为“processpool”的 ProcessPoolExecutor,工作进程数为 5个

UTC 作为调度器的时区

默认情况下为新任务关闭合并模式

新任务的默认最大实例数限制为 3个

方法一

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法二

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    
    'apscheduler.jobstores.mongo': {
    
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
    
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
    
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
    
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法三

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    
    'mongo': {
    'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    
    'default': {
    'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# 在这里做些别的事情,可能会增加任务等

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

6.启动调度器

只需调用start()调度器即可启动调度器。对于除BlockingScheduler之外的调度器,此调用将立即返回,你可以继续应用程序的初始化过程,如调度器添加任务。

对于 BlockingScheduler,你只需要start()在完成任何初始化步骤后调用即可,因程序则会阻塞在start()位置,故要运行的代码必须写在start()之前。 。

注意:调度器启动后,您将无法再更改其设置。

7.添加任务

添加任务的方法如下(两种):
(1)通过调用add_job()
(2)通过装饰器scheduled_job()

第一种方法是最常见的方法。第二种方法主要是为了方便声明在程序运行时不会更改的任务。
第一种方法add_job()方法会返回一个 apscheduler.job.Job实例,你可以稍后使用它来修改或删除任务。

在任何时候你都可以在调度安排任务。但是如果添加任务时调度器尚未运行,则任务将被暂定调度,并且仅在调度器启动时才计算其第一次运行时间。

需要注意的是,如果你使用序列化任务的执行器或任务存储器,它将对你的任务增加一些要求:

a.目标可调用对象必须可全局访问
b.可调用对象的任何参数都必须是可序列化的

在内置任务存储中,只有 MemoryJobStore 不会序列化任务 。在内置执行器中,只有 ProcessPoolExecutor 会序列化任务。

注意
a. 如果你在程序初始化期间在持久任务存储器中安排任务,则必须为任务定义一个具体的ID 并使用replace_existing=True ,否则每次程序重新启动时你都会获得任务的新副本,也就表示任务的状态不会保存。
b.要立即运行任务,可以在添加任务时省略trigger参数。

8.删除任务

当你从调度器中删除任务时,它会从其关联的任务存储器中删除,并且不会再被执行。有两种方法可以实现这一点:

(1)通过remove_job()使用任务的 ID 和任务存储器别名调用

(2)在通过add_job()创建的任务实例上调用remove()方法

第二种方式更方便,但前提必须在创建任务实例时,实例被保存在变量中。对于通过scheduled_job()创建的任务,只能选择第一种方式。

如果任务的调度结束(即它的触发器不会产生任何进一步的运行时间),它会被自动删除。

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


job = sched.add_job(job_func1, 'cron', hour=11, minute=59)
job.remove()
sched.add_job(job_func2, 'cron', hour=11, minute=59)

sched.start()

运行结果:
在这里插入图片描述

同样,通过任务的具体ID:

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


sched.add_job(job_func1, 'cron', hour=11, minute=56, id='my_job_id')
sched.remove_job('my_job_id')
sched.add_job(job_func2, 'cron', hour=11, minute=56)
sched.start()

运行结果:
在这里插入图片描述

9.暂停和恢复工作

你可以通过Job实例或调度器本身轻松暂停和恢复任务。当任务暂停时,它的下一次运行时间将被清除,并且在任务恢复之前不会为其计算进一步的运行时间。
暂停任务,请使用以下任一方法:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复任务:

apscheduler.job.Job.resume()

apscheduler.schedulers.base.BaseScheduler.resume_job()

10.获取预定任务列表

通过get_jobs()就可以获得一个可修改的任务列表。get_jobs()第二个参数可以指定任务储存器名称,那么就会获得对应任务储存器的任务列表。

为方便起见,print_jobs()可以快速打印格式化的任务列表,包含触发器,下次运行时间等信息。

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


job = sched.add_job(job_func1, 'cron', hour=14, minute=17)

sched.add_job(job_func2, 'cron', hour=14, minute=17)
sched.print_jobs()
sched.start()

在这里插入图片描述

11.修改任务

通过apscheduler.job.Job.modify()或modify_job(),你可以修改任务当中除了id的任何属性。

job.modify(max_instances=6, name='Alternate name')

如果你想重新安排任务——即更改其触发器,你可以使用 apscheduler.job.Job.reschedule()或 reschedule_job()。这些方法为任务构造一个新的触发器,并根据新的触发器重新计算其下一次运行时间。

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

12.关闭调度器

要关闭调度器:

scheduler.shutdown()

默认情况下,调度器关闭其任务存储器和执行器,并等待所有当前正在执行的任务完成。如果你不想等待,你可以这样做:

scheduler.shutdown(wait=False)

这仍将关闭任务存储器和执行器,但不会等待任何正在运行的任务完成。

13.暂停/恢复任务处理

可以暂停正在执行任务的处理:

scheduler.pause()

恢复任务:

scheduler.resume()

也可以在调度器启动时,默认所有任务设为暂停状态:

scheduler.start(paused=True)

13.限制同时执行的任务实例数

默认情况下,每个任务只允许同时运行一个实例。这意味着,如果任务即将运行,但前一次运行尚未完成,则将最新运行视为丢失。为了避免这种情况的发生,通过在添加任务时使用max_instances关键字参数,可以设置调度器允许并发运行的特定任务的最大实例数。

14.丢失任务的执行与合并

有时,任务会由于一些问题没有被执行。最常见的情况就是,在数据库里的任务到了该执行的时间,但调度器被关闭了,那么这个任务就成了“哑弹任务”。错过执行时间后,调度器才打开了。这时,调度器会检查每个任务的misfire_grace_time参数int值,即哑弹上限,来确定是否还执行哑弹任务(这个参数可以全局设定的或者是为每个任务单独设定)。此时,一个哑弹任务,就可能会被连续执行多次。

但这就可能导致一个问题,有些哑弹任务实际上并不需要被执行多次。coalescing合并参数就能把一个多次的哑弹任务揉成一个一次的哑弹任务。也就是说,coalescing为True能把多个排队执行的同一个哑弹任务,变成一个,而不会触发哑弹事件。

注意:如果任务的执行由于池中没有可用的线程或进程而延迟,则执行器可能会由于运行太晚(与其最初指定的运行时间相比)而跳过它。如果这可能发生在你的应用程序中,你可能需要增加执行器中的线程/进程数,或者将misfire_grace_time 设置调整为更高的值。

15.调度器事件

可以将事件侦听器附加到调度器。调度器事件在某些情况下被触发,并且可能在其中包含有关该特定事件详细信息的附加信息,比如当前运行次数等。通过add_listener()为其提供适当的mask参数,或将不同的常量组合在一起,可以只侦听特定类型的事件 。回调对象会有一个参数就是触发的事件。

有关events可用事件及其属性的详细信息,请参阅模块的文档。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
# 当任务执行完或任务出错时,调用my_listener
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

事件类型:

常量 描述 事件类
EVENT_SCHEDULER_STARTED 调度器已启动 SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN 调度器被关 SchedulerEvent
EVENT_SCHEDULER_PAUSED 调度器中的任务处理已暂停 SchedulerEvent
EVENT_SCHEDULER_RESUMED 调度器中的任务处理已恢复 SchedulerEvent
EVENT_EXECUTOR_ADDED 一个执行器被添加到调度器 SchedulerEvent
EVENT_EXECUTOR_REMOVED 一个执行器被移除到调度器中 SchedulerEvent
EVENT_JOBSTORE_ADDED 任务存储已添加到调度器 SchedulerEvent
EVENT_JOBSTORE_REMOVED 任务存储已从调度器中删除 SchedulerEvent
EVENT_ALL_JOBS_REMOVED 所有任务都已从所有任务存储或一个特定任务存储中删除 SchedulerEvent
EVENT_JOB_ADDED 任务已添加到任务存储 JobEvent
EVENT_JOB_REMOVED 任务已从任务存储中删除 JobEvent
EVENT_JOB_MODIFIED 从调度器外部修改了任务 JobEvent
EVENT_JOB_SUBMITTED 任务已提交给其执行器以运行 JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES 提交给其执行器的任务未被执行器接受,因为该任务已达到其最大并发执行实例数 JobSubmissionEvent
EVENT_JOB_EXECUTED 一个任务执行成功 JobExecutionEvent
EVENT_JOB_ERROR 任务在执行期间引发异常 JobExecutionEvent
EVENT_JOB_MISSED 错过了任务的执行 JobExecutionEvent
EVENT_ALL 包含所有事件类型的全能掩码 N/A

16.故障排除

如果调度器没有按预期工作,将记录apscheduler器的日志记录级别提高到该DEBUG级别会有所帮助 。

如果您还没有首先启用日志记录,您可以这样做:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

这应该提供许多关于调度器内部发生的事情的有用信息。

还要确保您检查了常见问题部分,看看您的问题是否已经有了解决方案。
报告错误
Github 提供了一个错误跟踪器

文章参考链接
  APScheduler官网-用户指南
  黑色小米粥-APScheduler官方文档翻译

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yuanfate/article/details/119945735

智能推荐

C语言类型转换_lint dint-程序员宅基地

文章浏览阅读138次。(1):隐式转换定义一个整型常量(int),赋值时却给了一个小数,系统只会将整数部分提取,小数部分舍去(注:是舍去不是四舍五入)。(2)ASCII转换在ASCII表里一个符号或是一个字母就代表一个相应的数字#include<stdio.h>int main{char letter='A';int number=letter;return 0;}A在ASCII表里对应的数字是65,所以当把字符型常量letter的值'A'赋给整型变量number时,所储_lint dint

安装Keras,tensorflow,并将虚拟环境添加到jupyter notebook_scipy库怎么导入jupyter-程序员宅基地

文章浏览阅读4.5k次,点赞8次,收藏69次。安装Keras,tensorflow,并将虚拟环境添加到jupyter notebook_scipy库怎么导入jupyter

ARM开发软件ADS教程_arm.developer.suite使用教程-程序员宅基地

文章浏览阅读4.7k次。ARM开发软件ADS教程 ADS(ARM Developer Suite)是ARM公司推出ARM集成开发环境,操作简单方便,获得广大开发人员的青睐。下面使用ADS v1.2做一个实例教程,帮助大家学会使用ADS编写程序和仿真调试。(使用汇编语言) 首先:下载ADS v1.2版本(英文版)并安装好ADS。安装好ADS之后可以看到开始菜单---所有程序---ARM Developer Suit_arm.developer.suite使用教程

Python的datetime_python 手写datetime-程序员宅基地

文章浏览阅读4.6k次。Python提供了多个内置模块用于操作日期时间,像calendar,time,datetime。time模块我在之前的文章已经有所介绍,它提供的接口与C标准库time.h基本一致。相比于time模块,datetime模块的接口则更直观、更容易调用。今天就来讲讲datetime模块。 datetime模块定义了两个常量:datetime.MINYEAR和datetime.MAXYEAR,分_python 手写datetime

利用Excel数据爬虫_excel爬虫-程序员宅基地

文章浏览阅读2.4k次,点赞2次,收藏8次。URL部分和URL预览填写为目标(需要爬取数据的网址)的URL地址。第三部找到User-Agent的value复制。就可以在Excel表格里看到想要的数据了。1、在Excel里数据里打开自网站。命令超时选择1分钟即可。HTTP请求标头参数。_excel爬虫

Azure OpenAI:使用Completion/ChatCompletion实现更多可能合集(持续更新)_chatcompletion azure-程序员宅基地

文章浏览阅读198次。不过类似的提示词在GPT4模型下直接使用并不能产生类似的结果,GPT4使用的是ChatCompletion,可以在System Message里或者JSON格式的User Message里带更多的上下文信息过去,在这个案例里GPT4过于智能的将两个表进行了交叉查询,看到网上有其他开发人员使用了JSON解析的方法来拼出SQL文,有机会可以尝试。###创建SQL查询问题:Dear, 210430预计什么时间有货可发?###创建SQL查询问题:Dear, 210430预计什么时间有货可发?_chatcompletion azure

随便推点

数据结构 —— 八大排序(超详细图解 & 函数实现)_数据结构排序-程序员宅基地

文章浏览阅读1k次,点赞16次,收藏15次。排序算法主要分为内部排序和外部排序,内部排序是数据记录在内存中进行排序,而外部排序是因排序的数据很大,一次不能容纳全部的排序记录,在排序过程中需要访问外存。常见的内部排序算法有:插入排序、希尔排序、选择排序、冒泡排序、归并排序、快速排序、堆排序、计数排序等。本文将针对上述八大排序算法进行图解剖析。_数据结构排序

《Qt for Symbian》翻译系列之七:第二章 开始(1)_qt manual proxy configuration-程序员宅基地

文章浏览阅读3.8k次。第二章开 始本章主要介绍应用于Symbian平台的QT开发工具。对于Symbian平台的新手,本章首先通过逐步介绍所需的开发工具及其安装指南进行切入。然后介绍如何在仿真器和移动电话上利用Qt for Symbian创建并运行“Hello World”应用。如果已经有了Symbian开发环境,而且熟悉工具及应用的构建过程,作为对某些SDK版本的补充,在进入2.1.7章节关注Qt for Symbian SDK的安装和介绍之前,建议快速浏览本章的第一部分。注意,本章中某些较长的下载链接利用URL缩_qt manual proxy configuration

前端 - Map对象详解_前端map-程序员宅基地

文章浏览阅读6.8k次。Map对象属性、Map对象和普通对象的区别、Map对象和WeakMap对象的区别_前端map

推荐系统中的常用算法——序列深度匹配SDM_sdm算法-程序员宅基地

文章浏览阅读2.8k次,点赞3次,收藏11次。1. 概述2. 算法原理参考文献Lv F , Jin T , Yu C , et al. SDM: Sequential Deep Matching Model for Online Large-scale Recommender System[J]. 2019.[深度模型] 阿里大规模深度召回序列模型SDMSDM:用户长短期兴趣召回模型..._sdm算法

计算机毕业设计 SSM与Vue的勤工助学管理系统(源码+论文)_ssm+vue勤工助学-程序员宅基地

文章浏览阅读199次。Hi,各位同学好呀,这里是M学姐!今天向大家分享一个今年(2022)最新完成的毕业设计项目作品,【基于SSM的勤工助学管理系统】学姐根据实现的难度和等级对项目进行评分(最低0分,满分5分)难度系数:3分工作量:5分创新点:3分界面美化:5分界面美化的补充说明:使用vue的基本都能达到5分本项目完成于2022年6月份,包含内容 : 源码 + 论文 + 答辩PPT在开放的互联网平台面前,勤工助学管理系统的信息管理面临着巨大的挑战。传统的管理模式局限于简单数据的管理,无法适应不断变化的市场格局。_ssm+vue勤工助学

分布式锁原理与实现--Redis分布式锁和ZooKeeper分布式锁-程序员宅基地

文章浏览阅读334次,点赞5次,收藏5次。分布式锁的设计和实现是一个复杂但至关重要的课题,不同的实现方式各具优势,根据系统的实际需求和现有技术栈选择合适的分布式锁方案至关重要。在分布式系统中,由于数据分散存储在不同的节点上,当多个节点同时对同一资源进行操作时,如果没有有效的协调机制,就可能出现并发控制问题,导致数据的不一致和冲突。使用setnx命令尝试设置一个唯一的key-value对,key通常与待保护的资源相关联,value可以是线程ID或随机生成的UUID,如果设置成功,则认为获取到了锁。

推荐文章

热门文章

相关标签