【Question Title】: asyncio python coroutine cancelled while task still pending reading from redis channel
【Posted】: 2021-12-29 03:44:10
【Question】:

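I have multiple coroutines, each of which waits for content to appear in a queue before it starts processing.

The queues are filled by channel subscribers, whose only job is to receive messages and push items into the appropriate queue.

Once a queue processor has consumed the data and produced new data, the result is dispatched to the appropriate message channel, where the process repeats until the data is ready to be relayed back to the API that provided it.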
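To make the architecture above concrete, here is a minimal in-memory sketch of two chained stages. It is not the author's code: `asyncio.Queue` stands in for the Redis lists and pub/sub channels, `stage` and `run_pipeline` are hypothetical names, and using `None` as an end-of-stream marker is an assumption made so the example terminates.

```python
import asyncio
from typing import Any, Callable, Optional


async def stage(inbox: asyncio.Queue, outbox: Optional[asyncio.Queue],
                work: Callable[[Any], Any], results: list) -> None:
    """One pipeline stage: take an item, process it, hand the result on.

    asyncio.Queue stands in for the Redis list + channel pair;
    None is a hypothetical end-of-stream marker.
    """
    while True:
        item = await inbox.get()           # wait for content in the queue
        if item is None:                   # end of stream: propagate and stop
            if outbox is not None:
                await outbox.put(None)
            return
        out = work(item)
        if outbox is not None:
            await outbox.put(out)          # relay to the next consumer
        else:
            results.append(out)            # last stage: "relay back to the API"


async def run_pipeline(items: list) -> list:
    q1: asyncio.Queue = asyncio.Queue()
    q2: asyncio.Queue = asyncio.Queue()
    results: list = []
    tasks = [
        asyncio.create_task(stage(q1, q2, lambda x: x * 2, results)),
        asyncio.create_task(stage(q2, None, lambda x: x + 1, results)),
    ]
    for it in items:
        await q1.put(it)
    await q1.put(None)                     # signal the end of the input
    await asyncio.gather(*tasks)           # wait for both stages to finish
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([1, 2, 3])))  # [3, 5, 7]
```

With a single consumer per stage, items keep their order; with several consumers on one queue (as in the question), ordering is no longer guaranteed.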

import asyncio
from random import randint
from Models.ConsumerStrategies import Strategy
from Helpers.Log import Log
import Connectors.datastore as ds

import json
__name__ = "Consumer"

MIN = 1
MAX = 4

async def consume(configuration: dict, queue: str, processor: Strategy) -> None:
    """Consumes new items in the queue and publishes a message to the appropriate channel with the data generated for the next consumer.
    If no new content is available, sleeps for a random number of seconds between the MIN and MAX globals.

    Args:
        configuration (dict): configuration dictionary
        queue (str): queue being consumed
        processor (Strategy): consumer strategy
    """
    
    logger = Log().get_logger(processor.__name__, configuration['logFolder'], configuration['logFormat'], configuration['USE'])
    while True:
        try:
            ds_handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
            token = await ds_handle.lpop(queue)
            if token is not None:
                result = await processor.consume(json.loads(token), ds_handle)
                status = await processor.relay(result, ds_handle)
                logger.debug(status)
            else:
                wait_for = randint(MIN,MAX)
                logger.debug(f'queue: {queue} empty waiting: {wait_for} before retry')
                await asyncio.sleep(wait_for)
            ds_handle.close()
        except Exception as e:
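            # logger.exception logs the message together with the current traceback
            # (e.with_traceback is a method, so logging it only prints "<built-in method ...>")
            logger.exception(f"{e}")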

What I noticed is that after running for 24 hours, I got the following errors:

Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<consume() running at Services/Consumer.py:26> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29cbe0>()]> cb=[_chain_future.<locals>._call_set_state() at asyncio/futures.py:391]>
Task was destroyed but it is pending!
task: <Task pending name='Task-426485' coro=<RedisConnection._read_data() done, defined at aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29ccd0>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at aioredis/connection.py:168]>
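For context on the message itself: asyncio logs "Task was destroyed but it is pending!" when a Task object is garbage-collected while it is still pending, for example when the event loop is closed with tasks still running, or when nothing holds a reference to the task (or to the future it is waiting on) any more. The sketch below shows the usual clean-shutdown pattern: cancel every long-running task and await it, so none is destroyed while pending. `worker` and `run_then_shutdown` are hypothetical names; `asyncio.run()` already cancels leftover tasks on exit, so the explicit cancel matters mostly when you manage the loop yourself or stop workers early.

```python
import asyncio


async def worker(name: str, log: list) -> None:
    """Hypothetical long-running consumer that loops until cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)      # stands in for polling a queue
    except asyncio.CancelledError:
        log.append(f"{name} cancelled cleanly")
        raise                              # re-raise so the task ends as cancelled


async def run_then_shutdown(n: int = 3) -> list:
    log: list = []
    tasks = [asyncio.create_task(worker(f"w{i}", log)) for i in range(n)]
    await asyncio.sleep(0.05)              # let the workers run for a while
    # Graceful shutdown: cancel every task and wait for it to finish,
    # so no Task object is garbage-collected while still pending.
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    assert all(t.cancelled() for t in tasks)
    return log


if __name__ == "__main__":
    print(asyncio.run(run_then_shutdown()))
```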

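I'm not sure how to interpret, fix, or recover from this. My first assumption is that I should switch to Redis Streams instead of using channels and queues.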

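However, back to the current scenario: the channel subscribers run in separate processes, while the consumers run in a single process as separate tasks on the event loop.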

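My assumption is that, because the consumers are essentially polling the queues, at some point the connection pool manager or Redis itself ends up hanging on a connection opened by a consumer, and that consumer gets cancelled.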

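I think so because I don't see any further messages from that queue processor. But I also see a pending `wait_for` future whose origin I'm not sure about; it might come from the subscriber's `ensure_future` on the message reader.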
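If polling is the suspect, one alternative (not what the author ended up doing) is to block on the queue with a timeout instead of calling LPOP and then sleeping a random interval; in Redis this is the BLPOP command, which returns nil when the timeout expires. The sketch below models that shape with `asyncio.Queue` and `asyncio.wait_for`; `pop_or_none` and `consume_until_idle` are hypothetical names, and stopping on the first timeout is only there so the example terminates.

```python
import asyncio
from typing import Any, Optional


async def pop_or_none(q: asyncio.Queue, timeout: float) -> Optional[Any]:
    """Wait up to `timeout` seconds for an item; return None if none arrives.

    Mirrors the shape of a blocking pop with a timeout (BLPOP-like),
    instead of LPOP followed by a random sleep.
    """
    try:
        return await asyncio.wait_for(q.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def consume_until_idle(q: asyncio.Queue, idle_timeout: float) -> list:
    seen = []
    while True:
        item = await pop_or_none(q, idle_timeout)
        if item is None:
            break          # idle; a real consumer would simply loop again
        seen.append(item)
    return seen


async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    for x in ("a", "b", "c"):
        q.put_nowait(x)
    return await consume_until_idle(q, idle_timeout=0.05)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['a', 'b', 'c']
```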

import asyncio
from multiprocessing import process
from Helpers.Log import Log
import Services.Metas as metas
import Models.SubscriberStrategies as processor
import Connectors.datastore as ds_linker
import Models.Exceptions as Exceptions

async def subscriber(conf: dict, channel: str, processor: processor.Strategy) -> None:
    """Subscription handler. Receives the configuration, the channel name and a processing strategy.
    Opens a datastore connection, subscribes to the channel, and runs a reader task that applies
    the processing strategy to every message received on the channel.

    Args:
        conf (dict): configuration dictionary
        channel (str): channel to subscribe to
        processor (processor.Strategy): processor message handler
    """
    async def reader(ch):
        while await ch.wait_message():
            msg = await ch.get_json()
            await processor.handle_message(msg=msg)

    ds_uri = ds_linker.get_uri(conf=conf)
    ds = await ds_linker.get_datastore_handle(ds_uri)
    pub = await ds.subscribe(channel)
    ch = pub[0]
    tsk = asyncio.ensure_future(reader(ch))
    await tsk

I could use some help solving this and properly understanding what is happening behind the scenes. Thanks.

【Question Discussion】:

    Tags: python redis python-asyncio aioredis


    【Solution 1】:

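    After spending a few days trying to reproduce the problem, I found someone with the same issue in the issues of the aioredis GitHub repo.

    So I had to make sure that every connection opened to Redis is also closed, by adding: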

            ds_handle.close()
            await ds_handle.wait_closed()
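One way to make sure the `close()` / `wait_closed()` pair is never forgotten is to wrap it in an async context manager. This is a sketch, not the answer's code: `closing_handle` is a hypothetical helper, the handle is assumed to expose the aioredis 1.x-style synchronous `close()` and awaitable `wait_closed()` used above, and `FakeHandle` replaces a real Redis connection so the example runs without a server. A side benefit is that if opening the connection fails, there is nothing half-initialized to close.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable


@asynccontextmanager
async def closing_handle(factory: Callable[[], Awaitable[Any]]) -> AsyncIterator[Any]:
    """Open a handle via `factory` and always close it, even on error or cancellation."""
    handle = await factory()
    try:
        yield handle
    finally:
        handle.close()              # start closing (synchronous in this API style)
        await handle.wait_closed()  # wait until the connection is actually gone


class FakeHandle:
    """Stand-in for a Redis handle so the sketch runs without a server."""

    def __init__(self) -> None:
        self.closed = False
        self.wait_closed_awaited = False

    def close(self) -> None:
        self.closed = True

    async def wait_closed(self) -> None:
        await asyncio.sleep(0)
        self.wait_closed_awaited = True


async def demo() -> FakeHandle:
    created = []

    async def factory() -> FakeHandle:
        h = FakeHandle()
        created.append(h)
        return h

    try:
        async with closing_handle(factory):
            raise RuntimeError("simulated failure while using the handle")
    except RuntimeError:
        pass
    return created[0]


if __name__ == "__main__":
    h = asyncio.run(demo())
    print(h.closed, h.wait_closed_awaited)  # True True
```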
    

    I also set out to improve the consumer's exception handling:

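    while True:
        ds_handle = None  # so `finally` never touches an unbound or stale handle
        try:
            ds_handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
            token = await ds_handle.lpop(queue)
            if token is not None:
                result = await processor.consume(json.loads(token), ds_handle)
                status = await processor.relay(result, ds_handle)
                logger.debug(status)
            else:
                wait_for = randint(MIN, MAX)
                logger.debug(f'queue: {queue} empty waiting: {wait_for} before retry')
                await asyncio.sleep(wait_for)
        except Exception as e:
            logger.error(f"{e}")
            # format_exc() returns the traceback as a string (print_exc() returns None);
            # assumes `import traceback` at the top of the module
            logger.error(traceback.format_exc())
        finally:
            # always release the connection, also on error or cancellation
            if ds_handle is not None:
                ds_handle.close()
                await ds_handle.wait_closed()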
    

    The same goes for the producer side (the channel subscriber):

    ds = None  # so `finally` is safe even if opening the connection fails
    try:
        async def reader(ch):
            while await ch.wait_message():
                msg = await ch.get_json()
                await processor.handle_message(msg=msg)
    
        ds_uri = ds_linker.get_uri(conf=conf)
        ds = await ds_linker.get_datastore_handle(ds_uri)
        pub = await ds.subscribe(channel)
        ch = pub[0]
        tsk = asyncio.ensure_future(reader(ch))
        await tsk
    except Exception as e:
        logger.debug(f'{e}')
        # assumes `import traceback` and a module-level `logger`
        logger.error(f'{traceback.format_exc()}')
    finally:
        if ds is not None:
            ds.close()
            await ds.wait_closed()
    

    This way, no connection to Redis is ever left open; over time, such lingering connections could end up killing one of the processor coroutines.
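Part of why the `finally` blocks above matter: since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so `except Exception` does not catch a cancellation, but `finally` still runs. The sketch below checks that a cancelled task still closes its connection; `FakeConn` and this `subscriber` are hypothetical stand-ins rather than the answer's real code, and the result assumes Python 3.8 or newer.

```python
import asyncio


class FakeConn:
    """Stand-in for a Redis handle; records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    async def wait_closed(self) -> None:
        await asyncio.sleep(0)


async def subscriber(conn: FakeConn, errors: list) -> None:
    try:
        await asyncio.sleep(3600)      # stands in for waiting on a channel
    except Exception as e:             # does NOT catch CancelledError on 3.8+
        errors.append(e)
    finally:
        conn.close()                   # still runs when the task is cancelled
        await conn.wait_closed()


async def demo():
    conn = FakeConn()
    errors: list = []
    task = asyncio.create_task(subscriber(conn, errors))
    await asyncio.sleep(0)             # let the task start and block
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return conn, errors, task


if __name__ == "__main__":
    conn, errors, task = asyncio.run(demo())
    print(conn.closed, errors, task.cancelled())  # True [] True
```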

    For me this solved the problem: at the time of writing, it has been running for more than 2 weeks without any further incident of this kind.

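    In any case, there is also a new major version of aioredis, which is quite recent news (the above was on 1.3.1; 2.0.0 is supposed to follow the same model as redis-py, so things have changed this time as well).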

    【Discussion】:
