【问题标题】:How to access return value from apscheduler?如何从 apscheduler 访问返回值?
【发布时间】:2019-04-02 20:28:01
【问题描述】:

我无法完全拼凑出如何访问 apscheduler 中计划作业的返回值。作业需要每天在不同的时间运行,我需要今天作业的返回值来安排明天的作业。

此链接 (how to get return value from apscheduler jobs) 似乎是此问题的最佳先前答案。它建议向调度程序添加一个侦听器。我添加了一个监听器,但我不确定如何访问它的返回值。我可以访问附加到调度程序的侦听器,但无法访问它们的输出。下面代码中的监听器 job_runs() 将在计划作业运行时打印。

此外,我知道我需要访问一个保存函数返回值的 JobExecutionEvent (https://apscheduler.readthedocs.io/en/latest/modules/events.html#module-apscheduler.events)。

首先,我要访问的函数是 run_all(),其中执行了一堆操作,但我只为测试用例返回 True。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, JobExecutionEvent
from datetime import datetime, timedelta
import logging


def run_all():
    return True


def job_runs(event):  # listener function
    if event.exception:
        print('The job did not run')
    else:
        print('The job completed @ {}'.format(datetime.now()))


def job_return_val(event):  # listener function
    return event.retval

然后,我设置调度程序、添加侦听器并添加作业。触发器设置为在作业添加到调度程序后 1 分钟运行函数。

  scheduler = BackgroundScheduler()
  scheduler.add_listener(job_runs, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
  scheduler.add_listener(job_return_val, EVENT_JOB_EXECUTED)
  cron_args = datetime_to_dict(datetime.now() + timedelta(minutes=1))
  job = scheduler.add_job(run_all, "cron", **cron_args)

接下来,我启动调度程序并打印调度的作业。此外,我设置了日志记录,因此我知道调度程序在哪里。

  test = scheduler.start()
  scheduler.print_jobs()
  logging.basicConfig()
  logging.getLogger('apscheduler').setLevel(logging.DEBUG)

启用日志记录后,调度程序会报告作业已运行并从调度程序中删除,正如我所期望的那样。 job_runs() 将正确的输出打印到控制台。通过断点,我知道调用了 job_return_val()。但是,我不知道它返回的值被发送到哪里。该函数似乎在名为 APScheduler 的不同线程中调用。我对线程了解不多,但这是有道理的。但是,我不明白该线程的输出何时返回到主线程。

最后,我尝试使用可从调度程序和作业的属性访问的代码、job_id、jobstore 和 schedule_run_time 实例化 JobExceptionEvent,但 JobExceptionEvent 似乎不知道该事件是在调度程序中运行的。由于前一段中描述的线程,这似乎也很有意义。

对整理此问题有任何帮助!

【问题讨论】:

    标签: python apscheduler


    【解决方案1】:

    侦听器的返回值在任何地方都没有使用(参见code),因此无论如何返回任何值都没有用。如果您需要根据前一个作业的值(通过事件对象在侦听器中获取)安排另一个作业,则必须在该侦听器中正确执行。

    编辑:为了说明如何做到这一点(并证明它是可能的),请参阅此示例代码:

    from datetime import datetime
    import time
    
    from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
    from apscheduler.schedulers.background import BackgroundScheduler
    
    
    def tick():
        print('Tick! The time is: %s' % datetime.now())
    
    
    def tack():
        print('Tack! The time is: %s' % datetime.now())
    
    
    def listener(event):
        if not event.exception:
            job = scheduler.get_job(event.job_id)
            if job.name == 'tick':
                scheduler.add_job(tack)
    
    
    if __name__ == '__main__':
        scheduler = BackgroundScheduler()
        scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        scheduler.add_job(tick, 'interval', seconds=5)
        scheduler.start()
    
        try:
            while True:
                time.sleep(1)
        except (KeyboardInterrupt, SystemExit):
            scheduler.shutdown()
    

    输出:

    (venv) pasmen@nyx:~/tmp/x$ python test.py 
    Tick! The time is: 2019-04-03 19:51:29.192420
    Tack! The time is: 2019-04-03 19:51:29.195878
    Tick! The time is: 2019-04-03 19:51:34.193145
    Tack! The time is: 2019-04-03 19:51:34.194898
    Tick! The time is: 2019-04-03 19:51:39.193207
    Tack! The time is: 2019-04-03 19:51:39.194868
    Tick! The time is: 2019-04-03 19:51:44.193223
    Tack! The time is: 2019-04-03 19:51:44.195066
    ...
    

    【讨论】:

    • 感谢您的回复!您可能会注意到我从接受的答案中改变了这一点。请参阅下面我对亚历克斯回答的评论,了解原因。实际上,当我试图对此进行排序时,我确实查看了您链接到的代码,但是我只是没有花足够的时间来得出您的结论。无论如何,我感谢您的帮助!
    • 为什么无法从监听器添加作业?有什么具体原因吗?
    • @SpencerWeston 请查看我编辑的答案,我添加了一个示例。
    • 嗯。无论出于何种原因,当我在脚本的侦听器中调用调度程序时,都会出现名称错误。但是,您的测试文件自己运行得很好。所以,我一定是搞砸了。尽管如此,这是正确的答案。感谢您提供测试脚本。
    • 能否提供您的脚本和完整的堆栈跟踪,以便我们找出问题所在?
    【解决方案2】:

    您现在可以使用全局变量。这是一个例子:

    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.triggers.cron import CronTrigger
    
    def fn():
        '''Increase `times` by one and print it.'''
        global times
        times += 1
        print(times)
    
    
    sched = BlockingScheduler()
    times = 0
    
    # Execute fn() each second.
    sched.add_job(fn, trigger=CronTrigger(second='*/1'))
    sched.start()
    

    【讨论】:

    • 这行得通,不知道有什么缺点。
    • 这是其中一种技巧,谢谢!
    【解决方案3】:

    您需要实现stateful jobs feature

    【讨论】:

    • 感谢您的回答!我最初接受了 Tomas 的回答,但在尝试了该解决方案后,我意识到无法从侦听器中安排作业。
    【解决方案4】:

    几天前我遇到了同样的问题。我的第一个解决方案是使用关键字 global,但不久之后我意识到这是有问题的,因为在作业之外定义的变量可能会被意外更改,尤其是当它们是循环中的局部变量时。

    然后我也考虑过使用监听器。但是传入侦听器的回调仅将事件作为单个参数,这意味着您可以从回调中获取的唯一信息是事件本身,这极大地限制了您可以做的事情。

    最后我选择将一个函数传递给要安排的任务,这对我来说很好。您真正需要做的只是使用计划任务的返回值作为 func 的参数,如下面的代码所示。

    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime
    
    scheduler = BlockingScheduler()
    
    def job_with_return(a:int, b:int, callback_return):
        # You can do something heavier here with the inputs.
        result = a + b
        print(f'[in job] result is {result}')
        if callback_return:
            callback_return(result)
    
    scheduler.add_job(func=job_with_return,
        trigger='date',
        args=(1, 2, lambda r: print(f'[out of job]: result is {r}')),
        run_date=datetime.now(),
        )
    
    scheduler.start()
    

    输出:

    [in job] result is 3
    [out of job]: result is 3
    

    关于OP的要求,

    作业需要在每天的不同时间运行,我需要 从今天的作业返回值以安排明天的作业。

    您可以另外向 func 提供调度程序以及您希望任务在明天运行的时间,以便您可以在 func 中安排时间表。

    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime, timedelta
    
    scheduler = BlockingScheduler()
    today = datetime.now()
    tomorrow = today + timedelta(seconds=1)
    
    def task_today(a:int, b:int, scheduler:BlockingScheduler, callback_return, tomorrow:datetime):
        # What we have to do today is to get the result and use it to schedule tomorrow's task.
        result_today = a + b
        print(f"[{datetime.now().strftime('%H:%M:%S')}] (Today) The result is {result_today}.")
        scheduler.add_job(callback_return, 'date',
                          args=(result_today,),
                          run_date=tomorrow,
                          id='job_tomorrow')
    
    def task_tomorrow(result_from_today:int):
        result_tomrrow = result_from_today * 2
        print(f"[{datetime.now().strftime('%H:%M:%S')}] (Tommorow) The result is {result_tomrrow}.")
    
    scheduler.add_job(func=task_today,
                    trigger='date',
                    args=(1, 2, scheduler, task_tomorrow, tomorrow),
                    run_date=today,
                    id='job_today')
        
    scheduler.start()
    

    输出:

    [22:22:40] (Today) The result is 3.
    [22:22:41] (Tommorow) The result is 6.
    

    您甚至可以使用递归来完成日常任务。

    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime, timedelta
    
    scheduler = BlockingScheduler()
    today = datetime.now()
    
    def task_daily(result_yesterday:int, day_counter:int, scheduler:BlockingScheduler, callback_return):
        # You can do something heavier here with more inputs.
        result_today = result_yesterday + 2
        day_counter += 1
        tomorrow = datetime.now() + timedelta(seconds=1)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] (day {day_counter}) The result for today is {result_today}.")
        scheduler.add_job(task_daily, 'date',
                            args=(result_today, day_counter, scheduler, callback_return),
                            run_date=tomorrow)
    
    scheduler.add_job(func=task_daily,
                    trigger='date',
                    args=(0, 0, scheduler, task_daily),
                    run_date=today)
    
    scheduler.start()
    

    输出:

    [22:43:17] (day 1) The result for today is 2.
    [22:43:18] (day 2) The result for today is 4.
    [22:43:19] (day 3) The result for today is 6.
    [22:43:20] (day 4) The result for today is 8.
    [22:43:21] (day 5) The result for today is 10.
    [22:43:22] (day 6) The result for today is 12.
    [22:43:23] (day 7) The result for today is 14.
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-06-25
      • 1970-01-01
      • 1970-01-01
      • 2018-09-16
      • 2020-05-26
      • 2015-09-14
      • 2022-01-04
      相关资源
      最近更新 更多