【问题标题】:How to use asyncio with a very long list of tasks (generator)如何将 asyncio 与很长的任务列表(生成器)一起使用
【发布时间】:2020-04-20 23:47:15
【问题描述】:

我有一个小程序,它加载一个相当大的 CSV(超过 800MB,分块,使用pandas.read_csv 来限制内存使用)并“在野外”对服务器执行一些 API 调用,最后构建一个结果然后将对象存储在数据库中。

我已尽可能为网络请求添加缓存,但即便如此,代码仍需要 10 多个小时才能完成。当我使用 PySpy 分析代码时,大部分代码都在等待网络请求。

我尝试将其转换为使用 asyncio 来加快速度,并设法让代码在输入文件的一小部分上工作。然而,对于完整的文件,内存使用变得令人望而却步。

这是我尝试过的:

import pandas as pd
import httpx

async def process_item(item, client):
    # send a few requests with httpx session
    # process results
    await save_results_to_db(res)

async def get_items_from_csv():
    # loads the heavy CSV file
    for chunk in pd.read_csv(filename, ...):
        for row in chunk.itertuples():
            item = item_from_row(row)
            yield item

async def main():
    async with httpx.AsyncClient() as client:
        tasks = []
        for item in get_items_from_csv():
            tasks.append(process_item(item, client))
        await asyncio.gather(*tasks)

asyncio.run(main())

有没有办法避免创建tasks 列表,它会变成一个包含超过 150 万个项目的非常重的对象?另一个缺点是在读取整个文件之前似乎没有处理任何任务,这并不理想。 我使用的是 python 3.7,但如果需要可以轻松升级到 3.8。

【问题讨论】:

  • 不确定,但“并行”执行所有任务真的有意义吗?你为什么不一次只创建 100 个任务,等待/收集并启动下一组任务
  • 我也不确定。直观地说,该过程从磁盘、数据库和网络获取混合数据,然后写回数据库。如果我分批运行,我认为这意味着在下一批开始之前等待一批完成。我怀疑在批次的开始和结束时,只会使用一个“慢”资源,这意味着整个过程不会尽可能快。但同样,这只是一个猜测,我需要弄清楚如何分析代码以确认这种预感。
  • 按照我的建议分批执行不会尽可能快,但我会试一试。我希望它会给你一个相当大的速度。你可能会玩大量的任务。如果此策略有效,那么您可以尝试对其进行更改,以便在至少完成一项任务后添加一个新任务。但正如您所注意到的那样并行运行所有任务,可能不是您想要的

标签: python asynchronous python-asyncio


【解决方案1】:

我认为您在这里寻找的不是批量运行,而是运行 N 工作人员,它们同时从队列中拉出任务。

N = 10  # scale based on the processing power and memory you have

async def main():
    async with httpx.AsyncClient() as client:
        tasks = asyncio.Queue()
        for item in get_items_from_csv():
            tasks.put_nowait(process_item(item, client))

        async def worker():
            while not tasks.empty():
                await tasks.get_nowait()
            # for a server
            # while task := await tasks.get():
            #     await task

        await asyncio.gather(*[worker() for _ in range(N)])

我使用了asyncio.Queue,但您也可以只使用collections.deque,因为所有任务都在启动工作程序之前添加到队列中。前者在运行长时间运行的进程(例如服务器)中运行的工作人员时特别有用,其中项目可能会异步排队。

【讨论】:

  • 运行多个“worker”协程有什么意义,因为它们都将由同一个事件循环运行?我们不应该将工作提交给ThreadPoolExecutor 以同时处理任务吗?
  • @acl worker协程的重点是设置任务处理的异步“幅度”;在异步的上下文中,你的函数应该被实现,这样它们就不会阻塞事件循环“太久”(这取决于你在做什么),如果你需要运行阻塞函数,使用asyncio.to_thread 但是这个由于 GIL,不会给您带来并行性的优势,您可以使用 ProcessPoolExecutor 避开它
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-27
  • 2015-01-07
  • 2021-10-11
  • 2016-09-24
相关资源
最近更新 更多