【发布时间】:2021-03-27 03:49:42
【问题描述】:
我正在通过使用一些自定义属性从 asyncio.Future 继承来创建作业类,并期望作业实例的功能与原始 Future 一样。
当我在协程中调用job.set_result 时,它会引发Future object is not initialized error,然后我尝试通过调用asyncio.ensure_future 来初始化future,并出现同样的错误。
我尝试了更多,发现未来通常是由loop.create_future() 创建的,但是没有选项可以创建我的自定义未来。
以下是一个示例,如何初始化我的自定义未来?
import asyncio
from dataclasses import dataclass
@dataclass
class Job(asyncio.Future):
job_task: Callable
real_future: asyncio.Future = None
something: str = None
def schedule(self):
async def run():
res = await self.job_task()
self.set_result(res) # raise error, future not initialized
return res
self.real_future = asyncio.ensure_future(run())
async def main():
async def task():
await asyncio.sleep(1)
return 1
job = Job(task)
job.schedule()
await job
asyncio.run(main())
【问题讨论】:
-
@user4815162342 哇,它成功了。谢谢,你能回答我吗?
标签: python python-3.x python-dataclasses