【问题标题】:Caching results in an async environment在异步环境中缓存结果
【发布时间】:2021-03-01 18:08:06
【问题描述】:

我正在使用 FastAPI 端点进行 I/O 绑定操作,该操作是异步的以提高效率。但是,这需要时间,所以我想将结果缓存起来,以便在一段时间内重复使用。

我目前有这个:

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def _get_expensive_resource(key) -> None:
    await asyncio.sleep(2)
    return True

@app.get('/')
async def get(key):
    return await _get_expensive_resource(key)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test:app")

我正在尝试使用cachetools 包来缓存结果,并且我尝试了类似以下的方法:

import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
  
app = FastAPI()

async def _get_expensive_resource(key) -> None:
    await asyncio.sleep(2)
    return True

class ResourceCache(TTLCache):
    def __missing__(self, key):
        loop = asyncio.get_event_loop()
        resource = loop.run_until_complete(_get_expensive_resource(key))
        self[key] = resource
        return resource

resource_cache = ResourceCache(124, 300)

@app.get('/')
async def get(key: str):
    return resource_cache[key]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test2:app")

但是,这失败了,因为据我了解,__missing__ 方法是同步的,您不能从异步调用异步同步。错误是:

RuntimeError: this event loop is already running.

如果我使用纯 asyncio 而不是 uvloop,也会发生类似的错误。

对于 asyncio 事件循环,我尝试过使用 nest_asyncio 包,但它没有修补 uvloop,而且即使将其与 asyncio 一起使用,第一次使用后服务似乎也会冻结。

你知道我该怎么做吗?

【问题讨论】:

  • 这看起来很有趣,但它不是用于缓存_get_expensive_resource 的结果的通用缓存方法,它只缓存端点,但是如果我在整个应用程序的各个端点中使用此资源怎么办? (事实上​​,我愿意)。所以我认为我不能使用它。

标签: python caching python-asyncio fastapi nest-asyncio


【解决方案1】:

自动回复遇到此问题的其他人(包括我自己在 15 天内):

TTLCache 像普通的 python 字典一样工作,访问丢失的键将调用 __missing__ 方法。因此,如果存在,我们希望使用字典中的值,如果不存在,我们可以在此方法中收集资源。该方法还应该在缓存中设置键(以便下次它会出现)并返回值以供这次使用。

class ResourceCache(TTLCache):
    def __missing__(self, key) -> asyncio.Task:
        # Create a task 
        resource_future = asyncio.create_task(_get_expensive_resource(key))
        self[key] = resource_future
        return resource_future

所以,我们有一个将键映射到asyncio.Tasks 的缓存(本质上是一个字典)。任务将在事件循环中异步执行(已由 FastAPI 启动!)。当我们需要结果时,我们可以在端点代码中或实际上在任何地方为它们await,只要它和异步函数!

@app.get("/")
async def get(key:str) -> bool:
    return await resource_cache[key]

第二次调用此端点(在缓存超时内)将使用缓存的资源(在我们的示例中模拟为“true”)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-27
    • 1970-01-01
    • 1970-01-01
    • 2020-12-12
    • 2019-01-25
    • 2011-10-05
    相关资源
    最近更新 更多