【发布时间】:2022-06-13 01:00:52
【问题描述】:
我收到以下错误
致命的 Python 错误:在规范化异常时无法从 MemoryErrors 中恢复。 当前线程 0x0000ffff88de5010(最近的调用优先): 文件“test.py”,wrap_get_fuzzy_match 中的第 173 行 文件“/usr/lib64/python3.7/asyncio/events.py”,_run 中的第 88 行 文件“/usr/lib64/python3.7/asyncio/base_events.py”,_run_once 中的第 1786 行 文件“/usr/lib64/python3.7/asyncio/base_events.py”,run_forever 中的第 541 行 文件“/usr/lib64/python3.7/asyncio/base_events.py”,run_until_complete 中的第 574 行 文件“test.py”,第 224 行 中止
async def get_valuation(url, params, api_header, session, semaphore):
async with semaphore:
async with session.get(url, headers=api_header) as response:
status_code = response.status
try:
if status_code != 200:
mmr = {params: 'not found' + ',' + ' ' + str(status_code)}
else:
asynch_response = await response.json()
mmr = await get_best_match(params, asynch_response, str(status_code))
return mmr
except Exception as ex:
LOGGER.error(f"Error in get valuation and error was {ex}")
return ex
async def wrap_get_fuzzy_match(func, *args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as err:
LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
return err
async def main(headers, file):
tasks = []
sema = asyncio.Semaphore(500)
BATCH_SIZE = 1000000
async with ClientSession() as session:
with open(file) as f:
while True:
batch = [line.strip('\n') for line in islice(f, BATCH_SIZE)]
if not batch:
break
for param in batch:
task = asyncio.ensure_future(wrap_get_fuzzy_match(
get_valuation,
url= API + param,
params=param,
api_header=headers,
session=session,
semaphore=sema,
))
tasks.append(task)
responses = await asyncio.gather(*tasks)
return responses
【问题讨论】:
-
您的批处理机制没有做任何事情。您收集一百万行,创建一百万个任务,然后再次循环,直到您尽快为文件中的每一行创建一个任务。每个任务最终都会返回一个未知大小的 json 对象,所有这些数据都通过收集到一个巨大的列表中来收集。因此,您试图在内存中同时保存未知数量的数据结构(可能数百万),每个数据结构的大小都未知。内存不足并不奇怪。
-
您的 Python 实现在这里的内存使用效率低下,并且与 AWS 无关 - 您可以轻松提升您的实例,但我建议提高它的效率(我不确定如何,但其他人可以照亮它)。我将删除与 AWS 相关的标签,因为这根本与 AWS 无关。
-
@PaulCornelius:将
responses = await asyncio.gather(*tasks)缩进while循环并将其更改为responses.extend(await asyncio.gather(*tasks))(在循环外使用responses = []),并在@987654327之后添加tasks = []@-ed 他们都可能有帮助;您仍然会存储所有结果,但只能按预期一次安排/存储一百万个任务。 -
嗯,大概有某种指向程序,例如分析数据或将其写入文件或其他东西。收集一百万个任务,然后让它们在收集下一百万个任务之前收集垃圾似乎很麻烦。内存问题的正确解决方案必须依赖于了解程序的用途,而我们不知道。
标签: python-3.x async-await python-asyncio