【问题标题】:APScheduler job is not starting as scheduledAPScheduler 作业未按计划启动
【发布时间】:2017-11-20 10:13:41
【问题描述】:

我正在尝试安排每分钟开始的工作。 我在scheduler.py 脚本中定义了调度程序:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


executors = {
    'default': ThreadPoolExecutor(10),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 5
}

scheduler = BackgroundScheduler(executors=executors,job_defaults=job_defaults)

我在模块的__init__.py 中初始化调度器,如下所示:

from scheduler import scheduler

scheduler.start()

我想针对特定操作启动计划作业,如下所示:

def AddJob():
    dbid = repository.database.GetDbid()
    job_id = 'CollectData_{0}'.format(dbid)
    scheduler.scheduled_job(func=TestScheduler(),
                            trigger='interval',
                            minutes=1,
                            id=job_id
                            )

def TestScheduler():
    for i in range(0,29):
        starttime = time()
        print "test"
        sleep(1.0 - ((time() - starttime) % 1.0))

首先:当我在 python 控制台中执行 AddJob() 函数时,它开始按预期运行但不在后台运行,控制台被阻塞,直到 TestScheduler 函数在 30 秒后结束。我期待它在后台运行,因为它是一个后台调度程序。
第二:即使指定 1 分钟的重复间隔,作业也不会再次开始。

我错过了什么?

更新

感谢另一个线程,我发现了这个问题。错误的行是这样的:

scheduler.scheduled_job(func=TestScheduler(),
                            trigger='interval',
                            minutes=1,
                            id=job_id
                            )

我改成:

scheduler.add_job(func=TestScheduler,
                            trigger='interval',
                            minutes=1,
                            id=job_id
                            )

TestScheduler() 变为 TestScheduler。使用 TestScheduler() 会导致函数 TestScheduler() 的结果作为 add_job() 的参数传递。

【问题讨论】:

    标签: python django apscheduler


    【解决方案1】:

    第一个问题似乎是您在__init__.py 中初始化调度程序,这似乎不是推荐的方式。
    __init__.py 中存在的代码在第一次导入特定文件夹中的模块时执行。例如,想象一下这个结构:

    my_module
    |--__init__.py
    |--test.py
    

    __init__.py:

    from scheduler import scheduler
    
    scheduler.start()
    

    scheduler.start() 命令在from my_module import something 时执行。所以它要么根本不从__init__.py 开始,要么开始多次(取决于你的代码的其余部分!)。

    另一个问题一定是scheduler.scheduled_job()方法的使用。如果您阅读文档on adding jobs,您会发现推荐的方法是使用add_job() 方法,而不是使用方便的装饰器scheduled_job()

    我会建议这样的事情:

    1. 保持 my_scheduler.py 不变。
    2. __init__.py 中删除scheduler.start() 行。
    3. 如下更改你的主文件:

      from my_scheduler import scheduler
      
      if not scheduler.running: # Clause suggested by @CyrilleMODIANO
          scheduler.start()
      
      def AddJob():
          dbid = repository.database.GetDbid()
          job_id = 'CollectData_{0}'.format(dbid)
          scheduler.add_job(
              func=TestScheduler,
              trigger='interval',
              minutes=1,
              id=job_id
          )
      
      ...
      

    【讨论】:

    • 将解决方案添加到我的帖子中,根据您的建议从我的 init.py 中删除了 scheduler.start()。还添加了一个子句以避免在调度程序已经运行时重新启动它:如果不是 scheduler.running: scheduler.start() 谢谢
    • 很高兴知道@CyrilleMODIANO 如果你愿意,你可以接受这个答案:)
    • 在里面添加了你的子句@CyrilleMODIANO
    • 这对生产有用吗?有多个工人和流程?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-05-30
    • 2022-06-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-09
    相关资源
    最近更新 更多