【问题标题】:Please explain "Task was destroyed but it is pending!" after cancelling tasks请解释“任务已被破坏但未决!”取消任务后
【发布时间】:2017-04-15 07:28:11
【问题描述】:

我正在使用 Python 3.4.2 学习 asyncio,并使用它在 IPC 总线上持续监听,而 gbulb 在 DBus 上监听。

我创建了一个函数listen_to_ipc_channel_layer,它持续监听IPC通道上的传入消息并将消息传递给message_handler

我也在听 SIGTERM 和 SIGINT。当我向运行您在底部找到的代码的 python 进程发送 SIGTERM 时,脚本应该正常终止。

我遇到的问题是以下警告:

got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

Process finished with exit code 0

…使用以下代码:

import asyncio
import gbulb
import signal
import asgi_ipc as asgi

def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()

@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break

def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()


if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()

我仍然对 asyncio 了解得很少,但我想我知道发生了什么。在等待 yield from asyncio.sleep(0.1) 时,信号处理程序捕获了 SIGTERM 并在该过程中调用 task.cancel()

这不应该在while True: 循环中触发CancelledError 吗? (因为它不是,但这就是我的理解"Calling cancel() will throw a CancelledError to the wrapped coroutine")。

最终调用loop.stop() 停止循环,而无需等待yield from asyncio.sleep(0.1) 返回结果甚至整个协程listen_to_ipc_channel_layer

如果我错了,请纠正我。

我认为我唯一需要做的就是让我的程序等待yield from asyncio.sleep(0.1) 返回结果和/或协程以中断while循环并完成。

我相信我混淆了很多事情。请帮我把这些事情弄清楚,这样我就可以弄清楚如何在没有警告的情况下优雅地关闭事件循环。

【问题讨论】:

    标签: python python-3.x python-3.4 python-asyncio


    【解决方案1】:

    问题来自于取消任务后立即关闭循环。作为cancel() docs state

    “这会安排在下一个循环通过事件循环将 CancelledError 抛出到包装的协程中。”

    获取这段代码:

    import asyncio
    import signal
    
    
    async def pending_doom():
        await asyncio.sleep(2)
        print(">> Cancelling tasks now")
        for task in asyncio.Task.all_tasks():
            task.cancel()
    
        print(">> Done cancelling tasks")
        asyncio.get_event_loop().stop()
    
    
    def ask_exit():
        for task in asyncio.Task.all_tasks():
            task.cancel()
    
    
    async def looping_coro():
        print("Executing coroutine")
        while True:
            try:
                await asyncio.sleep(0.25)
            except asyncio.CancelledError:
                print("Got CancelledError")
                break
    
            print("Done waiting")
    
        print("Done executing coroutine")
        asyncio.get_event_loop().stop()
    
    
    def main():
        asyncio.async(pending_doom())
        asyncio.async(looping_coro())
    
        loop = asyncio.get_event_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, ask_exit)
    
        loop.run_forever()
    
        # I had to manually remove the handlers to
        # avoid an exception on BaseEventLoop.__del__
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)
    
    
    if __name__ == '__main__':
        main()
    

    注意ask_exit 取消任务但没有stop 循环,在下一个循环looping_coro() 停止它。如果你取消它,输出是:

    Executing coroutine
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    ^CGot CancelledError
    Done executing coroutine
    

    注意pending_doom 如何在之后立即取消和停止循环。如果你让它运行直到 pending_doom 协程从睡眠中醒来,你会看到同样的警告:

    Executing coroutine
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    >> Cancelling tasks now
    >> Done cancelling tasks
    Task was destroyed but it is pending!
    task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>
    

    【讨论】:

    • 关于信号处理程序相关的异常:您也可以通过在末尾调用loop.close()来避免此异常。
    • @maarten:不,我不能:我真希望我能...我所有的任务也都取消了...
    【解决方案2】:

    问题的意思是循环没有时间完成所有任务。

    这会安排在事件循环的下一个循环中将 CancelledError 抛出到包装的协程中。

    在您的方法中没有机会执行循环的“下一个循环”。为了使其正确,您应该将停止操作移至单独的非循环协程,以使您的循环有机会完成。

    第二个重要的事情是CancelledError 提高。

    与 Future.cancel() 不同,这并不能保证任务将被取消:异常可能会被捕获并采取行动,延迟取消任务或完全阻止取消。该任务也可能返回一个值或引发不同的异常。

    调用此方法后,cancelled() 将不会立即返回 True(除非任务已被取消)。当包装的协程以 CancelledError 异常终止时,任务将被标记为已取消(即使未调用 cancel())。

    所以在清理之后你的协程必须引发CancelledError 被标记为已取消。

    使用额外的协程停止循环不是问题,因为它不是循环的并且在执行后立即完成。

    def main():                                              
        loop = asyncio.get_event_loop()                      
        asyncio.ensure_future(listen_to_ipc_channel_layer()) 
                                                         
        for sig in (signal.SIGINT, signal.SIGTERM):          
            loop.add_signal_handler(sig, ask_exit)           
        loop.run_forever()                                   
        print("Close")                                       
        loop.close()                                         
                                                         
                                                         
    @asyncio.coroutine                                       
    def listen_to_ipc_channel_layer():                       
        while True:                                          
            try:                                             
                print("Running")                                 
                yield from asyncio.sleep(0.1)                
            except asyncio.CancelledError as e:              
                print("Break it out")                        
                raise e # Raise a proper error
                                                         
                                              
    # Stop the loop concurrently           
    @asyncio.coroutine                                       
    def exit():                                              
        loop = asyncio.get_event_loop()                      
        print("Stop")                                        
        loop.stop()                                          
    
    
    def ask_exit():                          
        for task in asyncio.Task.all_tasks():
            task.cancel()                    
        asyncio.ensure_future(exit())        
                                         
                                         
    if __name__ == "__main__":               
        main()                               
    

    【讨论】:

      【解决方案3】:

      我收到了这条消息,我相信它是由待处理任务的垃圾收集引起的。 Python 开发人员正在讨论在 asyncio 中创建的任务是否应该创建强引用,并决定不应该(在研究了这个问题 2 天后,我强烈反对!...请参阅此处的讨论 https://bugs.python.org/issue21163

      我为自己创建了这个实用程序,以便对任务进行强引用并自动清理它(仍然需要彻底测试它)...

      import asyncio
      
      #create a strong reference to tasks since asyncio doesn't do this for you
      task_references = set()
      
      def register_ensure_future(coro):
          task = asyncio.ensure_future(coro)
          task_references.add(task)
      
          # Setup cleanup of strong reference on task completion...
          def _on_completion(f):
              task_references.remove(f)
          task.add_done_callback(_on_completion)
          
          return task
      

      在我看来,任务只要是活跃的,就应该有很强的参考性!但是 asyncio 不会为你做这些,所以一旦 gc 发生和长时间的调试,你可能会遇到一些不好的惊喜。

      【讨论】:

        【解决方案4】:

        发生这种情况的原因正如@Yeray Diaz Diaz 所解释的那样 就我而言,我想取消所有在第一次完成后没有完成的任务,所以我最终取消了额外的工作,然后使用loop._run_once() 运行循环并让它们停止:

            loop = asyncio.get_event_loop()
            job = asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            tasks_finished,tasks_pending, = loop.run_until_complete(job)
            tasks_done = [t for t in tasks_finished if t.exception() is None]
            if tasks_done == 0:
                raise Exception("Failed for all tasks.")
            assert len(tasks_done) == 1
            data = tasks_done[0].result()
            for t in tasks_pending:
                t.cancel()
                t.cancel()
            while not all([t.done() for t in tasks_pending]):
                loop._run_once()
        

        【讨论】:

          猜你喜欢
          • 2020-12-14
          • 2018-05-30
          • 2016-12-07
          • 1970-01-01
          • 1970-01-01
          • 2015-09-05
          • 1970-01-01
          • 2016-02-29
          相关资源
          最近更新 更多