【发布时间】:2019-02-28 05:54:12
【问题描述】:
在处理从 websocket 服务器推送到我的 client.py 的数据时,我遇到了长时间(3 小时)的延迟(编辑:延迟起初很短暂,然后在一天中变得更长)。我知道它没有被服务器延迟。
例如,我每 5 秒看到一次 keep_alive 日志事件及其各自的时间戳。这样就顺利进行了。 但是当我看到在日志中处理的数据帧实际上是在服务器发送它的 3 小时后。我是否在做一些事情来延迟这个过程?
我是否正确调用我的协程“keep_alive”? keep_alive 只是给服务器的一条消息,以保持连接处于活动状态。服务器回显消息。我也记录太多了吗?这可能会延迟处理(我不这么认为,因为我看到日志事件立即发生)。
async def keep_alive(websocket):
"""
This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
"""
await websocket.send('Hello')
await asyncio.sleep(5)
async def open_connection_test():
"""
Establishes web socket (WSS). Receives data and then stores in csv.
"""
async with websockets.connect(
'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
while True:
"""
Handle message from server.
"""
message = await websocket.recv()
if message.isdigit():
# now = datetime.datetime.now()
rotating_logger.info ('Keep alive message: {}'.format(str(message)))
else:
jasonified_message = json.loads(message)
for key in jasonified_message:
rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))
"""
Store in a csv file.
"""
try:
convert_and_store(jasonified_message)
except PermissionError:
convert_and_store(jasonified_message, divert = True)
"""
Keep connection alive.
"""
await keep_alive(websocket)
"""
Logs any exceptions in logs file.
"""
try:
asyncio.get_event_loop().run_until_complete(open_connection())
except Exception as e:
rotating_logger.info (e)
编辑: 来自documentation - 我认为这可能与它有关 - 但我还没有把这些点联系起来。
max_queue 参数设置队列的最大长度 保存传入的消息。默认值为 32。0 禁用 限制。消息在收到时被添加到内存队列中; 然后 recv() 从该队列中弹出。为了防止内存过多 收到消息的速度比它们可能的速度快时的消耗 处理后,队列必须是有界的。如果队列已满,则 协议停止处理传入数据,直到调用 recv()。在 这种情况下,各种接收缓冲区(至少在 asyncio 和 操作系统)将被填满,然后 TCP 接收窗口将缩小,变慢 向下传输以避免丢包。
编辑 2018 年 9 月 28 日:我正在测试它而没有保持活动消息,这似乎不是问题。它可能与 convert_and_store() 函数有关吗?这是否需要异步定义然后等待?
def convert_and_store(data, divert = False, test = False):
if test:
data = b
fields = data.keys()
file_name = parse_call_type(data, divert = divert)
json_to_csv(data, file_name, fields)
EDIT 10/1/2018:keep-alive 消息和 convert_and_store 似乎都有问题;如果我将 keep-alive 消息延长到 60 秒 - 那么 convert_and_store 将每 60 秒运行一次。所以 convert_and_store 正在等待 keep_alive()...
【问题讨论】:
-
您是否尝试过简单的
send和recv消息,就像“Hello”和“Hello back”一样?延迟还会出现吗? -
好点。这实际上就是我对“保持活力”信息所做的事情。所以这没有延迟。但是处理实际的(非保持活动状态)消息,然后将它们附加到 csv 文件中,这一天变得越来越延迟。
-
在这种情况下,您可以尝试使用
multiprocessing。将处理csv的部分移到一个函数中,使用p = Process(target=my_function, args=argss)将任务从async-await中转移出来,看看能不能避免问题。 -
这听起来像是一个可靠的备份谢谢。我将查看文档并对其进行测试。在没有任何多处理经验的情况下,我宁愿坚持使用 asyncio 库。
标签: python python-3.x websocket python-asyncio