【问题标题】:Can't figure out how to correctly schedule my asyncio functions无法弄清楚如何正确安排我的异步功能
【发布时间】:2020-12-01 00:57:19
【问题描述】:

我对 asyncio 还很陌生,我阅读了一些文档、示例和其他问题,但我在这里找不到任何关于我做错了什么的答案。我不太了解 asyncio 来诊断为什么会发生这种情况。我有一个带有 asyncio 函数的类,其中一个打开一个 websocket,然后创建它执行的三个 asyncio 任务。第一个执行完全正常。我对第二个和第三个任务的执行感到困惑。即使在第二个任务执行了 asyncio.sleep 之后,第三个任务似乎也没有执行 websock.recv(),因此第二个任务执行 websocket.recv() 并发送了我打算让第三个任务接收的数据。我在这里做错了什么?提前感谢您的帮助。

编辑:抱歉,除非您创建或拥有不和谐机器人并在代码中使用它的令牌,否则代码无法运行。

import websockets
import requests
import asyncio
import json

class GatewayConnection:
    def __init__(self):
        self.heartbeat = None
        self.s = None
        self.info = None
        self.connected = False

        self.uri = f"{self.get_gateway()}/?v=8&encoding=json"

    @staticmethod
    def get_gateway():
        endpoint = requests.get(OAUTH+'gateway')
        return endpoint.json()['url']

    async def get_gateway_info(self, websock):
        recv = json.loads(await websock.recv())
        self.heartbeat = recv['d']['heartbeat_interval']
        self.s = recv['s']

    async def finish_gateway_connect(self, websock):
        await websock.send(json.dumps({
            'op': 2,
            'd': {
                'token': TOKEN,
                'intents': 513,
                'properties': {
                    '$os': 'windows',
                    '$browser': 'Sheepp',
                    '$device': 'Sheepp'
                }
            }
        }))

        print("waiting to received ready info")
        self.info = json.loads(await websock.recv())
        print("Ready info received")
        print(self.info)

    async def communicate(self):
        async with websockets.connect(self.uri) as websock:
            self.connected = True

            get = asyncio.create_task(self.get_gateway_info(websock))
            heartbeat = asyncio.create_task(self.send_heartbeat(websock))
            finish = asyncio.create_task(self.finish_gateway_connect(websock))

            await get  # First task
            await heartbeat  # Second task
            await finish  # Third task
        
    async def send_heartbeat(self, websock):
        while self.connected:
            await websock.send(json.dumps({'op': 1, 'd': self.s}))
            print("Sent heartbeat")
            response = json.loads(await websock.recv())
            print("Received 'heartbeat'")
            if response['op'] != 11:
                self.connected = False  # Discord api says to terminate connection upon not receiving a valid heartbeat response
                print(f"The server did not send back a valid heartbeat response, but instead: {response}")
            await asyncio.sleep(self.heartbeat/1000)

gateway = GatewayConnection()
try:
    loop.run_until_complete(gateway.communicate())
except KeyboardInterrupt:
    print("Keyboard Interruption")
finally:
    loop.close()

【问题讨论】:

  • 您的意思是getheartbeatfinish 一个接一个地被处决吗?因为,按照您编写它们的 await 代码的方式,它们将并行执行,尽管出现了。
  • 现在我想我应该首先执行get,而不是作为一个任务。但我的意思是同时执行第二个和第三个任务。我的计划是不断执行心跳,同时执行其他事情,因为必须保持心跳。我还需要在finish之前执行heartbeat
  • 那么也许你可以使用类似await self.get_gateway_info(websock); await asyncio.gather(self.send_heartbeat(websock), self.finish_gateway_connect(websock))
  • 我收到运行时错误:cannot call recv while another coroutine is already waiting for the next message。打印语句的输出顺序如下:Sent heartbeatwaiting to receive ready inforeceived 'heartbeat'sleep。我在send_heartbeat 中的asyncio.sleep 之前添加了最后一个打印语句。错误也发生在finish_gateway_connect中的recv
  • @user4815162342 为什么你认为get、heartbeat和finish会并行运行?他们有一个等待,因此下一行在第一个等待完成之前不会运行。 await one(),await two() 将在一个完成后运行两个。

标签: python-asyncio


【解决方案1】:

为了解决我遇到的错误,我创建了一个单独的类来处理 websocket,以确保及时正确执行 recv 和 send 函数。我不确定我的课程做得如何,但这里是:

class WebsocketHandler:
    def __init__(self, websocket):
        self.ws = websocket
        self.sending = False

        self.sends = []
        self.recvs = []

    async def send(self, data):
        if self.sending:
            self.sends.append(json.dumps(data))
        else:
            self.sending = True
            await self.ws.send(json.dumps(data))
            while self.sends:
                data = self.sends[0]
                self.sends.pop(0)
                await self.ws.send(json.dumps(data))
            self.sending = False

    async def recv(self):
        event = asyncio.Event()
        self.recvs.append(event)
        while self.recvs[0] != event:
            await event.wait()
        try:
            data = await asyncio.wait_for(self.ws.recv(), timeout=1)
        except asyncio.exceptions.TimeoutError:
            self.recvs.pop(0)
            raise asyncio.exceptions.TimeoutError
        self.recvs.pop(0)
        if len(self.recvs) > 0:
            self.recvs[0].set()
        return json.loads(data)

我得到的错误是RuntimeError: cannot call recv while another coroutine is already waiting for the next message 编辑:随意批评我的代码。

编辑 2:我根据需要进行了一些更改以修复我遇到的一些错误以及为了方便起见。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2019-10-26
  • 1970-01-01
  • 1970-01-01
  • 2013-02-23
  • 2019-03-12
  • 2020-12-01
  • 2020-03-27
  • 1970-01-01
相关资源
最近更新 更多