【问题标题】:websocket run async function but returns error: asyncio.run() cannot be called from a running event loopwebsocket 运行异步函数但返回错误:无法从正在运行的事件循环中调用 asyncio.run()
【发布时间】:2020-02-21 14:17:35
【问题描述】:

我正在尝试使用 django-channels 2 来创建 websocket。我需要运行一个应该返回命令输出的异步方法,这样我就可以在我的网站上将数据传回给用户。我的问题是它不会让我运行它并出现错误:

asyncio.run() cannot be called from a running event loop

我做错了什么,我能做些什么?

consumers.py

class ChatConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        await self.send({
            "type": "websocket.accept"
        })

        user = self.scope['user']
        get_task_id = self.scope['url_route']['kwargs']['task_id']

        await asyncio.run(self.run("golemcli tasks show {}".format(get_task_id)))

        await self.send({
            "type": "websocket.send",
            "text": "hey"
        })
    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        print("disconnected", event)



    async def run(self, cmd):
        proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

        stdout, stderr = await proc.communicate()

        print(f'[{cmd!r} exited with {proc.returncode}]')
        if stdout:
            print(f'[stdout]\n{stdout.decode()}')
        if stderr:
            print(f'[stderr]\n{stderr.decode()}')

追溯:

Exception inside application: asyncio.run() cannot be called from a running event loop
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 62, in __call__
    await await_many_dispatch([receive], self.dispatch)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/utils.py", line 52, in await_many_dispatch
    await dispatch(result)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 19, in websocket_connect
    await asyncio.run(self.run("golemcli tasks show {}".format(get_task_id)))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 34, in run
    "asyncio.run() cannot be called from a running event loop")
  asyncio.run() cannot be called from a running event loop

更新 2 使用以下sn-p时:

await self.run("golemcli tasks show {}".format(get_task_id)

它返回以下回溯:

Exception inside application: Cannot add child handler, the child watcher does not have a loop attached
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 62, in __call__
    await await_many_dispatch([receive], self.dispatch)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/utils.py", line 52, in await_many_dispatch
    await dispatch(result)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 19, in websocket_connect
    await self.run("golemcli tasks show {}".format(get_task_id))
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 37, in run
    stderr=asyncio.subprocess.PIPE)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/subprocess.py", line 202, in create_subprocess_shell
    stderr=stderr, **kwds)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1503, in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/unix_events.py", line 193, in _make_subprocess_transport
    self._child_watcher_callback, transp)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/unix_events.py", line 924, in add_child_handler
    "Cannot add child handler, "
  Cannot add child handler, the child watcher does not have a loop attached

【问题讨论】:

    标签: python django python-3.x django-channels


    【解决方案1】:

    你已经在一个异步运行循环中,所以你也可以根据你想要做什么。

    在消费者运行循环中处理 run 调用

    (由于bug,需要python 3.8)

    await self.run("golemcli tasks show {}".format(get_task_id)) 不需要嵌套的运行循环。

    这意味着如果在subprocess 执行其工作时有任何新消息发送给消费者,它们将被排队并且在subprocess 完成之前不会被执行。 (公平地说,这是最简单的解决方案)。

    这意味着您的hey 消息将在run 方法完成后发送。

    创建嵌套运行循环

    (由于bug,需要python 3.8)

    如果您希望您的消费者能够在您的 subprocess. (这要复杂得多,除非您的 websocket 连接需要此功能,否则您不应该这样做)。

        async def websocket_connect(self, event):
            ...
    
            # start the run
            self.run_task = asyncio.create_task(self.run(...))        
    
            ...
    
        async def websocket_receive(self, event):
            print("receive", event)
    
        async def websocket_disconnect(self, event):
            print("disconnected", event)
    
            if not self.run_task.done():
                # Clean up the task for the queue we created
                self.run_task.cancel()
                try:
                    # let us get any exceptions from the nested loop
                    await self.run_task
                except CancelledError:
                    # Ignore this error as we just triggered it
                    pass
            else:
                # throw any error from this nested loop
                self.run_task.result()
    
    

    在线程池中作为阻塞任务运行

    将运行更改为同步任务。

        # 2. Run in a custom thread pool:
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(
                pool, self.run, "your cmd")
            print('custom thread pool', result)
    

    作为单独的安全说明,您的代码很容易让某人在您的服务器上运行 bash 命令,因为您从 url 中的任务 ID 中获取原始字符串。我建议在这里添加一些验证,可能如果您在数据库中有一个任务条目,使用 url 值在数据库中查找任务,然后在构建命令时使用记录中的 id 值,或者如果这至少不可能确保 task_id 具有非常严格的正则表达式验证,以确保它不能包含任何您不期望的字符。

    【讨论】:

    • 嗨!我已经尝试了您的两个建议,它返回与 child... 相同的错误。关于安全性 - 我将检查用户的输入。当前代码是一个简化的演示,只是为了看看它是如何工作的,同时我试图弄清楚这个 async/websockets 的东西。我用回溯更新了我的帖子。
    • 解决此问题的更好方法是我可以使用 celery 在后台“卸载”任务,然后获取其输出吗?
    • 关于外壳保护,请查看您正在使用的子进程命令的 python 文档,其中提到了保护自己的方法。
    • 这是一个实际的错误,Python 本身是子输出的东西!我升级到 3.8.1 并解决了这个问题。我现在可以运行await asyncio.run(self.run....)!感谢马特乌斯的帮助!
    • 感谢您的建议。现在我将使用await self.run,看看我完成我的方法后结果如何。
    猜你喜欢
    • 2022-11-25
    • 1970-01-01
    • 2019-10-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多