[Posted]: 2023-03-20 20:36:01
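[Question]:
I'm using the asyncio run_forever() event loop. Now I want to restart the loop (stop it and create a new one) after a process finishes, a signal arrives, or a file changes, but I have run into some problems.
Here are three simplified code snippets that demonstrate some coroutine workers and a coroutine that restarts the loop:
First try: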
import asyncio


async def coro_worker(proc):
    print(f'Worker: {proc} started.')
    while True:
        print(f'Worker: {proc} process.')
        await asyncio.sleep(proc)


async def reset_loop(loop):
    # Some process
    for i in range(5):  # Like a process.
        print(f'{i} counting for reset the eventloop.')
        await asyncio.sleep(1)
    main(loop)  # Expected close the current loop and start a new loop!


def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()
            # task.clear()
            # task.close()
            # task.stop()
        print("Done cancelling tasks")
        asyncio.get_event_loop().stop()
    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()


main()
Output [1]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Second try:
.
.
.
def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        previous_loop.stop()
        previous_loop.close()
        offset = 1  # An offset to change the process name.
    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()


main()
Output [2]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>
Third try:
.
.
.
def main(previous_loop=None):
    offset = 0
    if previous_loop is not None:  # Trying for close the last loop if exist.
        offset = 1  # An offset to change the process name.
        for task in asyncio.Task.all_tasks():
            print('Cancel the tasks')  # Why it increase up?
            task.cancel()
    process = [1 + offset, 2 + offset]
    loop = asyncio.get_event_loop()
    futures = [loop.create_task(coro_worker(proc)) for proc in process]
    futures.append(loop.create_task(reset_loop(loop)))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except asyncio.CancelledError:
        print('Tasks has been canceled')
        main()  # Recursively
    finally:
        print("Closing Loop")
        loop.close()


main()
Output [3]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.
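Questions:
- In the third try it apparently works, but the number of times print('Cancel the tasks') runs grows after every restart. What is the reason?
- Is there a better way to solve this problem?
Please forgive the long question; I tried to simplify it.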
[NOTE]:
- I'm not looking for asyncio.timeout().
- I also tried using another thread to restart the event loop, but without success.
- I'm using Python 3.6.
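On the growing count of 'Cancel the tasks' lines, one plausible reading (an assumption, not verified on the poster's machine) is that in Python 3.6 asyncio.Task.all_tasks() also returns tasks that have already finished, as long as something still references them, and the futures lists in the still-active main() frames do exactly that. Each restart would then cancel the old, finished tasks again in addition to the three new ones. The sketch below illustrates the effect of skipping finished tasks. The names worker, cancel_pending, and demo are hypothetical, and a plain list stands in for the set returned by all_tasks() so that the code also runs on current Python versions.

```python
import asyncio


async def worker(delay):
    # Stand-in for coro_worker(): loops until it is cancelled.
    while True:
        await asyncio.sleep(delay)


def cancel_pending(tasks):
    """Cancel only tasks that have not finished yet; return how many."""
    cancelled = 0
    for task in tasks:
        if not task.done():  # finished or already-cancelled tasks are skipped
            task.cancel()
            cancelled += 1
    return cancelled


async def demo():
    tracked = []  # plays the role of the task set that keeps growing
    counts = []
    for _ in range(3):  # three simulated "restarts"
        tracked.extend(asyncio.ensure_future(worker(0.01)) for _ in range(2))
        await asyncio.sleep(0.02)
        counts.append(cancel_pending(tracked))
        # Yield once so the cancellations are delivered and the tasks end.
        await asyncio.sleep(0)
    return len(tracked), counts


loop = asyncio.new_event_loop()
try:
    total_tracked, counts = loop.run_until_complete(demo())
finally:
    loop.close()

print(total_tracked, counts)  # 6 [2, 2, 2]
```

Without the done() check the per-restart count would be 2, 4, 6, which mirrors the 3, 6, ... pattern in Output [3].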
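[Comments]:
- Why do you want to reset the event loop? That is an unusual idea.
- Can you explain what you are trying to achieve with this, i.e. what problem are you solving?
- @user4815162342 I'm trying to implement an SNMP collector with several configurations stored in one file. I parse the configurations, and each one is fed to a coroutine SNMP collector (like coro_worker() in the question) when the loop is initialized (loop.create_task()). My SNMP collector (coro_worker()) has an infinite loop. The problem is that when the configuration file changes, I can't stop this event loop (run_forever()) and recreate the coroutine SNMP collectors with the new configuration.
- This might be an XY problem. Instead of focusing on how to reset the event loop, you should focus on how to end your tasks properly.
- @KlausD. Is it necessary to make sure the tasks have ended before stopping or closing the event loop?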
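The suggestion in the comments (end the tasks properly instead of resetting the loop) could be sketched roughly as follows. This is only an illustration under stated assumptions: stop_workers and supervisor are hypothetical names, the configs argument is a made-up stand-in for re-reading the configuration file, and the short sleep replaces whatever mechanism detects the file change. The loop itself is never stopped or recreated; only the worker tasks are cancelled, awaited, and replaced.

```python
import asyncio


async def coro_worker(proc, log):
    # Stand-in for the SNMP collector: runs until it is cancelled.
    log.append(f'Worker: {proc} started.')
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append(f'Worker: {proc} cancelled.')
        raise  # re-raise so the task ends in the cancelled state


async def stop_workers(tasks):
    # Request cancellation, then wait until every task has really ended.
    if not tasks:
        return
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


async def supervisor(configs, log):
    # Hypothetical: each entry of `configs` is the worker list after one
    # reload; a real program would parse its config file here instead.
    tasks = []
    for procs in configs:
        await stop_workers(tasks)
        tasks = [asyncio.ensure_future(coro_worker(p, log)) for p in procs]
        await asyncio.sleep(0.05)  # placeholder for "wait for a file change"
    await stop_workers(tasks)


log = []
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(supervisor([[1, 2], [2, 3]], log))
finally:
    loop.close()  # safe here: the loop has stopped and no task is pending

print('\n'.join(log))
```

Because every task is awaited after being cancelled, no "Task was destroyed but it is pending!" warnings should appear, and close() is only called once the loop is no longer running.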
Tags: python python-3.x asynchronous python-asyncio event-loop