【问题标题】:threading.Timer - repeat function every 'n' secondsthreading.Timer - 每“n”秒重复一次函数
【发布时间】:2012-09-15 06:49:12
【问题描述】:

我想每 0.5 秒触发一次功能,并能够启动和停止以及重置计时器。我不太了解 Python 线程的工作原理,并且在使用 Python 计时器时遇到了困难。

但是,当我执行两次threading.timer.start() 时,我不断收到RuntimeError: threads can only be started once。有解决办法吗?我尝试在每次开始之前申请threading.timer.cancel()

伪代码:

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()

【问题讨论】:

    标签: python multithreading timer python-multithreading


    【解决方案1】:

    最好的方法是启动一次定时器线程。在您的计时器线程中,您将编写以下代码

    class MyThread(Thread):
        def __init__(self, event):
            Thread.__init__(self)
            self.stopped = event
    
        def run(self):
            while not self.stopped.wait(0.5):
                print("my thread")
                # call a function
    

    在启动计时器的代码中,您可以然后set停止的事件来停止计时器。

    stopFlag = Event()
    thread = MyThread(stopFlag)
    thread.start()
    # this will stop the timer
    stopFlag.set()
    

    【讨论】:

    • 然后它将完成它的睡眠并停止。在 python 中没有办法强制挂起一个线程。这是 python 开发人员做出的设计决定。然而,最终结果将是相同的。您的线程仍会运行(睡眠)一小会儿,但不会执行您的功能。
    • 好吧,实际上,如果您希望能够立即停止计时器线程,只需使用threading.Eventwait 而不是sleep。然后,要唤醒它,只需设置事件。您甚至不需要self.stopped,因为您只需检查事件标志。
    • 该事件将严格用于中断定时器线程。通常,event.wait 会超时并表现得像睡眠一样,但如果您想停止(或以其他方式中断线程),您可以设置线程的事件,它会立即唤醒。
    • 我已更新我的答案以使用 event.wait()。感谢您的建议。
    • 只是一个问题,之后我该如何重新启动线程?打电话给thread.start() 给我threads can only be started once
    【解决方案2】:

    来自Equivalent of setInterval in python

    import threading
    
    def setInterval(interval):
        def decorator(function):
            def wrapper(*args, **kwargs):
                stopped = threading.Event()
    
                def loop(): # executed in another thread
                    while not stopped.wait(interval): # until stopped
                        function(*args, **kwargs)
    
                t = threading.Thread(target=loop)
                t.daemon = True # stop if the program exits
                t.start()
                return stopped
            return wrapper
        return decorator
    

    用法:

    @setInterval(.5)
    def function():
        "..."
    
    stop = function() # start timer, the first call is in .5 seconds
    stop.set() # stop the loop
    stop = function() # start new timer
    # ...
    stop.set() 
    

    或者这里是the same functionality but as a standalone function instead of a decorator

    cancel_future_calls = call_repeatedly(60, print, "Hello, World")
    # ...
    cancel_future_calls() 
    

    Here's how to do it without using threads.

    【讨论】:

    • 使用装饰器时如何更改间隔?说我想在运行时将 .5s 更改为 1 秒或其他什么?
    • @lightxx:只需使用@setInterval(1)
    • 嗯。所以要么我有点慢,要么你误解了我。我的意思是在运行时。我知道我可以随时更改源代码中的装饰器。例如,我有三个函数,每个函数都用@setInterval(n) 装饰。现在在运行时我想更改函数 2 的间隔,但不理会函数 1 和 3。
    • @lightxx:你可以使用不同的接口,例如stop = repeat(every=second, call=your_function); ...; stop()
    【解决方案3】:

    Hans Then's answer 上稍作改进,我们可以继承 Timer 函数。以下成为我们的整个“重复计时器”代码,它可以用作 threading.Timer 的替代品,具有所有相同的参数:

    from threading import Timer
    
    class RepeatTimer(Timer):
        def run(self):
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)
    

    使用示例:

    def dummyfn(msg="foo"):
        print(msg)
    
    timer = RepeatTimer(1, dummyfn)
    timer.start()
    time.sleep(5)
    timer.cancel()
    

    产生以下输出:

    foo
    foo
    foo
    foo
    

    timer = RepeatTimer(1, dummyfn, args=("bar",))
    timer.start()
    time.sleep(5)
    timer.cancel()
    

    生产

    bar
    bar
    bar
    bar
    

    【讨论】:

    • 这种方法可以让我启动/取消/启动/取消定时器线程吗?
    • 没有。虽然这种方法允许您使用普通 Timer 执行任何操作,但您无法使用普通 Timer 执行此操作。由于启动/取消与底层线程有关,如果您尝试 .start() 一个之前已被 .cancel()'ed 的线程,那么您将得到一个异常,RuntimeError: threads can only be started once
    • 真正优雅的解决方案!奇怪的是他们不只是包含一个这样做的类。
    • 这个解决方案令人印象深刻,但我很难通过阅读Python3 threading Timer interface documentation 来理解它是如何设计的。答案似乎建立在通过进入 threading.py 模块本身了解实现的基础上。
    • 非常好的解决方案! ...如果您只想调用 print...实际上不需要 dummyfn...RepeatTimer(1, print, args=("my message",)) 也能很好地完成这项工作!
    【解决方案4】:

    使用定时器线程-

    from threading import Timer,Thread,Event
    
    
    class perpetualTimer():
    
       def __init__(self,t,hFunction):
          self.t=t
          self.hFunction = hFunction
          self.thread = Timer(self.t,self.handle_function)
    
       def handle_function(self):
          self.hFunction()
          self.thread = Timer(self.t,self.handle_function)
          self.thread.start()
    
       def start(self):
          self.thread.start()
    
       def cancel(self):
          self.thread.cancel()
    
    def printer():
        print 'ipsem lorem'
    
    t = perpetualTimer(5,printer)
    t.start()
    

    这可以通过t.cancel()停止

    【讨论】:

    • 我相信这段代码在cancel 方法中有一个错误。调用它时,线程要么 1) 未运行,要么 2) 运行。在 1) 我们正在等待运行该函数,因此取消将正常工作。在 2) 我们当前正在运行,所以取消对当前执行没有影响。此外,当前执行会自行重新安排,因此不会对未来产生影响。
    • 这段代码在每次定时器运行结束时创建一个新线程。与公认的答案相比,这是一种巨大的浪费。
    • 应该避免这种解决方案,原因如上所述:它每次都会创建一个新线程
    【解决方案5】:

    为了按照 OP 的要求使用 Timer 提供正确答案,我将改进 swapnil jariwala's answer

    from threading import Timer
    
    
    class InfiniteTimer():
        """A Timer class that does not stop, unless you want it to."""
    
        def __init__(self, seconds, target):
            self._should_continue = False
            self.is_running = False
            self.seconds = seconds
            self.target = target
            self.thread = None
    
        def _handle_target(self):
            self.is_running = True
            self.target()
            self.is_running = False
            self._start_timer()
    
        def _start_timer(self):
            if self._should_continue: # Code could have been running when cancel was called.
                self.thread = Timer(self.seconds, self._handle_target)
                self.thread.start()
    
        def start(self):
            if not self._should_continue and not self.is_running:
                self._should_continue = True
                self._start_timer()
            else:
                print("Timer already started or running, please wait if you're restarting.")
    
        def cancel(self):
            if self.thread is not None:
                self._should_continue = False # Just in case thread is running and cancel fails.
                self.thread.cancel()
            else:
                print("Timer never started or failed to initialize.")
    
    
    def tick():
        print('ipsem lorem')
    
    # Example Usage
    t = InfiniteTimer(0.5, tick)
    t.start()
    

    【讨论】:

    • 这是一个非常好的框架,但确实会导致 _start_timer 每次迭代的 _thread.lock 对象数量不断增加
    【解决方案6】:

    我已经更改了 swapnil-jariwala 代码中的一些代码来制作一个小控制台时钟。

    from threading import Timer, Thread, Event
    from datetime import datetime
    
    class PT():
    
        def __init__(self, t, hFunction):
            self.t = t
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)
    
        def handle_function(self):
            self.hFunction()
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()
    
        def start(self):
            self.thread.start()
    
    def printer():
        tempo = datetime.today()
        h,m,s = tempo.hour, tempo.minute, tempo.second
        print(f"{h}:{m}:{s}")
    
    
    t = PT(1, printer)
    t.start()
    

    输出

    >>> 11:39:11
    11:39:12
    11:39:13
    11:39:14
    11:39:15
    11:39:16
    ...
    

    带有 tkinter 图形界面的定时器

    这段代码用 tkinter 将时钟定时器放在一个小窗口中

    from threading import Timer, Thread, Event
    from datetime import datetime
    import tkinter as tk
    
    app = tk.Tk()
    lab = tk.Label(app, text="Timer will start in a sec")
    lab.pack()
    
    
    class perpetualTimer():
    
        def __init__(self, t, hFunction):
            self.t = t
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)
    
        def handle_function(self):
            self.hFunction()
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()
    
        def start(self):
            self.thread.start()
    
        def cancel(self):
            self.thread.cancel()
    
    
    def printer():
        tempo = datetime.today()
        clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
        try:
            lab['text'] = clock
        except RuntimeError:
            exit()
    
    
    t = perpetualTimer(1, printer)
    t.start()
    app.mainloop()
    

    抽认卡游戏示例(有点)

    from threading import Timer, Thread, Event
    from datetime import datetime
    
    
    class perpetualTimer():
    
        def __init__(self, t, hFunction):
            self.t = t
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)
    
        def handle_function(self):
            self.hFunction()
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()
    
        def start(self):
            self.thread.start()
    
        def cancel(self):
            self.thread.cancel()
    
    
    x = datetime.today()
    start = x.second
    
    
    def printer():
        global questions, counter, start
        x = datetime.today()
        tempo = x.second
        if tempo - 3 > start:
            show_ans()
        #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
        print()
        print("-" + questions[counter])
        counter += 1
        if counter == len(answers):
            counter = 0
    
    
    def show_ans():
        global answers, c2
        print("It is {}".format(answers[c2]))
        c2 += 1
        if c2 == len(answers):
            c2 = 0
    
    
    questions = ["What is the capital of Italy?",
                 "What is the capital of France?",
                 "What is the capital of England?",
                 "What is the capital of Spain?"]
    
    answers = "Rome", "Paris", "London", "Madrid"
    
    counter = 0
    c2 = 0
    print("Get ready to answer")
    t = perpetualTimer(3, printer)
    t.start()
    

    输出:

    Get ready to answer
    >>> 
    -What is the capital of Italy?
    It is Rome
    
    -What is the capital of France?
    It is Paris
    
    -What is the capital of England?
    ...
    

    【讨论】:

    • 如果 hFunction 被阻塞,这不会给后续的开始时间增加一些延迟吗?也许你可以换行,让handle_function先启动计时器,然后调用hFunction?
    【解决方案7】:

    我必须为一个项目这样做。我最终做的是为函数启动一个单独的线程

    t = threading.Thread(target =heartbeat, args=(worker,))
    t.start()
    

    ****heartbeat 是我的函数,worker 是我的论据之一****

    我的心跳函数内部:

    def heartbeat(worker):
    
        while True:
            time.sleep(5)
            #all of my code
    

    因此,当我启动线程时,该函数将反复等待 5 秒,运行我的所有代码,并无限期地执行此操作。如果您想终止进程,只需终止线程即可。

    【讨论】:

      【解决方案8】:

      我已经实现了一个用作计时器的类。

      我把链接留在这里以防万一有人需要它: https://github.com/ivanhalencp/python/tree/master/xTimer

      【讨论】:

      • 虽然这在理论上可以回答这个问题,it would be preferable 在此处包含答案的基本部分,并提供链接以供参考。
      【解决方案9】:
      from threading import Timer
      def TaskManager():
          #do stuff
          t = Timer( 1, TaskManager )
          t.start()
      
      TaskManager()
      

      这是一个小样本,它将有助于更好地了解它的运行方式。 函数 taskManager() 最后创建延迟函数调用它自己。

      尝试更改“dalay”变量,您将能够看到差异

      from threading import Timer, _sleep
      
      # ------------------------------------------
      DATA = []
      dalay = 0.25 # sec
      counter = 0
      allow_run = True
      FIFO = True
      
      def taskManager():
      
          global counter, DATA, delay, allow_run
          counter += 1
      
          if len(DATA) > 0:
              if FIFO:
                  print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
              else:
                  print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")
      
          else:
              print("["+str(counter)+"] no data")
      
          if allow_run:
              #delayed method/function call to it self
              t = Timer( dalay, taskManager )
              t.start()
      
          else:
              print(" END task-manager: disabled")
      
      # ------------------------------------------
      def main():
      
          DATA.append("data from main(): 0")
          _sleep(2)
          DATA.append("data from main(): 1")
          _sleep(2)
      
      
      # ------------------------------------------
      print(" START task-manager:")
      taskManager()
      
      _sleep(2)
      DATA.append("first data")
      
      _sleep(2)
      DATA.append("second data")
      
      print(" START main():")
      main()
      print(" END main():")
      
      _sleep(2)
      DATA.append("last data")
      
      allow_run = False
      

      【讨论】:

      • 您能否详细说明一下为什么会这样?
      • 你的例子有点混乱,第一个代码块就是你需要说的。
      【解决方案10】:

      我喜欢 right2clicky 的回答,尤其是它不需要在每次计时器滴答时都拆除线程并创建一个新线程。此外,创建一个带有定期调用的计时器回调的类是一个简单的覆盖。这是我的正常用例:

      class MyClass(RepeatTimer):
          def __init__(self, period):
              super().__init__(period, self.on_timer)
      
          def on_timer(self):
              print("Tick")
      
      
      if __name__ == "__main__":
          mc = MyClass(1)
          mc.start()
          time.sleep(5)
          mc.cancel()
      

      【讨论】:

        【解决方案11】:

        这是使用函数而不是类的替代实现。受到上述@Andrew Wilkins 的启发。

        因为等待比睡眠更准确(它考虑了函数运行时间):

        import threading
        
        PING_ON = threading.Event()
        
        def ping():
          while not PING_ON.wait(1):
            print("my thread %s" % str(threading.current_thread().ident))
        
        t = threading.Thread(target=ping)
        t.start()
        
        sleep(5)
        PING_ON.set()
        

        【讨论】:

          【解决方案12】:

          我想出了另一种 SingleTon 类的解决方案。请告诉我是否有任何内存泄漏。

          import time,threading
          
          class Singleton:
            __instance = None
            sleepTime = 1
            executeThread = False
          
            def __init__(self):
               if Singleton.__instance != None:
                  raise Exception("This class is a singleton!")
               else:
                  Singleton.__instance = self
          
            @staticmethod
            def getInstance():
               if Singleton.__instance == None:
                  Singleton()
               return Singleton.__instance
          
          
            def startThread(self):
               self.executeThread = True
               self.threadNew = threading.Thread(target=self.foo_target)
               self.threadNew.start()
               print('doing other things...')
          
          
            def stopThread(self):
               print("Killing Thread ")
               self.executeThread = False
               self.threadNew.join()
               print(self.threadNew)
          
          
            def foo(self):
               print("Hello in " + str(self.sleepTime) + " seconds")
          
          
            def foo_target(self):
               while self.executeThread:
                  self.foo()
                  print(self.threadNew)
                  time.sleep(self.sleepTime)
          
                  if not self.executeThread:
                     break
          
          
          sClass = Singleton()
          sClass.startThread()
          time.sleep(5)
          sClass.getInstance().stopThread()
          
          sClass.getInstance().sleepTime = 2
          sClass.startThread()
          

          【讨论】:

            【解决方案13】:

            除了使用线程的上述出色答案之外,如果您必须使用主线程或更喜欢异步方法 - 我围绕 aio_timers Timer 类包装了一个简短的类(以启用重复)

            import asyncio
            from aio_timers import Timer
            
            class RepeatingAsyncTimer():
                def __init__(self, interval, cb, *args, **kwargs):
                    self.interval = interval
                    self.cb = cb
                    self.args = args
                    self.kwargs = kwargs
                    self.aio_timer = None
                    self.start_timer()
                
                def start_timer(self):
                    self.aio_timer = Timer(delay=self.interval, 
                                           callback=self.cb_wrapper, 
                                           callback_args=self.args, 
                                           callback_kwargs=self.kwargs
                                          )
                
                def cb_wrapper(self, *args, **kwargs):
                    self.cb(*args, **kwargs)
                    self.start_timer()
            
            
            from time import time
            def cb(timer_name):
                print(timer_name, time())
            
            print(f'clock starts at: {time()}')
            timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
            timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
            

            时钟开始于:1602438840.9690785

            timer_1 1602438845.980087

            timer_2 1602438850.9806316

            timer_1 1602438850.9808934

            timer_1 1602438855.9863033

            timer_2 1602438860.9868324

            timer_1 1602438860.9876585

            【讨论】:

            • 太棒了。 :) 比所有其他人都好。并且精确。
            【解决方案14】:

            这是连续运行计时器的示例代码。只需在耗尽时创建一个新计时器并调用相同的函数。这不是最好的方法,但也可以这样做。

            import threading
            import time
            
            
            class ContinousTimer():
                def __init__(self):
                    self.timer = None
            
                def run(self, msg='abc'):
                    print(msg)
            
                    self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
                    self.timer.start()
            
            
            if __name__ == "__main__":
                t = ContinousTimer()
                try:
                    t.run(msg="Hello")
                    while True:
                        time.sleep(0.1)
                except KeyboardInterrupt:
                    # Cancel Timer
                    t.timer.cancel()
            

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 2018-01-06
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2017-10-28
              • 2011-03-24
              • 2022-12-11
              相关资源
              最近更新 更多