此解决方案使用来自Fernando José Esteves de Souza 的装饰概念、来自Wojciech Migda 的漂移解决方法和一个超类,以便生成尽可能优雅的代码来处理异步周期性函数。
无threading.Thread
解决方案由以下文件组成:
-
periodic_async_thread.py 带有基类供您子类化
-
a_periodic_thread.py 带有示例子类
-
run_me.py 带有示例实例化并运行
文件periodic_async_thread.py中的PeriodicAsyncThread类:
import time
import asyncio
import abc
class PeriodicAsyncThread:
def __init__(self, period):
self.period = period
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
@abc.abstractmethod
async def run(self, *args, **kwargs):
return
def start(self):
asyncio.run(self.run())
文件a_periodic_thread.py中的简单子类APeriodicThread的示例:
from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio
class APeriodicThread(PeriodicAsyncThread):
def __init__(self, period):
super().__init__(period)
self.run = self.periodic()(self.run)
async def run(self, *args, **kwargs):
await asyncio.sleep(2)
print(time.time())
实例化并运行文件run_me.py中的示例类:
from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()
这段代码代表了一个优雅的解决方案,它还可以缓解其他解决方案的时间漂移问题。输出类似于:
1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736
带threading.Thread
解决方案由以下文件组成:
-
async_thread.py 与 canopy 异步线程类。
-
periodic_async_thread.py 带有基类供您子类化
-
a_periodic_thread.py 带有示例子类
-
run_me.py 带有示例实例化并运行
文件async_thread.py中的AsyncThread类:
from threading import Thread
import asyncio
import abc
class AsyncThread(Thread):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@abc.abstractmethod
async def async_run(self, *args, **kwargs):
pass
def run(self, *args, **kwargs):
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.run_until_complete(self.async_run(*args, **kwargs))
# loop.close()
asyncio.run(self.async_run(*args, **kwargs))
文件periodic_async_thread.py中的PeriodicAsyncThread类:
import time
import asyncio
from .async_thread import AsyncThread
class PeriodicAsyncThread(AsyncThread):
def __init__(self, period, *args, **kwargs):
self.period = period
super().__init__(*args, **kwargs)
self.async_run = self.periodic()(self.async_run)
def periodic(self):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
def g_tick():
t = time.time()
count = 0
while True:
count += 1
yield max(t + count * self.period - time.time(), 0)
g = g_tick()
while True:
# print('periodic', time.time())
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(next(g))
return wrapper
return scheduler
文件a_periodic_thread.py中的简单子类APeriodicThread的示例:
import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio
class APeriodicAsyncTHread(PeriodicAsyncThread):
async def async_run(self, *args, **kwargs):
print(f"{current_thread().name} {time.time()} Hi!")
await asyncio.sleep(1)
print(f"{current_thread().name} {time.time()} Bye!")
实例化并运行文件run_me.py中的示例类:
from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()
这段代码代表了一个优雅的解决方案,它还可以缓解其他解决方案的时间漂移问题。输出类似于:
a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!