【问题标题】:Long delay in using asyncio and websockets in Python 3在 Python 3 中使用 asyncio 和 websockets 的长时间延迟
【发布时间】:2019-02-28 05:54:12
【问题描述】:

在处理从 websocket 服务器推送到我的 client.py 的数据时,我遇到了长时间(3 小时)的延迟(编辑:延迟起初很短暂,然后在一天中变得更长)。我知道它没有被服务器延迟。

例如,我每 5 秒看到一次 keep_alive 日志事件及其各自的时间戳。这样就顺利进行了。 但是当我看到在日志中处理的数据帧实际上是在服务器发送它的 3 小时后我是否在做一些事情来延迟这个过程?

我是否正确调用我的协程“keep_alive”? keep_alive 只是给服务器的一条消息,以保持连接处于活动状态。服务器回显消息。我也记录太多了吗?这可能会延迟处理(我不这么认为,因为我看到日志事件立即发生)。

async def keep_alive(websocket):
                """
                 This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
                """
                await websocket.send('Hello')   
                await asyncio.sleep(5)

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect( 
            'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
        while True:    
            """
            Handle message from server.
            """
            message = await websocket.recv()
            if message.isdigit():
                # now = datetime.datetime.now()
                rotating_logger.info ('Keep alive message: {}'.format(str(message)))
            else:
                jasonified_message = json.loads(message)
                for key in jasonified_message:
                    rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))    
                """
                Store in a csv file.
                """
                try:            
                    convert_and_store(jasonified_message)
                except PermissionError:
                    convert_and_store(jasonified_message, divert = True)                        
            """
            Keep connection alive.
            """            
            await keep_alive(websocket)

"""
Logs any exceptions in logs file.
"""
try:
    asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
    rotating_logger.info (e)

编辑: 来自documentation - 我认为这可能与它有关 - 但我还没有把这些点联系起来。

max_queue 参数设置队列的最大长度 保存传入的消息。默认值为 32。0 禁用 限制。消息在收到时被添加到内存队列中; 然后 recv() 从该队列中弹出。为了防止内存过多 收到消息的速度比它们可能的速度快时的消耗 处理后,队列必须是有界的。如果队列已满,则 协议停止处理传入数据,直到调用 recv()。在 这种情况下,各种接收缓冲区(至少在 asyncio 和 操作系统)将被填满,然后 TCP 接收窗口将缩小,变慢 向下传输以避免丢包。

编辑 2018 年 9 月 28 日:我正在测试它而没有保持活动消息,这似乎不是问题。它可能与 convert_and_store() 函数有关吗?这是否需要异步定义然后等待?

def convert_and_store(data, divert = False, test = False):
    if test:
        data = b
    fields = data.keys()
    file_name =  parse_call_type(data, divert = divert)
    json_to_csv(data, file_name, fields)

EDIT 10/1/2018:keep-alive 消息和 convert_and_store 似乎都有问题;如果我将 keep-alive 消息延长到 60 秒 - 那么 convert_and_store 将每 60 秒运行一次。所以 convert_and_store 正在等待 keep_alive()...

【问题讨论】:

  • 您是否尝试过简单的sendrecv 消息,就像“Hello”和“Hello back”一样?延迟还会出现吗?
  • 好点。这实际上就是我对“保持活力”信息所做的事情。所以这没有延迟。但是处理实际的(非保持活动状态)消息,然后将它们附加到 csv 文件中,这一天变得越来越延迟。
  • 在这种情况下,您可以尝试使用multiprocessing。将处理csv的部分移到一个函数中,使用p = Process(target=my_function, args=argss)将任务从async-await中转移出来,看看能不能避免问题。
  • 这听起来像是一个可靠的备份谢谢。我将查看文档并对其进行测试。在没有任何多处理经验的情况下,我宁愿坚持使用 asyncio 库。

标签: python python-3.x websocket python-asyncio


【解决方案1】:

会不会和convert_and_store()函数有关?

是的,它可能是。不应直接调用阻塞代码。如果函数执行 CPU 密集型计算 1 秒,所有 asyncio 任务和 IO 操作都会延迟 1 秒。

执行器可用于在不同的线程/进程中运行阻塞代码:

import asyncio
import concurrent.futures
import time

def long_runned_job(x):
    time.sleep(2)
    print("Done ", x)

async def test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for i in range(5):
            loop.run_in_executor(pool, long_runned_job, i)
            print(i, " is runned")
            await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(test())

在您的情况下,它应该如下所示:

import concurrent.futures

async def open_connection_test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        async with websockets.connect(...) as websocket:
            while True:    
                ...
                loop.run_in_executor(pool, convert_and_store, args)

已编辑

keep-alive 消息和 convert_and_store 似乎都有问题

你可以在后台运行keep_alive

async def keep_alive(ws):
    while ws.open:
        await ws.ping(...)   
        await asyncio.sleep(...)

async with websockets.connect(...) as websocket:
    asyncio.ensure_future(keep_alive(websocket))
    while True:    
        ...

【讨论】:

  • 这就像@Kr98 的例子似乎仍然在keep_alive() 后面排队。这种行为告诉你什么(来自我的编辑):2018 年 10 月 1 日:keep-alive 消息和 convert_and_store 似乎都有问题;如果我将 keep-alive 消息延长到 60 秒 - 那么 convert_and_store 将每 60 秒运行一次。所以 convert_and_store 正在等待 keep_alive()
  • @LiamHanninen 你可以在后台运行keep_alive。我在答案中添加了一个示例
  • 由于您坚持使用 asyncio 库并帮助我了解它,因此向您授予赏金 - 但我需要在将其标记为正确之前进行测试。谢谢!
  • 太棒了,这成功了。最后一个问题:“loop.run_in_executor(pool, convert_and_store, args)” 似乎与 asyncio.ensure_future() 类似 (asyncio.ensure_future(loop.run_in_executor(pool, convert_and_store, args))) 以及没有 asyncio.ensure_future( )。我应该使用 asyncio.ensure_future() 还是不使用?
  • run_in_executor 返回一个asyncio.Future 对象,所以这里不需要asyncio.ensure_future
【解决方案2】:

您确实必须为此keep_alive() 函数创建一个新线程。

对于async-await,承诺在进行下一步之前已完成所有任务。

因此,await keep_alive(websocket) 实际上在这个意义上阻塞了线程。您可能不会在此处等待keep_alive 以便该过程可以继续,但可以肯定的是,这不是您想要的,我确定。

实际上,你想要的是两个时间范围,一个用于与服务器通信,一个用于保持服务器活动。它们应该分开,因为它们在不同的协程中。

所以,正确的方法是使用Thread,在这种情况下不需要使用asyncio,保持简单。

首先,将keep_alive()更改为关注。

def keep_alive():
    """
        This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
    """
    while True:
        websocket.send('Hello') 
        time.sleep(1)

open_connection_test()

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    thread = threading.Thread(target=keep_alive, args=())
    thread.daemon = True   # Daemonize
    thread.start()
    async with websockets.connect(...) as websocket:
        ....
        #No need this line anymore.
        #await keep_alive(websocket) 

【讨论】:

  • 谢谢! keep_alive 需要 websocket 作为参数。因此,它必须与 websockets.connect(...) 作为 websocket 进行异步:使用 threading.Thread() 会有问题吗?
  • @LiamHanninen,没问题,感谢 GIL,它不会造成任何问题,而且您选择的 asyncio ans 确实在做同样的事情。但我确实建议使用Thread 而不是asyncio,因为执行后台任务不是asyncio 的目的。
  • 谢谢 - 如果我的长期测试没有成功,我会回到这个。我目前在 30 分钟后收到 1006 错误。 GIL 是什么或是谁?
  • @LiamHanninen,GIL 代表 Global Interpreter Lock,这意味着一次只有一个线程可以控制 Python 解释器。所以使用 Thread 可以保证线程安全,在这种情况下,您不需要像在 multiprocessing 中那样腌制 websockect 。而asyncio 不能保证线程安全(尽管asyncio 库中有线程安全方法)。
  • 仅供参考 - 我使用相同的代码添加了一个新问题 - 关于日志记录:stackoverflow.com/questions/52708756/…
【解决方案3】:

我觉得这样会更清楚,使用ThreadPoolExecutor让阻塞代码在后台运行

from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=4)

def convert_and_store(data, divert=False, test=False):
    loop = asyncio.get_event_loop()
    loop.run_in_executor(pool, _convert_and_store, divert, test)


def _convert_and_store(data, divert=False, test=False):
    if test:
        data = b
    fields = data.keys()
    file_name = parse_call_type(data, divert=divert)
    json_to_csv(data, file_name, fields)

异步发送保持活动消息演示

async def kepp_alive(websocket):
    while True:
        await websocket.send_str(ping)
        await asyncio.sleep(10)

【讨论】:

  • 谢谢。我试过这个,但它仍然挂在保持活动的消息上(请参阅我将在一分钟内添加的编辑)。但是,就像我原来的脚本一样,这仍然要等到 keep-alive 消息执行后再运行。
  • 我建议你使用 asyncio send keep alive msg ,我会编辑我的 anwser
猜你喜欢
  • 2020-03-09
  • 2019-02-20
  • 2015-07-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多