【问题标题】:How can I periodically execute a function with asyncio?如何使用 asyncio 定期执行函数?
【发布时间】:2016-09-27 11:42:16
【问题描述】:

我正在从 tornado 迁移到 asyncio,但我找不到与 tornadoPeriodicCallback 等效的 asyncio。 (PeriodicCallback 有两个参数:要运行的函数和调用之间的毫秒数。)

  • asyncio 中是否有这样的等价物?
  • 如果没有,什么是最干净的方法来实现它,而不会冒着在一段时间后获得RecursionError 的风险?

【问题讨论】:

标签: python python-3.x tornado python-3.5 python-asyncio


【解决方案1】:

对于低于 3.5 的 Python 版本:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

对于 Python 3.5 及更高版本:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

【讨论】:

  • 即使在 Tornado 中,对于使用协程的应用程序,我也建议使用这样的循环而不是 PeriodicCallback
  • 请注意:不要直接创建Task 实例;使用ensure_future() 函数或AbstractEventLoop.create_task() 方法。来自asyncio documentation
  • 可以使用 lambda 代替 stop 函数。即:loop.call_later(5, lambda: task.cancel())
  • 或者你可以直接称呼它为loop.call_later(5, task.cancel)
  • Python 3.7 的注释:从asyncio doc,我们应该使用高级asyncio.create_task() 创建Tasks。
【解决方案2】:

当你觉得应该在你的 asyncio 程序的“后台”发生一些事情时,asyncio.Task 可能是一个好方法。您可以阅读this post 了解如何处理任务。

以下是定期执行某些功能的类的可能实现:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

让我们测试一下:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

正如您在start 上看到的,我们只是启动调用一些函数并在无限循环中休眠一段时间的任务。在stop 上,我们只是取消该任务。请注意,该任务应在程序完成时停止。

更重要的一点是,您的回调不应该花费太多时间来执行(否则它会冻结您的事件循环)。如果您打算调用一些长期运行的func,您可能需要to run it in executor

【讨论】:

  • 目前为止最完整最清晰的答案!谢谢。要求 func 成为协程是个好主意,这样我们就可以: await self.func()_run 方法中?
  • @SergeyBelash,当然,会好的。请注意,由于我们在随机时间取消任务,因此您的 func 也可能在随机时间取消。这意味着函数中的每条等待行都可能引发 CancelledError。但它对于每个异步函数都是实际的(就像 KeyboardInterrupt 可以在常规的非异步代码中随机引发)。
  • 我担心这个(和其他答案)重复率不会完全是时间值。如果 func 花费了相当长的时间来执行它甚至不会关闭,并且在很长一段时间内它会漂移,即使 func 花费的时间可以忽略不计。
  • 严格来说start()不一定是async
  • 这个可以升级支持普通函数和异步函数:``` async def _run(self): while True: await asyncio.sleep(self.time) # 支持普通函数和异步函数 res = self.func() if inspect.isawaitable(res): await res ```
【解决方案3】:

没有对定期调用的内置支持,没有。

只需创建您自己的调度程序循环,它会休眠并执行任何已调度的任务:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)
       
        # execute any scheduled tasks
        async for task in scheduled_tasks(time.time()):
            await task()

scheduled_tasks() 迭代器应该生成可以在给定时间运行的任务。请注意,从理论上讲,生成时间表和启动所有任务可能需要超过 1 秒的时间;这里的想法是调度程序产生自上次检查以来应该开始的所有任务。

【讨论】:

  • asyncio 事件循环有一个time() 方法,可以用来代替time 模块。
  • @krs013:那是一个不同的时钟;它不一定给你真实世界的时间(它取决于事件循环的实现,并且可以测量 CPU 时间滴答或其他单调递增的时钟测量)。因为不能保证以秒为单位提供测量值,所以这里应该使用它。
  • 哦,很好,谢谢。我认为这对于间隔时间来说已经足够了,但看起来不能保证睡眠线程的准确性。我见过的实现似乎只是在纳秒内使用机器的正常运行时间,但是是的,你是对的。我想我现在有一些代码要修复...
  • loop.time 方法的 docstring 声明“这是一个浮点数,以自一个纪元以来的秒数表示,但未指定纪元、精度、准确度和漂移,并且每个事件循环可能会有所不同。”在这里,我将其解释为“自一个纪元以来的 SI 秒数”,因此 CPU 时间滴答或其他非“统一”时钟不适用于 loop.time()。由于 OP 只是要求每 x 毫秒进行一次定期回调,因此在我看来 loop.time() 就足够了。
  • @StefanoM:是的,它可能足够了,但是依赖于事件循环实现,并且文档字符串为实现提供了足够的余地。这对于重复任务可能已经足够了,但我的回答描述了一个 调度程序,它通常需要做类似 cron 的事情(例如,在特定的现实时间运行任务)。
【解决方案4】:

一个可能有用的变体:如果您希望在上次执行结束和下一次执行开始之间每隔 n 秒而不是 n 秒发生一次重复调用,并且您不希望调用在时间上重叠,下面的比较简单:

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

以及使用它在后台运行几个任务的示例:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

【讨论】:

  • 谢谢!当我的服务器负载很重时,我遇到了这个问题,并且经过多次重复,我们开始出现时钟偏差。这可以优雅地解决它。
  • 为什么在 main() 中使用 ensure_future?为什么不简单地使用await repeat(3, f)await repeat(2, g)
  • 如果你想让 f 或 g 返回一个值怎么办?
【解决方案5】:

python 3.7 带有装饰器的替代版本

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))

【讨论】:

    【解决方案6】:

    基于@A. Jesse Jiryu Davis answer(使用@Torkel Bjørnson-Langen 和@ReWrite cmets),这是一项避免漂移的改进。

    import time
    import asyncio
    
    @asyncio.coroutine
    def periodic(period):
        def g_tick():
            t = time.time()
            count = 0
            while True:
                count += 1
                yield max(t + count * period - time.time(), 0)
        g = g_tick()
    
        while True:
            print('periodic', time.time())
            yield from asyncio.sleep(next(g))
    
    loop = asyncio.get_event_loop()
    task = loop.create_task(periodic(1))
    loop.call_later(5, task.cancel)
    
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
    

    【讨论】:

    • periodic 应该优先使用loop.time() 而不是time.time(),因为loop.time()asyncio.sleep() 内部使用的时间参考。 loop.time() 返回单调时间,而 time.time() 返回挂钟时间。两者会有所不同,例如当系统管理员修改系统上的日期,或 NTP 调整挂钟时间时。
    【解决方案7】:

    此解决方案使用来自Fernando José Esteves de Souza 的装饰概念、来自Wojciech Migda 的漂移解决方法和一个超类,以便生成尽可能优雅的代码来处理异步周期性函数。

    无threading.Thread

    解决方案由以下文件组成:

    • periodic_async_thread.py 带有基类供您子类化
    • a_periodic_thread.py 带有示例子类
    • run_me.py 带有示例实例化并运行

    文件periodic_async_thread.py中的PeriodicAsyncThread类:

    import time
    import asyncio
    import abc
    
    class PeriodicAsyncThread:
        def __init__(self, period):
            self.period = period
    
        def periodic(self):
            def scheduler(fcn):
                async def wrapper(*args, **kwargs):
                    def g_tick():
                        t = time.time()
                        count = 0
                        while True:
                            count += 1
                            yield max(t + count * self.period - time.time(), 0)
                    g = g_tick()
    
                    while True:
                        # print('periodic', time.time())
                        asyncio.create_task(fcn(*args, **kwargs))
                        await asyncio.sleep(next(g))
                return wrapper
            return scheduler
    
        @abc.abstractmethod
        async def run(self, *args, **kwargs):
            return
    
        def start(self):
            asyncio.run(self.run())
    

    文件a_periodic_thread.py中的简单子类APeriodicThread的示例:

    from periodic_async_thread import PeriodicAsyncThread
    import time
    import asyncio
    
    class APeriodicThread(PeriodicAsyncThread):
        def __init__(self, period):
            super().__init__(period)
            self.run = self.periodic()(self.run)
        
        async def run(self, *args, **kwargs):
            await asyncio.sleep(2)
            print(time.time())
    

    实例化并运行文件run_me.py中的示例类:

    from a_periodic_thread import APeriodicThread
    apt = APeriodicThread(2)
    apt.start()
    

    这段代码代表了一个优雅的解决方案,它还可以缓解其他解决方案的时间漂移​​问题。输出类似于:

    1642711285.3898764
    1642711287.390698
    1642711289.3924973
    1642711291.3920736
    

    带threading.Thread

    解决方案由以下文件组成:

    • async_thread.py 与 canopy 异步线程类。
    • periodic_async_thread.py 带有基类供您子类化
    • a_periodic_thread.py 带有示例子类
    • run_me.py 带有示例实例化并运行

    文件async_thread.py中的AsyncThread类:

    from threading import Thread
    import asyncio
    import abc
    
    class AsyncThread(Thread):
        def __init__(self, *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)
    
        @abc.abstractmethod
        async def async_run(self, *args, **kwargs):
            pass
    
        def run(self, *args, **kwargs):
            # loop = asyncio.new_event_loop()
            # asyncio.set_event_loop(loop)
    
            # loop.run_until_complete(self.async_run(*args, **kwargs))
            # loop.close()
            asyncio.run(self.async_run(*args, **kwargs))
    
    

    文件periodic_async_thread.py中的PeriodicAsyncThread类:

    import time
    import asyncio
    from .async_thread import AsyncThread
    
    class PeriodicAsyncThread(AsyncThread):
        def __init__(self, period, *args, **kwargs):
            self.period = period
            super().__init__(*args, **kwargs)
            self.async_run = self.periodic()(self.async_run)
    
        def periodic(self):
            def scheduler(fcn):
                async def wrapper(*args, **kwargs):
                    def g_tick():
                        t = time.time()
                        count = 0
                        while True:
                            count += 1
                            yield max(t + count * self.period - time.time(), 0)
                    g = g_tick()
    
                    while True:
                        # print('periodic', time.time())
                        asyncio.create_task(fcn(*args, **kwargs))
                        await asyncio.sleep(next(g))
                return wrapper
            return scheduler
    

    文件a_periodic_thread.py中的简单子类APeriodicThread的示例:

    import time
    from threading import current_thread
    from .periodic_async_thread import PeriodicAsyncThread
    import asyncio
    
    class APeriodicAsyncTHread(PeriodicAsyncThread):
        async def async_run(self, *args, **kwargs):
            print(f"{current_thread().name} {time.time()} Hi!")
            await asyncio.sleep(1)
            print(f"{current_thread().name} {time.time()} Bye!")
    

    实例化并运行文件run_me.py中的示例类:

    from .a_periodic_thread import APeriodicAsyncTHread
    a = APeriodicAsyncTHread(2, name = "a periodic async thread")
    a.start()
    a.join()
    

    这段代码代表了一个优雅的解决方案,它还可以缓解其他解决方案的时间漂移​​问题。输出类似于:

    a periodic async thread 1643726990.505269 Hi!
    a periodic async thread 1643726991.5069854 Bye!
    a periodic async thread 1643726992.506919 Hi!
    a periodic async thread 1643726993.5089169 Bye!
    a periodic async thread 1643726994.5076022 Hi!
    a periodic async thread 1643726995.509422 Bye!
    a periodic async thread 1643726996.5075526 Hi!
    a periodic async thread 1643726997.5093904 Bye!
    a periodic async thread 1643726998.5072556 Hi!
    a periodic async thread 1643726999.5091035 Bye!
    

    【讨论】:

      【解决方案8】:

      这就是我使用 asyncio 测试我的周期性回调理论所做的。我没有使用 Tornado 的经验,所以我不确定定期回调是如何使用它的。不过,我习惯于在 Tkinter 中使用 after(ms, callback) 方法,这就是我想出的。 While True: 即使它是异步的(比全局变量更是如此),我也觉得它很难看。 call_later(s, callback, *args) 方法使用秒而不是毫秒。

      import asyncio
      my_var = 0
      def update_forever(the_loop):
          global my_var
          print(my_var)
          my_var += 1 
          # exit logic could be placed here
          the_loop.call_later(3, update_forever, the_loop)  # the method adds a delayed callback on completion
      
      event_loop = asyncio.get_event_loop()
      event_loop.call_soon(update_forever, event_loop)
      event_loop.run_forever()
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-12-23
        相关资源
        最近更新 更多