【问题标题】:Processing millions of records using asyncio causes memory error使用 asyncio 处理数百万条记录会导致内存错误
【发布时间】:2022-06-13 01:00:52
【问题描述】:

我收到以下错误

致命的 Python 错误:在规范化异常时无法从 MemoryErrors 中恢复。 当前线程 0x0000ffff88de5010(最近的调用优先): 文件“test.py”,wrap_get_fuzzy_match 中的第 173 行 文件“/usr/lib64/python3.7/asyncio/events.py”,_run 中的第 88 行 文件“/usr/lib64/python3.7/asyncio/base_events.py”,_run_once 中的第 1786 行 文件“/usr/lib64/python3.7/asyncio/base_events.py”,run_forever 中的第 541 行 文件“/usr/lib64/python3.7/asyncio/base_events.py”,run_until_complete 中的第 574 行 文件“test.py”,第 224 行 中止

async def get_valuation(url, params, api_header, session, semaphore):
        async with semaphore:
            async with session.get(url, headers=api_header) as response:
                status_code = response.status
                try:
                    if status_code != 200:
                        mmr = {params: 'not found' + ',' + ' ' + str(status_code)}
                    else:
                        asynch_response = await response.json()
                        mmr = await get_best_match(params, asynch_response, str(status_code))
                    return mmr
                except Exception as ex:
                    LOGGER.error(f"Error in get valuation and error was {ex}")
                    return ex


async def wrap_get_fuzzy_match(func, *args, **kwargs):
       try:
           return await func(*args, **kwargs)
       except Exception as err:
           LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
           return err

async def main(headers, file):
        tasks = []
        sema = asyncio.Semaphore(500)
        BATCH_SIZE = 1000000
        async with ClientSession() as session:
            with open(file) as f:
                while True:
                    batch = [line.strip('\n') for line in islice(f, BATCH_SIZE)]
                    if not batch:
                        break
                    for param in batch:
                        task = asyncio.ensure_future(wrap_get_fuzzy_match(
                            get_valuation,
                            url= API + param,
                            params=param,
                            api_header=headers,
                            session=session,
                            semaphore=sema,
                        ))
                        tasks.append(task)
            responses = await asyncio.gather(*tasks)
            return responses

【问题讨论】:

  • 您的批处理机制没有做任何事情。您收集一百万行,创建一百万个任务,然后再次循环,直到您尽快为文件中的每一行创建一个任务。每个任务最终都会返回一个未知大小的 json 对象,所有这些数据都通过收集到一个巨大的列表中来收集。因此,您试图在内存中同时保存未知数量的数据结构(可能数百万),每个数据结构的大小都未知。内存不足并不奇怪。
  • 您的 Python 实现在这里的内存使用效率低下,并且与 AWS 无关 - 您可以轻松提升您的实例,但我建议提高它的效率(我不确定如何,但其他人可以照亮它)。我将删除与 AWS 相关的标签,因为这根本与 AWS 无关。
  • @PaulCornelius:将responses = await asyncio.gather(*tasks)缩进while循环并将其更改为responses.extend(await asyncio.gather(*tasks))(在循环外使用responses = []),并在@987654327之后添加tasks = [] @-ed 他们都可能有帮助;您仍然会存储所有结果,但只能按预期一次安排/存储一百万个任务。
  • 嗯,大概有某种指向程序,例如分析数据或将其写入文件或其他东西。收集一百万个任务,然后让它们在收集下一百万个任务之前收集垃圾似乎很麻烦。内存问题的正确解决方案必须依赖于了解程序的用途,而我们不知道。

标签: python-3.x async-await python-asyncio


【解决方案1】:

我通过分块传递数据并在循环中调用主函数解决了这个问题。

async def get_valuation(url, params, api_header, session, semaphore):
    """
    Call fuzzy match api
    :param url:
    :param api_header:
    :param session:
    :param semaphore:
    :return:
    """
    async with semaphore:
        async with session.get(url, headers=api_header) as response:
            status_code = response.status

            try:
                if status_code != 200:
                    mmr = {params: 'not found' + ',' + ' ' + str(status_code)}
                else:
                    asynch_response = await response.json()
                    mmr = await get_best_match(params, asynch_response, str(status_code))
                return mmr
            except Exception as ex:
                LOGGER.error(f"Error in get valuation and error was {ex}")
                return ex


async def wrap_get_fuzzy_match(func, *args, **kwargs):
    try:
        return await func(*args, **kwargs)
    except Exception as err:
        LOGGER.error(f"Error in wrap_get_fuzzy_match and error was {err}")
        return err


async def main(params, headers):
    tasks = []
    sema = asyncio.Semaphore(100)
    async with ClientSession() as session:
        async with timeout(None):
            LOGGER.info(f"Number of urls to process: {len(tasks)}")
            for param in params:
                task = asyncio.ensure_future(wrap_get_fuzzy_match(
                    get_valuation,
                    url=API,
                    params=param,
                    api_header=headers,
                    session=session,
                    semaphore=sema,
                ))
                tasks.append(task)
            responses = await asyncio.gather(*tasks)
            return responses


if __name__ == '__main__':
    LOGGER.info("Start Processing")
    BATCH_SIZE = <size of each batch>
    loop = asyncio.get_event_loop()
    try:
        with open(INPUT) as file:
            inputs = file.readlines()
    except IOError:
        LOGGER.exception("Unable to read valuation input file")
        raise
    chunked_list = list(divide_chunks(big_list=inputs, chunk_size=BATCH_SIZE))
    LOGGER.info(
        f"Chunked size- {len(chunked_list)}"
    )
    batch_counter = 0
    for params in chunked_list:
        batch_counter += 1
        LOGGER.info(
            f"Starting batch number [{batch_counter}] out of [{len(chunked_list)}] "
        )
        results = loop.run_until_complete(
            asyncio.ensure_future(
                main(params= params,headers=hdr)
            )
        )
        
    LOGGER.info("Processing Completed!!")
    loop.close()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-10-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-01
    • 1970-01-01
    • 2019-03-31
    相关资源
    最近更新 更多