【问题标题】:Schedule a repeating event in Python 3在 Python 3 中安排重复事件
【发布时间】:2011-01-24 19:32:59
【问题描述】:

我正在尝试安排一个重复事件在 Python 3 中每分钟运行一次。

我看过sched.scheduler 的课程,但我想知道是否还有其他方法可以做到这一点。我听说过我可以为此使用多个线程,我不介意这样做。

我基本上是在请求一些 JSON,然后对其进行解析;它的值会随着时间而变化。

要使用sched.scheduler,我必须创建一个循环来请求它安排甚至运行一小时:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

还有哪些其他方法可以做到这一点?

【问题讨论】:

标签: python python-3.x scheduled-tasks timing


【解决方案1】:

文档:Advanced Python Scheduler

@sched.cron_schedule(day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

可用字段:

| Field       | Description                                                    |
|-------------|----------------------------------------------------------------|
| year        | 4-digit year number                                            |
| month       | month number (1-12)                                            |
| day         | day of the month (1-31)                                        |
| week        | ISO week number (1-53)                                         |
| day_of_week | number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) |
| hour        | hour (0-23)                                                    |
| minute      | minute (0-59)                                                  |
| second      | second (0-59)                                                  |

【讨论】:

    【解决方案2】:

    不久前我遇到了类似的问题,所以我制作了一个 python 模块event-scheduler 来解决这个问题。它的 API 与 sched 库非常相似,但有一些不同:

    1. 它利用后台线程,并且始终能够在后台接受和运行作业,直到调度程序显式停止(无需 while 循环)。
    2. 它带有一个 API,用于以用户指定的时间间隔安排重复事件,直到明确取消。

    pip install event-scheduler可以安装

    from event_scheduler import EventScheduler
    
    event_scheduler = EventScheduler()
    event_scheduler.start()
    # Schedule the recurring event to print "hello world" every 60 seconds with priority 1
    # You can use the event_id to cancel the recurring event later
    event_id = event_scheduler.enter_recurring(60, 1, print, ("hello world",))
    

    【讨论】:

      【解决方案3】:

      有一个名为ischedule 的新包。对于这种情况,解决方案可能如下:

      from ischedule import schedule, run_loop
      from datetime import timedelta
      
      
      def query_rate_limit():
          print("query_rate_limit")
      
      schedule(query_rate_limit, interval=60)
      run_loop(return_after=timedelta(hours=1))
      

      一切都在主线程上运行,并且在 run_loop 内没有忙于等待。启动时间非常精确,通常在指定时间的几分之一毫秒内。

      【讨论】:

        【解决方案4】:

        您可以使用threading.Timer,但这也安排一次性事件,类似于调度程序对象的.enter 方法。

        将一次性调度器转换为周期性调度器的正常模式(在任何语言中)是让每个事件以指定的时间间隔重新调度自身。例如,对于sched,我不会像你那样使用循环,而是像这样:

        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)
        

        并通过调用启动整个“永久定期计划”

        periodic(scheduler, 3600, query_rate_limit)
        

        或者,我可以使用threading.Timer 代替scheduler.enter,但模式非常相似。

        如果您需要更精细的变化(例如,在给定时间或在某些条件下停止定期重新安排),用一些额外的参数来适应并不难。

        【讨论】:

        • 嗯,在java中我有 timer.scheduleAtFixedRate() 和真正的多线程。每个人都说在 python 中我们编写的代码更少……嗯嗯……只是说……
        • @user1685095 遗憾的是,任何这样的概括性陈述都有总是例外。
        • @Wallacoloo 这是否意味着不是总是例外? :)
        • @user1685095 没那么快!尝试在没有多次导入的情况下对其进行编码,扩展 TimerTask,提供run 方法,并添加一个单独的类来运行计时器以及创建所有这些对象。大概有 15 行代码。 (除非你有更简洁的方法;我在 Java 方面不是最棒的。)
        • 基于 Alex Martelli 的回答,我实现了更容易集成的装饰器版本。 stackoverflow.com/a/48758861/482899
        【解决方案5】:

        这是一个使用Thread 的快速而肮脏的非阻塞循环:

        #!/usr/bin/env python3
        import threading,time
        
        def worker():
            print(time.time())
            time.sleep(5)
            t = threading.Thread(target=worker)
            t.start()
        
        
        threads = []
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()
        time.sleep(7)
        print("Hello World")
        

        没有什么特别的,worker 会延迟创建一个自己的新线程。可能不是最有效的,但足够简单。如果您需要更复杂的解决方案,northtree's answer 将是您的最佳选择。

        基于this,我们可以做同样的事情,只需要Timer

        #!/usr/bin/env python3
        import threading,time
        
        def hello():
            t = threading.Timer(10.0, hello)
            t.start()
            print( "hello, world",time.time() )
        
        t = threading.Timer(10.0, hello)
        t.start()
        time.sleep(12)
        print("Oh,hai",time.time())
        time.sleep(4)
        print("How's it going?",time.time())
        

        【讨论】:

        • 您知道如何添加检查以检查此线程 t 是否存在吗?我尝试使用 try except 块并将 t.start() 放入 try 但它不会启动线程!我只需要运行 isAlive() 来检查它是否还活着
        • @toing_toing 坦率地说,我不知道。如果您在 Linux 环境中,我会建议由线程或锁定文件更新全局变量,但这是我头脑中的一个想法,而不是基于“良好实践”,只是对 Linux 的熟悉。也可以选择使用 shared memory 来完成此类任务。现在,我对多线程编程不是很了解,所以我建议在网站上提问并参考这个答案,这样人们就可以看到你正在尝试处理哪些代码。
        【解决方案6】:

        查看我的示例

        import sched, time
        
        def myTask(m,n):
          print n+' '+m
        
        def periodic_queue(interval,func,args=(),priority=1):
          s = sched.scheduler(time.time, time.sleep)
          periodic_task(s,interval,func,args,priority)
          s.run()
        
        def periodic_task(scheduler,interval,func,args,priority):
          func(*args)
          scheduler.enter(interval,priority,periodic_task,
                           (scheduler,interval,func,args,priority))
        
        periodic_queue(1,myTask,('world','hello'))
        

        【讨论】:

        • 你能解释一下为什么这比其他 8 个答案更好吗?
        【解决方案7】:

        根据 Alex Martelli 的回答,我实现了 decorator 版本,它更易于集成。

        import sched
        import time
        import datetime
        from functools import wraps
        from threading import Thread
        
        
        def async(func):
            @wraps(func)
            def async_func(*args, **kwargs):
                func_hl = Thread(target=func, args=args, kwargs=kwargs)
                func_hl.start()
                return func_hl
            return async_func
        
        
        def schedule(interval):
            def decorator(func):
                def periodic(scheduler, interval, action, actionargs=()):
                    scheduler.enter(interval, 1, periodic,
                                    (scheduler, interval, action, actionargs))
                    action(*actionargs)
        
                @wraps(func)
                def wrap(*args, **kwargs):
                    scheduler = sched.scheduler(time.time, time.sleep)
                    periodic(scheduler, interval, func)
                    scheduler.run()
                return wrap
            return decorator
        
        
        @async
        @schedule(1)
        def periodic_event():
            print(datetime.datetime.now())
        
        
        if __name__ == '__main__':
            print('start')
            periodic_event()
            print('end')
        

        【讨论】:

        • 这个装饰器解决方案真的很棒,但是我有一个稍微改进的建议:将*args**kwargs添加到预定的函数调用中,如下所示:def decorator(func, *args, **kwargs): def periodic(scheduler, interval, action, actionargs=(), kwargs={}): scheduler.enter(interval, 1, periodic, (scheduler, interval, action, actionargs, kwargs)) action(*actionargs, **kwargs)及以下periodic(scheduler, interval, func, args, kwargs)这个可以使用参数来调度函数。
        • @opt12 是否为每个计划更改了参数?
        • 不,每个计划的参数都是相同的,但是您可以装饰任何要定期计划的功能。不管他们是否接受参数。这些参数对于每次运行都是相同的,但至少它们可以作为初始参数提供。
        【解决方案8】:

        您可以使用schedule。它适用于 Python 2.7 和 3.3,而且相当轻量级:

        import schedule
        import time
        
        def job():
           print("I'm working...")
        
        schedule.every(10).minutes.do(job)
        schedule.every().hour.do(job)
        schedule.every().day.at("10:30").do(job)
        
        while 1:
           schedule.run_pending()
           time.sleep(1)
        

        【讨论】:

        • 为什么是while循环?,它不会像cron作业一样运行吗?
        • @Jaydev 如果代码在主线程中运行,则需要 while 循环
        • webserver 停止使用这个包提供文件
        • 使用 GUI 时会挂起
        【解决方案9】:

        基于 MestreLion 的回答,它解决了多线程的一个小问题:

        from threading import Timer, Lock
        
        
        class Periodic(object):
            """
            A periodic task running in threading.Timers
            """
        
            def __init__(self, interval, function, *args, **kwargs):
                self._lock = Lock()
                self._timer = None
                self.function = function
                self.interval = interval
                self.args = args
                self.kwargs = kwargs
                self._stopped = True
                if kwargs.pop('autostart', True):
                    self.start()
        
            def start(self, from_run=False):
                self._lock.acquire()
                if from_run or self._stopped:
                    self._stopped = False
                    self._timer = Timer(self.interval, self._run)
                    self._timer.start()
                    self._lock.release()
        
            def _run(self):
                self.start(from_run=True)
                self.function(*self.args, **self.kwargs)
        
            def stop(self):
                self._lock.acquire()
                self._stopped = True
                self._timer.cancel()
                self._lock.release()
        

        【讨论】:

        • 哈,我只是将锁放入原件中。这确实是必要的。感谢这是 MestreLion 的正确版本
        【解决方案10】:

        使用Celery

        from celery.task import PeriodicTask
        from datetime import timedelta
        
        
        class ProcessClicksTask(PeriodicTask):
            run_every = timedelta(minutes=30)
        
            def run(self, **kwargs):
                #do something
        

        【讨论】:

          【解决方案11】:

          您可以使用Advanced Python Scheduler。它甚至还有一个类似 cron 的界面。

          【讨论】:

            【解决方案12】:

            我对这个问题的谦虚:

            from threading import Timer
            
            class RepeatedTimer(object):
                def __init__(self, interval, function, *args, **kwargs):
                    self._timer     = None
                    self.function   = function
                    self.interval   = interval
                    self.args       = args
                    self.kwargs     = kwargs
                    self.is_running = False
                    self.start()
            
                def _run(self):
                    self.is_running = False
                    self.start()
                    self.function(*self.args, **self.kwargs)
            
                def start(self):
                    if not self.is_running:
                        self._timer = Timer(self.interval, self._run)
                        self._timer.start()
                        self.is_running = True
            
                def stop(self):
                    self._timer.cancel()
                    self.is_running = False
            

            用法:

            from time import sleep
            
            def hello(name):
                print "Hello %s!" % name
            
            print "starting..."
            rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
            try:
                sleep(5) # your long-running job goes here...
            finally:
                rt.stop() # better in a try/finally block to make sure the program ends!
            

            特点:

            • 仅标准库,无外部依赖项
            • 使用 Alex Martnelli 建议的模式
            • 即使计时器已经启动/停止,start()stop() 也可以安全调用多次
            • 要调用的函数可以有位置参数和命名参数
            • 您可以随时更改interval,下次运行后生效。 argskwargs 甚至 function 也一样!

            【讨论】:

            • 漂亮的类,但如果 start() 在循环中执行,它会有一个小问题。由于在另一个线程中执行了 _run 函数,它可能会通过 is_running 检查。所以最后一个 self._timer 被重新分配并且不能停止。查看我的答案以获得正确的版本。
            • @fdb:我不确定我是否理解你的观点。如果您使用 same 类实例在循环中执行start(),它不会做任何事情。如果您创建一个 new 实例,它将触发不同的计时器(允许您同时拥有多个计时器)。至于多线程,是的,除了每个start()(或__init__()要在同一个线程中调用
            • 这是我对“循环”一词的错误:我的意思是对 start() 函数的快速调用(用 do...loop 实现)。快到比 _run() 函数设置“is_running”标志要快。
            猜你喜欢
            • 2016-07-16
            • 1970-01-01
            • 2015-12-06
            • 1970-01-01
            • 2012-05-28
            • 2012-07-13
            • 1970-01-01
            相关资源
            最近更新 更多