【发布时间】:2017-11-20 10:13:41
【问题描述】:
我正在尝试安排每分钟开始的工作。
我在scheduler.py 脚本中定义了调度程序:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 5
}
scheduler = BackgroundScheduler(executors=executors,job_defaults=job_defaults)
我在模块的__init__.py 中初始化调度器,如下所示:
from scheduler import scheduler
scheduler.start()
我想针对特定操作启动计划作业,如下所示:
def AddJob():
dbid = repository.database.GetDbid()
job_id = 'CollectData_{0}'.format(dbid)
scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)
def TestScheduler():
for i in range(0,29):
starttime = time()
print "test"
sleep(1.0 - ((time() - starttime) % 1.0))
首先:当我在 python 控制台中执行 AddJob() 函数时,它开始按预期运行但不在后台运行,控制台被阻塞,直到 TestScheduler 函数在 30 秒后结束。我期待它在后台运行,因为它是一个后台调度程序。
第二:即使指定 1 分钟的重复间隔,作业也不会再次开始。
我错过了什么?
更新
感谢另一个线程,我发现了这个问题。错误的行是这样的:
scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)
我改成:
scheduler.add_job(func=TestScheduler,
trigger='interval',
minutes=1,
id=job_id
)
TestScheduler() 变为 TestScheduler。使用 TestScheduler() 会导致函数 TestScheduler() 的结果作为 add_job() 的参数传递。
【问题讨论】:
标签: python django apscheduler