【发布时间】:2021-12-29 03:44:10
【问题描述】:
我有多个协程,每个协程都在等待队列中的内容开始处理。
队列的内容由频道订阅者填充,他们的工作只是接收消息并将项目推送到适当的队列中。
在数据被一个队列处理器使用并生成新数据后,它将被分派到适当的消息通道,在该通道中重复此过程,直到数据准备好中继到提供它的 api。
import asyncio
from random import randint
from Models.ConsumerStrategies import Strategy
from Helpers.Log import Log
import Connectors.datastore as ds
import json
__name__ = "Consumer"
MIN = 1
MAX = 4
async def consume(configuration: dict, queue: str, processor: Strategy) -> None:
"""Consumes new items in queue and publish a message into the appropriate channel with the data generated for the next consumer,
if no new content is available sleep for a random number of seconds between MIN and MAX global variables
Args:
configuration (dict): configuration dictionary
queue (str): queue being consumed
processor (Strategy): consumer strategy
"""
logger = Log().get_logger(processor.__name__, configuration['logFolder'], configuration['logFormat'], configuration['USE'])
while True:
try:
ds_handle = await ds.get_datastore_handle(ds.get_uri(conf=configuration))
token = await ds_handle.lpop(queue)
if token is not None:
result = await processor.consume(json.loads(token), ds_handle)
status = await processor.relay(result, ds_handle)
logger.debug(status)
else:
wait_for = randint(MIN,MAX)
logger.debug(f'queue: {queue} empty waiting: {wait_for} before retry')
await asyncio.sleep(wait_for)
ds_handle.close()
except Exception as e:
logger.error(f"{e}")
logger.error(f"{e.with_traceback}")
我注意到的是,在运行 24 小时后,我收到了以下错误:
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<consume() running at Services/Consumer.py:26> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29cbe0>()]> cb=[_chain_future.<locals>._call_set_state() at asyncio/futures.py:391]>
Task was destroyed but it is pending!
task: <Task pending name='Task-426485' coro=<RedisConnection._read_data() done, defined at aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f86bc29ccd0>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at aioredis/connection.py:168]>
我不确定如何解释、解决或恢复,我的假设是首先我应该切换到 redis 流而不是使用通道和队列。
但是,回到这个场景,我在不同的进程上有频道订阅者,而消费者在同一个进程中作为循环中的不同任务运行。
我假设这里发生的情况是,由于消费者基本上是在某个时候轮询队列,因此连接池管理器或 redis 本身最终会在消费者打开的连接时挂起并被取消。
因为我没有看到来自该队列处理器的任何进一步消息,但我也看到了我不确定它可能来自消息阅读器上的订阅者 ensure_future 的 wait_for_future
import asyncio
from multiprocessing import process
from Helpers.Log import Log
import Services.Metas as metas
import Models.SubscriberStrategies as processor
import Connectors.datastore as ds_linker
import Models.Exceptions as Exceptions
async def subscriber(conf: dict, channel: str, processor: processor.Strategy) -> None:
"""Subscription handler. Receives the channel name, datastore connection and a parsing strategy.
Creates a task that listens on the channel and process every message and processing strategy for the specific message
Args:
conf (dict): configuration dictionary
channel (str): channel to subscribe to
ds (aioredis.connection): connection handler to datastore
processor (processor.Strategy): processor message handler
"""
async def reader(ch):
while await ch.wait_message():
msg = await ch.get_json()
await processor.handle_message(msg=msg)
ds_uri = ds_linker.get_uri(conf=conf)
ds = await ds_linker.get_datastore_handle(ds_uri)
pub = await ds.subscribe(channel)
ch = pub[0]
tsk = asyncio.ensure_future(reader(ch))
await tsk
我可以借助一些帮助来解决这个问题,并正确理解幕后发生的事情。谢谢
【问题讨论】:
标签: python redis python-asyncio aioredis